Redis 分布式锁详解( 二 )

Redis 分布式锁详解
文章插图
 

  1. 客户端 A 设置 lock_order 锁成功 , 锁值为 123uD , 超时间为 10000ms 。
  2. 客户端 A 业务代码执行完成 , 释放锁前需要获取 lock_order 锁的值 。
  3. 客户端 A 判断锁值是否是 123uD , 执行缓慢 。
  4. 客户端 A 的锁超时时间已到 , Redis 自动移除了锁 。
  5. 此时客户端 B 设置锁 , lock_order 锁不存在 , 所以加锁成功 。
  6. 客户端 A 判断锁值相等 , 执行 del 释放锁 , 此时客户端 A 释放的锁是客户端 B 的而不是自己的 , 锁出现错误 。
这也好解决 , Redis 提供了 EVAL (
https://redis.io/commands/eval ) 命令去解析 Lua 脚本 , 可以发一段 Lua 脚本给 Redis 执行:
if redis.call("get",KEYS[1]) == ARGV[1] -- 判断锁的值是否相等 。KEYS[1], ARGV[1] , 是指传入的参数 , 以上面为例 , KEYS[1] 指的是 lock_order , ARGV[1] 指的是 123uD ,  thenreturn redis.call("del",KEYS[1])-- 删除这个 key , 返回删除 key 的个数elsereturn 0-- 锁值不相等返回 0end复制代码
这样就可以保证原子执行了 。
五、基于Set命令实现 Redis 分布式锁基于 Redisson 客户端实现 Redis 分布式锁:
/** * 加锁利用 set(key, value, "PX", "NX") 函数实现 * 解锁利用 Lua 脚本实现 * <p> * Created by jie.li on 2021/1/4 7:50 下午 */@Componentpublic class RedisLock1 {@Resourceprivate RedissonClient redissonClient;/*** 尝试加锁** @param namelock name* @param value lock value* @return true 加锁成功, false 加锁失败*/public boolean tryLock(String name, String value) {RBucket<Object> bucket = redissonClient.getBucket(name);// 执行的是 set(key, value, "PX", "NX") 命令return bucket.trySet(value, 10000, TimeUnit.MILLISECONDS);}/*** 解锁** @param namelock name* @param value lock value*/public void unLock(String name, String value) {redissonClient.getScript().eval(RScript.Mode.READ_WRITE, DEL_LOCK_SCRIPT, RScript.ReturnType.INTEGER, Collections.singletonList(name), value);}// 解锁脚本private static final String DEL_LOCK_SCRIPT ="if redis.call("get",KEYS[1]) == ARGV[1] then" +// 如果 KEYS[1] 对应的 Value 值等于 ARGV[1]" return redis.call("del",KEYS[1])" +// 删除 KEYS[1]" else" +// 否则" return 0" +// 返回 0" end;";}复制代码
测试代码:
/** * 测试手动加锁解锁 * <p> * Created by jie.li on 2021/1/7 2:54 下午 */@Servicepublic class RedisLockTestService {@Resourceprivate RedisLock1 redisLock1;private int i = 50;/*** 测试手动实现 redis 分布式锁** @return int*/public int biz() {String lockName = "redis:lock:1";String lockValue = https://www.isolves.com/it/sjk/Redis/2022-06-17/UUID.randomUUID().toString();try {boolean b = redisLock1.tryLock(lockName, lockValue);if (b) {if (i > 0) {i--;}}} catch (Exception e) {e.printStackTrace();} finally {redisLock1.unLock(lockName, lockValue);}return i;}}复制代码
@Testpublic void testBiz() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(200);for (int i = 0; i < 200; i++) {new Thread(() -> {try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}int i1 = redisLockTestService.biz();System.out.println(Thread.currentThread().getName() + " -> " + i1);}, "Thread" + i).start();countDownLatch.countDown();}TimeUnit.SECONDS.sleep(5);}复制代码
六、Redisson 实现分布式锁1. Redisson 实现锁简介Redisson 实现的分布式锁相对于我们自己实现的锁更加完善 , 主要有以下两点:
1、可重入
2、锁重试
3、锁自动延期(看门狗机制)
Redisson 锁的依赖图:
Redis 分布式锁详解

文章插图
 
Redisson 实现了很多种类型的锁 , 所有的锁都实现了 JUC 中的 Lock 接口 , 并且做了扩展( RLock ) ,  所以使用方法和使用 ReentrantLock 差不多 。这里我们只针对 RedissonLock 进行讲解 。
2. Redisson 源码解析尝试加锁// waitTime等待获取锁的时间// leaseTime 锁的有效期// unit使用的时间单位@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();// 1、尝试加锁 , 如果当前有锁 , 返回锁的剩余时间ttl , 否则返回空Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquired// 2、加锁成功 , 返回trueif (ttl == null) {return true;}// 剩余的等待时间 waitTimetime -= System.currentTimeMillis() - current;// 剩余等待时间已过if (time <= 0) {// 获取锁失败acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();// 3、订阅锁释放事件 。利用semaphore(信号量) , 订阅(Redis 发布订阅)锁的释放事件 , // 锁释放后立即通知等待的线程竞争获取锁 。RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);// 4、线程阻塞// - 返回 false: 阻塞时间已经超过了剩余等待时间(waitTime) , 取消订阅事件 , 加锁失败// - 返回 ture:继续尝试加锁if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;// 剩余等待时间已过 , 加锁失败if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 5、继续以同样的方式获加锁 , 如果过了最大的等待加锁时间 , 则加锁失败 , 返回falsewhile (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();// 6、通过信号量(共享锁)阻塞 , 等待释放锁消息// 锁剩余时间小于剩余的waitTime时间if (ttl >= 0 && ttl < time) {// 非阻塞的获取结果 , 获得信号量 , 在给定的时间内从信号量获取一个许可 。subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}// 7、剩余的waitTimetime -= System.currentTimeMillis() - currentTime;// 加锁最大等待时间已过 , 加锁失败 , 返回falseif (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {// 取消订阅事件unsubscribe(subscribeFuture, threadId);}}


推荐阅读