Redis 分布式锁详解( 四 )


private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 如果指定了锁的有效期 , 则直接返回加锁结果 , 不会走后面的 Watch Dog 机制if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 实际加锁RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);// 加锁执行完成后ttlRemainingFuture.onComplete((ttlRemaining, e) -> {// 加锁执行有异常 , 直接返回if (e != null) {return;}// lock acquired// 获取到锁if (ttlRemaining == null) {// 自动续期(watch dog)scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}复制代码
private void scheduleExpirationRenewal(long threadId) {// ExpirationEntry 维护锁的线程重入计数器和续期任务ExpirationEntry entry = new ExpirationEntry();// 将 entry 放入 ConcurrentHashMapExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {// 锁重入 , 当前线程计数器+1oldEntry.addThreadId(threadId);} else {// 第一次 , 当前线程计数器+1entry.addThreadId(threadId);// 第一次触发锁续期renewExpiration();}}复制代码
private void renewExpiration() {// 在 ConcurrentHashMap 中拿到 ExpirationEntry 对象ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}// 新建一个定时任务 , 自动续期的主要实现Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}// 获取第一个线程IdLong threadId = ent.getFirstThreadId();if (threadId == null) {return;}// 异步续期RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {// 续期异常 , 打印错误日志 , 并且清除Map , 不再执行续期 。log.error("Can't update lock " + getName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}// 续期成功后 , 递归调用 , 继续调用达到持续续期目的if (res) {// reschedule itselfrenewExpiration();}});}// 延迟执行时间为 internalLockLeaseTime / 3 , internalLockLeaseTime 默认时间是 30s , 也可以自定义指定 。}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task); }复制代码
protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +// 如果存在指定的 key 和 filed"redis.call('pexpire', KEYS[1], ARGV[1]); " +// 续期"return 1; " +// 返回续期成功"end; " +"return 0;",// 返回续期失败Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}复制代码
// 看门狗超时时间默为 30s ,  自定义的话可以修改 lockWatchdogTimeout 配置this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();private long lockWatchdogTimeout = 30 * 1000;复制代码
锁自动续期总结:

  1. 在没有指定锁超时时间(leaseTime)的情况下 , 加锁成功后就会执行自动续期 。
  2. 如果当前线程持有的是重入锁 , 则对锁重入次数+1 , 如果是首次加锁 , 除了锁次数+1 还需要执行锁续期 。这里需要清楚是只有首次加锁才会续期 , 重入锁不会执行续期操作 。将锁对应的线程 Id 及重入次数放入对象 ExpirationEntry 中 ,  ExpirationEntry 对像使用 LinkedHashMap 维护了锁的线程 Id 和重入计数器 。然后将 ExpirationEntry 对象放 EXPIRATION_RENEWAL_MAP (ConcurrentHashMap) ,  EXPIRATION_RENEWAL_MAP 中存放着所有需要续期的锁 。
  3. 新建一个延迟任务 , 10s(默认)之后执行 , 在 EXPIRATION_RENEWAL_MAP 中取出 ExpirationEntry 对象 , 拿到第一个线程 Id , 然后执行 Lua 脚本 , 检查线程 Id 对应的 key 和 filed 是否存在(锁) , 如果存在则重置锁的超时时间为 30s(默认) , 如果不存在则说明已经解锁了不需要续期 。
  4. 续期成功后 , 继续递归调用步骤 3 , 保证持续锁续期 , 续期失败则说明锁已经不存在了 , 停止续期 。


    推荐阅读