我们配置时需要优先配置好redis-cache-spring-boot-starter,使用@AutoConfigureBefore({CustomRedisConfig.class, CacheNullValuesHandle.class})
直接使用,不再重复造轮子,然后我们根据自定义属性配置文件CustomRedisProperties来创建RedissonClient的Bean 。
编写META-INF/spring.factories进行自动配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.itdl.lock.config.RedisLockConfig
在测试模块缓存service添加分布式锁
@Cacheable(cacheNames = "demo2#3", key = "#id")public TestEntity getById2(Long id){// 创建分布式锁RLock lock = redissonClient.getLock("demo2_lock");// 加锁lock.lock(10, TimeUnit.SECONDS);if (id > 1000){log.info("id={}没有查询到数据,返回空值", id);return null;}TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);log.info("模拟查询数据库:{}", testEntity);// 释放锁lock.unlock();return testEntity;}
我们这里的@Cacheable没有加sync=true, 此时并发请求会存在线程安全问题,但是我们在方法体局部添加了分布式锁,因此我们的程序会按照顺序执行 。
如果我们的参数被定死了,最终请求会被先存储到缓存,所以后续的查询就会走缓存,这能很好的测试分布式锁的效果 。
编写测试程序
@SpringBootTestpublic class TestRedisLockRunner6 {@Autowiredprivate MyTestService myTestService;// 创建一个固定线程池private ExecutorService executorService = Executors.newFixedThreadPool(16);/*** 多线程访问请求,测试切面的线程安全性*/@Testpublic void testMultiMyTestService() throws InterruptedException {for (int i = 0; i < 100; i++) {executorService.submit(() -> {// 每次查询同一个参数TestEntity t1 = myTestService.getById2(1L);});}// 主线程休息10秒种Thread.sleep(10000);}}
文章插图
我们可以看到,结果并没有符合我们预期,但是又部分符合我们预期,为什么呢?
因为我们的@Cacheable是存在线程安全问题的,因为它先查询缓存这个操作存在并发问题,查询时就同时有N个请求进入@Cacheable, 并且都查询没有缓存 。
然后同时执行方法体,但方法体加了分布式锁,所以排队进行处理,因此序号有序 。
但打印数量不足总数,是因为这一批次没有全部到达@Cacheable,而是执行完毕之后才将缓存回填,所以后续的请求就是走缓存了 。
解决方案:我们加上sync=true之后就能实现,只查询一次数据库,就可以回填缓存了 。如果我们去掉@Cacheable注解,则会每一次都查询数据库,但是时按照顺序执行的 。
加上sync=true测试
文章插图
效果达到了我们的预期,继续看一下去掉@Cacheable注解的情况 。
去掉@Cacheable注解测试
文章插图
我们的分布式锁功能是没有问题的,但是每次我们都需要执行getLock(), lock.lock(), lock.unlock(),是不是很麻烦,能不能一个注解搞定?
当然是可以的 。
4.2、封装注解版分布式锁编写@RedisLock注解
/** * 自定义Redis分布式锁 */@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface RedisLock {/**分布式锁的名称,支持el表达式*/String lockName() default "";/**锁类型 默认为可重入锁*/LockType lockType() default REENTRANT_LOCK;/**获取锁等待时间,默认60秒*/long waitTime() default 30000L;/** 锁自动释放时间,默认60秒*/long leaseTime() default 60000L;/*** 被加锁方法执行完是否立即释放锁*/boolean immediatelyUnLock() default true;/** 时间单位, 默认毫秒*/TimeUnit timeUnit() default TimeUnit.MILLISECONDS;}
编写分布式锁切面RedisLockAop/** * Redis分布式锁的切面逻辑实现 */@ConditionalOnProperty(value = https://www.isolves.com/it/cxkf/bk/2023-08-28/"redis.enable", havingValue = "true")// 开启redis.enable=true时生效@AutoConfigureBefore(RedisLockConfig.class)@Aspect@Configuration@Slf4jpublic class RedisLockAop {@Resourceprivate RedissonClient redissonClient;/*** 切点*/@Pointcut("@annotation(com.itdl.lock.anno.RedisLock)")public void pointcut(){}/*** 环绕通知 注解针对的是方法,这里切点也获取方法进行处理就可以了*/@Around("pointcut()")public Object around(ProceedingJoinPoint joinPoint) throws Throwable {// 获取方法Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();// 获取方法上的分布式锁注解RedisLock redisLock = method.getDeclaredAnnotation(RedisLock.class);// 获取注解的参数// 锁名称String lockName = redisLock.lockName();// 锁类型LockType lockType = redisLock.lockType();// 获取RedissonClient的LockRLock lock = getRLock(lockName, lockType, redisLock);//获取到锁后, 开始执行方法,执行完毕后释放锁try {log.debug("=========>>>获取锁成功, 即将执行业务逻辑:{}", lockName);Object proceed = joinPoint.proceed();// 释放锁if (redisLock.immediatelyUnLock()) {//是否立即释放锁lock.unlock();}log.debug("=========>>>获取锁成功且执行业务逻辑成功:{}", lockName);return proceed;} catch (Exception e) {log.error("=========>>>获取锁成功但执行业务逻辑失败:{}", lockName);e.printStackTrace();throw new RedisLockException(LockErrCode.EXEC_BUSINESS_ERR);}finally {// 查询当前线程是否保持此锁定 被锁定则解锁lock.unlock();log.debug("=========>>>释放锁成功:{}", lockName);}}/*** 根据锁名称和类型创建锁* @param lockName 锁名称* @param lockType 锁类型* @return 锁*/private RLock getRLock(String lockName, LockType lockType, RedisLock redisLock) throws InterruptedException {RLock lock;switch (lockType){case FAIR_LOCK:lock = redissonClient.getFairLock(lockName);break;case READ_LOCK:lock = redissonClient.getReadWriteLock(lockName).readLock();break;case WRITE_LOCK:lock = redissonClient.getReadWriteLock(lockName).writeLock();break;default:// 默认加可重入锁,也就是普通的分布式锁lock = redissonClient.getLock(lockName);break;}// 首先尝试获取锁,如果在规定时间内没有获取到锁,则调用lock等待锁,直到获取锁为止if (lock.tryLock()) {lock.tryLock(redisLock.waitTime(), redisLock.leaseTime(), redisLock.timeUnit());}else {// 如果leaseTime>0,规定时间内获取锁,超时则自动释放锁long leaseTime = redisLock.leaseTime();if (leaseTime > 0) {lock.lock(redisLock.leaseTime(), redisLock.timeUnit());} else {// 自动释放锁时间设置为0或者负数,则加锁不设置超时时间lock.lock();}}return lock;}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- EasyNetQ库:让你的分布式系统消息开发快人一步!
- Kubernetes 微内核的分布式操作系统
- Seata Stellar:无缝整合不同框架的分布式事务解决方案
- 智能体的「一方有难八方支援」,一种分布式AI计算新范式诞生了
- 清华发布SmartMoE:一键实现高性能MoE稀疏大模型分布式训练
- 分布式系统中的CAP理论
- 四种分布式限流算法实现!
- 分布式基础理论 CAP & BASE
- App在线封装容易吗?是什么原理?
- 再聊聊分布式数据库,你知道了吗?