建设网站方式有哪些,东莞网站制作支付通道,注册一个公司大概要多少钱,建筑模型分布式锁
概述
分布式锁指的是#xff0c;所有服务中的所有线程都去获取同一把锁#xff0c;但只有一个线程可以成功的获得锁#xff0c;其他没有获得锁的线程必须全部等待#xff0c;直到持有锁的线程释放锁。
分布式锁是可以跨越多个实例#xff0c;多个进程的锁 分布…分布式锁
概述
分布式锁指的是所有服务中的所有线程都去获取同一把锁但只有一个线程可以成功的获得锁其他没有获得锁的线程必须全部等待直到持有锁的线程释放锁。
分布式锁是可以跨越多个实例多个进程的锁 分布式锁具备的条件
互斥性任意时刻只能有一个客户端持有锁锁超时释放持有锁超时可以释放防止死锁可重入性一个线程获取了锁之后可以再次对其请求加锁高可用、高性能加锁和解锁开销要尽可能低同时保证高可用安全性锁只能被持有该锁的服务(或应用)释放。容错性在持有锁的服务崩溃时锁仍能得到释放避免死锁。
分布式锁实现方案
分布式锁都是通过第三方组件来实现的目前比较流行的分布式锁的解决方案有
数据库通过数据库可以实现分布式锁但是在高并发的情况下对数据库压力较大所以很少使用。Redis借助Redis也可以实现分布式锁而且Redis的Java客户端种类很多使用的方法也不尽相同。ZookeeperZookeeper也可以实现分布式锁同样Zookeeper也存在多个Java客户端使用方法也不相同
Redis实现分布式锁
SETNX
基本方案Redis提供了setXX指令来实现分布式锁
highlighter-
格式: setnx key value
将key 的值设为value 当且仅当key不存在。
若给定的 key已经存在则SETNX不做任何动作。 设置分布式锁后能保证并发安全但上述代码还存在问题如果执行过程中出现异常程序就直接抛出异常退出导致锁没有释放造成最终死锁的问题。即使将锁放在finally中释放但是假如是执行到中途系统宕机锁还是没有被成功的释放掉依然会出现死锁现象
设置超时时间
highlighter- SQL
SET lock_key unique_value NX PX 10000
但是即使设置了超时时间后还存在问题。
假设有多个线程假设设置锁的过期时间10s,线程1上锁后执行业务逻辑的时长超过十秒锁到期释放锁线程2就可以获得锁执行此时线程1执行完删除锁删除的就是线程2持有的锁线程3又可以获取锁线程2执行完删除锁删除的是线程3的锁如此往后这样就会出问题。
让线程只删除自己的锁
解决办法就是让线程只能删除自己的锁即给每个线程上的锁添加唯一标识这里UUID实现基本不会出现重复删除锁时判断这个标识 但上述红框中由于判定和释放锁不是原子的极端情况下可能判定可以释放锁在执行删除锁操作前刚好时间到了其他线程获取锁执行前者线程删除锁删除的依然是别的线程的锁所以要让删除锁具有原子性可以利用redis事务或lua脚本实现原子操作判断删除 Redis的单条命令操作是原子性的但是多条命令操作并不是原子性的因此Lua脚本实现的就是令Redis的多条命令也实现原子操作 redis事务不是原子操作的详情请看 Redis的事务 但是可以利用Redis的事务和watch实现的乐观锁 来监视锁的状态 java RequestMapping( /deduct_stock)public String deductStock() {String REDIS_LOCK good_lock;// 每个人进来先要进行加锁key值为good_lockString value UUID.randomUUID().toString().replace(-,);try{// 为key加一个过期时间Boolean flag template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);// 加锁失败if(!flag){return 抢锁失败;}System.out.println( value 抢锁成功);String result template.opsForValue().get(goods:001);int total result null ? 0 : Integer.parseInt(result);if (total 0) {// 如果在此处需要调用其他微服务处理时间较长。。。int realTotal total - 1;template.opsForValue().set(goods:001, String.valueOf(realTotal));System.out.println(购买商品成功库存还剩 realTotal 件 服务端口为8002);return 购买商品成功库存还剩 realTotal 件 服务端口为8002;} else {System.out.println(购买商品失败服务端口为8002);}return 购买商品失败服务端口为8002;}finally {// 谁加的锁谁才能删除// 也可以使用redis事务// https://redis.io/commands/set// 使用Lua脚本进行锁的删除Jedis jedis null;try{jedis RedisUtils.getJedis();String script if redis.call(get,KEYS[1]) ARGV[1] then return redis.call(del,KEYS[1]) else return 0 end;Object eval jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));if(1.equals(eval.toString())){System.out.println(-----del redis lock ok....);}else{System.out.println(-----del redis lock error ....);}}catch (Exception e){}finally {if(null ! jedis){jedis.close();}}// redis事务
// while(true){
// template.watch(REDIS_LOCK);
// if(template.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
// template.setEnableTransactionSupport(true);
// template.multi();
// template.delete(REDIS_LOCK);
// ListObject list template.exec();
// if(list null){
// continue;
// }
// }
// template.unwatch();
// break;
// }}}
}
尽管这样还是会有问题锁超时释放虽然可以避免死锁但如果是业务执行耗时较长也会导致锁的释放但其实此时业务还在执行中还是应该将业务执行结束之后再释放锁。
续时
因此可以设定任务不完成锁就不释放。
可以维护一个定时线程池 ScheduledExecutorService每隔 2s 去扫描加入队列中的 Task判断失效时间是否快到了如果快到了则给锁续上时间。
那如何判断是否快到失效时间了呢可以用以下公式【失效时间】 【当前时间】【失效间隔三分之一超时】
java
// 扫描的任务队列
private static ConcurrentLinkedQueueRedisLockDefinitionHolder holderList new ConcurrentLinkedQueue();
/*** 线程池维护keyAliveTime*/
private static final ScheduledExecutorService SCHEDULER new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern(redisLock-schedule-pool).daemon(true).build());
{// 两秒执行一次「续时」操作SCHEDULER.scheduleAtFixedRate(() - {// 这里记得加 try-catch否者报错后定时任务将不会再执行-IteratorRedisLockDefinitionHolder iterator holderList.iterator();while (iterator.hasNext()) {RedisLockDefinitionHolder holder iterator.next();// 判空if (holder null) {iterator.remove();continue;}// 判断 key 是否还有效无效的话进行移除if (redisTemplate.opsForValue().get(holder.getBusinessKey()) null) {iterator.remove();continue;}// 超时重试次数超过时给线程设定中断if (holder.getCurrentCount() holder.getTryCount()) {holder.getCurrentTread().interrupt();iterator.remove();continue;}// 判断是否进入最后三分之一时间long curTime System.currentTimeMillis();boolean shouldExtend (holder.getLastModifyTime() holder.getModifyPeriod()) curTime;if (shouldExtend) {holder.setLastModifyTime(curTime);redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);log.info(businessKey : [ holder.getBusinessKey() ], try count : holder.getCurrentCount());holder.setCurrentCount(holder.getCurrentCount() 1);}}}, 0, 2, TimeUnit.SECONDS);
}
Redisson
使用Redis lua方式可能存在的问题
不可重入性。同一个线程无法多次获取同一把锁不可重试。获取锁只尝试一次就返回false没有重试机制超时释放。锁超时释放虽然可以避免死锁但如果是业务执行耗时较长也会导致锁的释放存在安全隐患主从一致性。如果Redis是主从集群主从同步存在延迟当主机宕机时从成为了主但可能存在从此时还未完成同步因此从上就没有锁标识此时会出现线程安全问题。
RLock是Redisson分布式锁的最核心接口继承了concurrent包的Lock接口和自己的RLockAsync接口RLockAsync的返回值都是RFuture是Redisson执行异步实现的核心逻辑也是Netty发挥的主要阵地。
RLock如何加锁解锁实现可重入性
从RLock进入找到RedissonLock类找到tryLock 方法再继续找到tryAcquireOnceAsync 方法这是加锁的主要代码版本不一此处实现有差别和最新3.15.x有一定出入但是核心逻辑依然未变。此处以3.13.6为例
java
// waitTime 等待时间多久时间内都会在这尝试获取锁
// leaseTime 加锁时是否设置过期时间
private RFutureBoolean tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime ! -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {RFutureBoolean ttlRemainingFuture this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) - {if (e null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}}
此处出现leaseTime时间判断的2个分支实际上就是加锁时是否设置过期时间未设置过期时间-1时则会有watchDog 的锁续约 下文一个注册了加锁事件的续约任务。我们先来看有过期时间tryLockInnerAsync 部分
evalWriteAsync方法是eval命令执行lua的入口
java
T RFutureT tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) {this.internalLockLeaseTime unit.toMillis(leaseTime);return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);, Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
eval命令执行Lua脚本的地方此处将Lua脚本展开
lua
-- 不存在该key时
if (redis.call(exists, KEYS[1]) 0) then -- 新增该锁并且hash中该线程id对应的count置1redis.call(hincrby, KEYS[1], ARGV[2], 1); -- 设置过期时间redis.call(pexpire, KEYS[1], ARGV[1]); return nil;
end; -- 存在该key 并且 hash中线程id的key也存在
if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then -- 线程重入次数redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil;
end;
return redis.call(pttl, KEYS[1]);
lua
// keyName
KEYS[1] Collections.singletonList(this.getName())
// leaseTime
ARGV[1] this.internalLockLeaseTime
// uuidthreadId组合的唯一值
ARGV[2] this.getLockName(threadId)
总共3个参数完成了一段逻辑
判断该锁是否已经有对应hash表存在 没有对应的hash表则set该hash表中一个entry的key为锁名称value为1之后设置该hash表失效时间为leaseTime存在对应的hash表则将该lockName的value执行1操作也就是计算进入次数再设置失效时间leaseTime最后返回这把锁的ttl剩余时间
再看看RLock如何解锁
看unlock方法同样查找方法名一路到unlockInnerAsync
java
protected RFutureBoolean unlockInnerAsync(long threadId) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, if (redis.call(hexists, KEYS[1], ARGV[3]) 0) then return nil;end; local counter redis.call(hincrby, KEYS[1], ARGV[3], -1); if (counter 0) then redis.call(pexpire, KEYS[1], ARGV[2]); return 0; else redis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return 1; end; return nil;, Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
}
将lua脚本展开
lua
-- 不存在key
if (redis.call(hexists, KEYS[1], ARGV[3]) 0) then return nil;
end;
-- 存在计数器 -1
local counter redis.call(hincrby, KEYS[1], ARGV[3], -1);
if (counter 0) then -- 过期时间重设redis.call(pexpire, KEYS[1], ARGV[2]); return 0;
else-- 删除并发布解锁消息redis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return 1;
end;
return nil;
该Lua KEYS有2个Arrays.asList(getName(), getChannelName())
lua
name 锁名称
channelName用于pubSub发布消息的channel名称
ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
lua
LockPubSub.UNLOCK_MESSAGEchannel发送消息的类别此处解锁为0
internalLockLeaseTimewatchDog配置的超时时间默认为30s
lockName 这里的lockName指的是uuid和threadId组合的唯一值
具体执行步骤如下
如果该锁不存在则返回nil如果该锁存在则将其线程的hash key计数器-1计数器counter0重置下失效时间返回0否则删除该锁发布解锁消息unlockMessage返回1
加锁解锁流程总结如下 总的来说就是通过Hash类型来存储锁的次数 RLock的锁重试问题
需要分析的是锁重试的所以在使用lock.tryLock()方法的时候不能用无参的。
java
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {return this.tryLock(waitTime, -1L, unit);
}
在调用tryAcquire方法后返回了一个Long的ttl
java public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time unit.toMillis(waitTime);long current System.currentTimeMillis();long threadId Thread.currentThread().getId();Long ttl this.tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl null) {return true;} else {time - System.currentTimeMillis() - current;if (time 0L) {this.acquireFailed(waitTime, unit, threadId);return false;} else {//省略
继续跟着代码进去查看最后会发现调用tryLockInnerAsync方法。这个方法就是获取锁的Lua脚本的。
java
T RFutureT tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) {this.internalLockLeaseTime unit.toMillis(leaseTime);return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);, Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
这个lua脚本上面提到了。就是 判断如果获取到锁返回一个nil.也就是null。如果没有获取到就调用 pttlname。其实就是获取当前name锁的剩余有效期。
获取到ttl。如果返回null说获取锁成功直接返回true.如果返回的不是null说明需要进行重试操作了。主要是根据时间进行判断的。经过一系列判断后do,while是真正执行重试相关逻辑的。如下
java
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time unit.toMillis(waitTime);long current System.currentTimeMillis();long threadId Thread.currentThread().getId();Long ttl this.tryAcquire(waitTime, leaseTime, unit, threadId);//如果返回null,说明获取到了锁直接返回if (ttl null) {return true;} else {//当前时间与进入方法时的时间进行比较//System.currentTimeMillis() - current表示前面获取锁消耗时间time - System.currentTimeMillis() - current;time是重试锁的等待时间if (time 0L) {//剩余等待时间如果剩余等待时间0设置获取锁失败。this.acquireFailed(waitTime, unit, threadId);return false;} else {//再次获取当前时间current System.currentTimeMillis();//刚刚尝试完获取锁失败如果继续立即尝试一般是获取不到锁的因此这里选择订阅的方式//订阅当前锁在unlock释放锁的时候有个redis.call(publish, KEYS[2], ARGV[1]); 所以这里就订阅了RFutureRedissonLockEntry subscribeFuture this.subscribe(threadId);//进行等待RFuture的结果等多久等time的时间if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {//time时间过完了还没有等到锁释放的通知if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) - {if (e null) {//如果等待超时就取消订阅this.unsubscribe(subscribeFuture, threadId);}});}this.acquireFailed(waitTime, unit, threadId);//返回获取锁失败return false;} else {//到这里表示在tme时间内获得了释放锁的通知boolean var16;try {//检查之前订阅等待的消耗时间time - System.currentTimeMillis() - current;if (time 0L) {//当前的剩余等待时间this.acquireFailed(waitTime, unit, threadId);boolean var20 false;return var20;}//这里开始进行重试相关逻辑。主要就是当前时间和进入方法时候的时间进行比较do {long currentTime System.currentTimeMillis();//这里就是第一次重试ttl this.tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl null) {//null表示获取锁失败var16 true;return var16;}//再试一次time - System.currentTimeMillis() - currentTime;if (time 0L) {this.acquireFailed(waitTime, unit, threadId);var16 false;return var16;}currentTime System.currentTimeMillis();if (ttl 0L ttl time) { //也不是一直试等别人释放((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time - System.currentTimeMillis() - currentTime;} while(time 0L);//时间还充足继续等待//时间到期了还没获取到锁返回失败this.acquireFailed(waitTime, unit, threadId);var16 false;} finally {this.unsubscribe(subscribeFuture, threadId);}return var16;}}}
}
主要是do while机制进行锁重试的while会检查时间是否还充足会继续循环。当然这个循环不是直接while(true)的盲等机制而是利用信号量和订阅的方式实现的会等别人释放锁再进行尝试这种方式对cpu友好
Redisson的超时续约
跟随tryLock代码在RedissonLock类中的tryAcquireOnceAsync方法中会看到如下代码:
java
private RFutureBoolean tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime ! -1L) {//设置了锁过期时间return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {//leaseTime -1时即没有设置了锁过期时间RFutureBoolean ttlRemainingFuture this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),//默认30秒TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);//ttlRemainingFuture完成以后ttlRemainingFuture.onComplete((ttlRemaining, e) - {if (e null) {//没有抛异常if (ttlRemaining) {//获取锁成功this.scheduleExpirationRenewal(threadId);//自动更新续期时间的任务调度}}});return ttlRemainingFuture;}
}
在使用trylock的时候如果设置了锁过期时间就不会执行续命相关逻辑了。其中默认的watchdogTimeout时间是30秒。
java
private void scheduleExpirationRenewal(long threadId) {RedissonLock.ExpirationEntry entry new RedissonLock.ExpirationEntry();//获取一个entry,将entry放到map里getEntryName()就是当前锁名称。//放到map里即一个锁对应一个entryRedissonLock.ExpirationEntry oldEntry (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);if (oldEntry ! null) {//表示重入的第二次放oldEntry.addThreadId(threadId);} else {//表示第一次放entry.addThreadId(threadId);this.renewExpiration();//第一次放进行续约}}
看门狗机制在获取锁成功以后开启一个定时任务每隔一段时间就会去重置锁的超时时间以确保锁是在程序执行完unlock手动释放的不会发生因为业务阻塞key超时而自动释放的情况。
到期续约方法
java
private void renewExpiration() {RedissonLock.ExpirationEntry ee (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (ee ! null) { //Timeout定时任务或者叫周期任务Timeout task this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {RedissonLock.ExpirationEntry ent (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());if (ent ! null) {Long threadId ent.getFirstThreadId();if (threadId ! null) {//执行续命的操作RFutureBoolean future RedissonLock.this.renewExpirationAsync(threadId);future.onComplete((res, e) - {if (e ! null) {RedissonLock.log.error(Cant update lock RedissonLock.this.getName() expiration, e);} else {if (res) {RedissonLock.this.renewExpiration();//再次调用}}});}}}//刷新周期 this.internalLockLeaseTime / 3L 默认释放时间是30秒除以3就是每10秒更新一次//续命时间为1/3的过期时间设置续命单位是秒},this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); ee.setTimeout(task);}
}
查看renewExpirationAsync方法源码其调用了Lua脚本执行续命操作的。
java
protected RFutureBoolean renewExpirationAsync(long threadId) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(pexpire, KEYS[1], ARGV[1]); return 1; end; return 0;, Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
pexpire重置锁的有效期。
总体逻辑如下
开启一个任务10秒钟后执行开始的这个任务中重置有效期。假设设置的是默认30秒则重置为30秒更新后又重复步骤1、2
那么什么时候取消这个续约的任务呢在释放锁unlock时
java public RFutureVoid unlockAsync(long threadId) {RPromiseVoid result new RedissonPromise();RFutureBoolean future this.unlockInnerAsync(threadId);future.onComplete((opStatus, e) - {//取消这个任务this.cancelExpirationRenewal(threadId);if (e ! null) {result.tryFailure(e);} else if (opStatus null) {IllegalMonitorStateException cause new IllegalMonitorStateException(attempt to unlock lock, not locked by current thread by node id: this.id thread-id: threadId);result.tryFailure(cause);} else {result.trySuccess((Object)null);}});return result;
}
multilock解决主从一致性问题
如果Redis是主从集群主从同步存在延迟当主机宕机时从成为了主但可能存在从此时还未完成同步因此从上就没有锁标识此时会出现并发安全问题。
因此redisson提出来了MutiLock锁使用这把锁就不使用主从了每个节点的地位都是一样的 这把锁加锁的逻辑需要写入到每一个主丛节点上只有所有的服务器都写入成功此时才是加锁成功假设现在某个节点挂了那么他去获得锁的时候只要有一个节点拿不到都不能算是加锁成功就保证了加锁的可靠性。 使用multilock方法。必须在所有的节点都获取锁成功才算成功。 缺点是运维成本高实现复杂。
java
Resource
private RedissonClient redissonClient;
Resource
private RedissonClient2 redissonClient2;
Resource
private RedissonClient3 redissonClient3;RLock lock redissonClient.getMultilock(lock1,lock2,lock3)
总结Redisson
Redisson分布式锁解决前三个问题原理 总结Redisson分布式锁原理
可重入利用hash结构记录线程id和重入次数可重试利用信号量和PubSub功能来实现等待、唤醒获取锁失败的重试机制超时续约利用watchDog开启一个定时任务每隔一段时间(releaseTime/3)重置超时时间。使用multilock: 多个独立的redis节点必须在所有节点都获取重入锁,才算获取成功;
redLock
不管是redLock还是redissonLock两者底层都是通过相同的lua脚本来加锁、释放锁的所以两者只是外部形态的不同底层是一样的。redLock是继承了redissonMultiLock大部分的逻辑都是在redissonMultiLock中去实现的所以源码部分大部分都是RedissonMultiLock
原理
redLock的使用需要有奇数台独立部署的Redis节点在加锁的时候会分别去N台节点上加锁如果半数以上的节点加锁成功就认为当前线程加锁成功