对于分布式锁的实现,zookeeper天然携带的一些特性能够很完美的实现分布式锁。其内部主要是利用znode节点 特性和watch机制完成。
在zookeeper中节点会分为四类,分别是:
watch监听机制主要用于监听节点状态变更,用于后续事件触发,假设当B节点监听A节点时,一旦A节点发生修改、删除、子节点列表发生变更等事件,B节点则会收到A节点改变的通知,接着完成其他额外事情。
其实现思想是当某个线程要对方法加锁时,首先会在zookeeper中创建一个与当前方法对应的父节点,接着每个要获取当前方法的锁的线程,都会在父节点下创建一个临时有序节点,因为节点序号是递增的,所以后续要获取锁的线程在zookeeper中的序号也是逐次递增的。根据这个特性,当前序号最小的节点一定是首先要获取锁的线程,因此可以规定序号最小的节点获得锁。所以,每个线程再要获取锁时,可以判断自己的节点序号是否是最小的,如果是则获取到锁。当释放锁时,只需将自己的临时有序节点删除即可。
根据上图,在并发下,每个线程都会在对应方法节点下创建属于自己的临时节点,且每个节点都是临时且有序的。 那么zookeeper又是如何有序的将锁分配给不同线程呢? 这里就应用到了watch监听机制。每当添加一个新的临时节点时,其都会基于watcher机制监听着它本身的前一个节点等待前一个节点的通知,当前一个节点删除时,就轮到它来持有锁了。然后依次类推。
在通过zookeeper实现分布式锁时,有另外一种实现的写法,这种也是非常常见的,但是它的效率并不高,此处可以先对这种实现方式进行探讨。
此种实现方式,只会存在一个锁节点。当创建锁节点时,如果锁节点不存在,则创建成功,代表当前线程获取到锁,如果创建锁节点失败,代表已经有其他线程获取到锁,则该线程会监听锁节点的释放。当锁节点释放后,则继续尝试创建锁节点加锁。
这种方案的低效点就在于,只有一个锁节点,其他线程都会监听同一个锁节点,一旦锁节点释放后,其他线程都会收到通知,然后竞争获取锁节点。这种大量的通知操作会严重降低zookeeper性能,对于这种由于一个被watch的 znode节点的变化,而造成大量的通知操作,叫做羊群效应。
为了避免羊群效应的出现,业界内普遍的解决方案就是,让获取锁的线程产生排队,后一个监听前一个,依次排序。推荐使用这种方式实现分布式锁
按照上述流程会在根节点下为每一个等待获取锁的线程创建一个对应的临时有序节点,序号最小的节点会持有锁, 并且后一个节点只监听其前面的一个节点,从而可以让获取锁的过程有序且高效。
总的来说,就是通过操作标识结合zookeeper分布式锁,完成mysql乐观锁的操作,思想上都是相同的。
分布式锁的一个很重要的特性就是互斥性,同一时间内多个调用方加锁竞争,只能有一个调用方加锁成功。而redis是基于单线程模型的,可以利用这个特性让调用方的请求排队,对于并发请求,只会有一个请求能获取到锁。
redis实现分布式锁也很简单,基于客户端的几个API就可以完成,主要涉及三个核心API:
当对业务进行加锁时,锁的过期时间,绝对不能想当然的设置一个值。假设线程A在执行某个业务时加锁成功 并设置锁过期时间。但该业务执行时间过长,业务的执行时间超过了锁过期时间,那么在业务还没执行完 时,锁就自动释放了。接着后续线程就可以获取到锁,又来执行该业务。就会造成线程A还没执行完,后续线 程又来执行,导致同一个业务逻辑被重复执行。因此对于锁的超时时间,需要结合着业务执行时间来判断, 让锁的过期时间大于业务执行时间。
上面的方案是一个基础解决方案,但是仍然是有问题的。
业务执行时间的影响因素太多了,无法确定一个准确值,只能是一个估值。无法百分百保证业务执行期间, 锁只能被一个线程占有。
如想保证的话,可以在创建锁的同时创建一个守护线程,同时定义一个定时任务每隔一段时间去为未释放的锁增加过期时间。当业务执行完,释放锁后,再关闭守护线程。 这种实现思想可以用来解决锁续期。
在单点redis虽然可以完成锁操作,可一旦redis服务节点挂掉了,则无法提供锁操作。 在生产环境下,为了保证redis高可用,会采用异步复制方法进行主从部署。当主节点写入数据成功,会异步的将 数据复制给从节点,并且当主节点宕机,从节点会被提升为主节点继续工作。假设主节点写入数据成功,在没有将数据复制给从节点时,主节点宕机。则会造成提升为主节点的从节点中是没有锁信息的,其他线程则又可以继续加 锁,导致互斥失效。
redisson是redis官网推荐实现分布式锁的一个第三方类库。其内部完成的功能非常强大,对各种锁都有实现,同 时对于使用者来说非常简单,让使用者能够将更多的关注点放在业务逻辑上。此处重点利用Redisson解决单机 Redis锁产生的两个问题。
基于redisson实现分布式锁很简单,直接基于lock()&unlock()方法操作即可。
1. 添加依赖
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--Redis分布式锁-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.1</version>
</dependency>
2. 修改配置文件
server:
redis:
host: 192.168.200.150
port: 6379
database: 0
jedis:
pool:
max-active: 500
max-idle: 1000
min-idle: 4
3. 修改springboot启动类
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Bean
public RedissonClient redissonClient(){
RedissonClient redissonClient;
Config config = new Config();
String url = "redis://" + host + ":" + port;
config.useSingleServer().setAddress(url);
try {
redissonClient = Redisson.create(config);
return redissonClient;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
4. 定义锁工具类
@Component
public class RedissonLock {
@Autowired
private RedissonClient redissonClient;
/**
* 加锁
* @param lockKey
* @return
*/
public boolean addLock(String lockKey){
try {
if (redissonClient == null){
System.out.println("redisson client is null");
return false;
}
RLock lock = redissonClient.getLock(lockKey);
//设置锁超时时间为5秒,到期自动释放
lock.lock(10, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+": 获取到锁");
//加锁成功
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean releaseLock(String lockKey){
try{
if (redissonClient == null){
System.out.println("redisson client is null");
return false;
}
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
System.out.println(Thread.currentThread().getName()+": 释放锁");
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}
}
5. 测试
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedissonLockTest {
@Autowired
private RedissonLock redissonLock;
@Test
public void easyLock(){
//模拟多个10个客户端
for (int i=0;i<10;i++) {
Thread thread = new Thread(new LockRunnable());
thread.start();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
private class LockRunnable implements Runnable {
@Override
public void run() {
redissonLock.addLock("demo");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
redissonLock.releaseLock("demo");
}
}
}
6. 执行效果
根据执行效果可知,多线程并发获取所时,当一个线程获取到锁,其他线程则获取不到,并且其内部会不断尝试获取锁,当持有锁的线程将锁释放后,其他线程则会继续去竞争锁。
在上述加锁方法实现中,最核心就是getLock()和lock()。get()源码非常简单,根据当前传入的锁名称创建并返回 一个RLock对象。
当获取到RLock对象后,调用其内部的lock()执行加锁操作。根据源码描述,当线程获取锁时,如果没有获取到锁,则会让其进入自旋,直到获取到锁。 如果获取到锁,则会一直保留到调用unLock()手动释放或根据传入的 leaseTime时间自动释放。
当前传入两个参数值:锁超时时间,时间单位。主要用于避免死锁的出现,假设持有锁的redis节点宕机,到期后锁可以自动释放。
lock()方法中还会调用lock()的另外一个重载方法,需要传入三个参数:过期时间、时间单位、是否中断。
在三个参数的lock()重载方法中,首先会获取当前线程id,接着调用tryAcquire()方法尝试获取锁,如果返回值为 null,代表获取到锁。 如果返回值不是null,则根据当前线程id创建异步任务并放入线程池中,接着进入自旋,在 自旋过程中,尝试调用tryAcquire()获取锁,如果获取到则退出自旋。否则会不断的尝试获取锁。
在lock()方法中,最核心的是tryAcquire()。其内部核心实现会调用tryAcquireAsync(),并传入过期时间、时间单位和当前线程id,进行锁的获取。如果leaseTime不为-1,代表设置了有效时间,接着调用tryAcquireAsync()去获取锁。如果是-1的话,则默认把永不过期改为30秒过期,并且创建异步任务,如果没有获取到锁,则什么都不做。如果获取到了锁,则调用scheduleExpirationRenewal()对当前线程id的锁进行延时。
最终的tryLockInnerAsync()则是获取锁的具体实现。可以看到,其内部是基于lua脚本语言完成锁获取的。因为获取锁的过程涉及到了多步,为了保证执行过程的原子性,所以使用了lua,最核心的就是要理解这段lua脚本的执行过程。
对于这款lua脚本来说,KEYS[1]代表需要加锁的key,ARGV[1]代表锁的超时时间,ARGV[2]代表锁的唯一标识。 对于这段lua脚本,简单来说:
在释放锁时,unlock()内部会调用unlockAsync()对当前线程持有的锁进行释放。其内部最终会执行 unlockInnerAsync()方法完成锁释放并返回结果。
在unlockInnerAsync()中仍然是结合lua脚本完成释放锁操作。
相关参数:
对于锁续期问题,在单点redis实现分布式锁时已经介绍过了,用于防止业务执行超时或宕机而引起的业务被重复执行。
根据对lock方法的解析,可以发现,当设置完过期时间后,当前锁的过期时间就已经被设定了,不会发生改变, 到期后则会被自动释放,因此在业务执行中,通过lock()方法加锁会造成隐患。
所谓的看门狗是redisson用于自动延长锁有效期的实现机制。其本质是一个后台线程,用于不断延长锁key的生存时间。
改造锁示例代码,让锁超时时间为1秒,但是业务执行时,需要耗时3秒,此时执行可以发现,多线程间在上一个锁没有释放的情况下,后续线程又获取到了锁。但是解锁的时候,出现异常,因为加锁时的唯一标识与解锁时的唯 一标识发生了改变,造成死锁。
因为业务执行多久无法确定一个准确值,所以在看门狗的实现中,不需要对锁key设置过期时间,当过期时间为-1 时,这时会启动一个定时任务,在业务释放锁之前,会一直不停的增加这个锁的有效时间,从而保证在业务执行完 毕前,这把锁不会被提前释放掉。
要开启看门狗机制也很简单,只需要将加锁时使用lock()改为tryLock()即可。
并且根据之前lock的源码分析,如果没有设置锁超时,默认过期时间为30秒即watchdog每隔30秒来进行一次续期,该值可以修改。
config.setLockWatchdogTimeout(3000L);
进行测试,当加锁后,线程睡眠10秒钟,然后释放锁,可以看到在这段时间内,当前线程会一直持有锁,直到锁释放。在多线程环境下,也是阻塞等待进行锁的获取。
当在单点redis中实现redis锁时,一旦redis服务器宕机,则无法进行锁操作。因此会考虑将redis配置为主从结 构,但在主从结构中,数据复制是异步实现的。假设在主从结构中,master会异步将数据复制到slave中,一旦某 个线程持有了锁,在还没有将数据复制到slave时,master宕机。则slave会被提升为master,但被提升为slave的 master中并没有之前线程的锁信息,那么其他线程则又可以重新加锁。
redlock是一种基于多节点redis实现分布式锁的算法,可以有效解决redis单点故障的问题。官方建议搭建五台redis服务器对redlock算法进行实现。
在redis官网中,对于redlock算法的实现思想也做了详细的介绍。地址:https://redis.io/topics/distlock。整个实现过程分为五步:
虽然通过redlock能够更加有效的防止redis单点问题,但是仍然是存在隐患的。假设redis没有开启持久化, clientA获取锁后,所有redis故障重启,则会导致clientA锁记录消失,clientB仍然能够获取到锁。这种情况虽然发生几率极低,但并不能保证肯定不会发生。
保证的方案就是开始AOF持久化,但是要注意同步的策略,使用每秒同步,如果在一秒内重启,仍然数据丢失。使用always又会造成性能急剧下降。
官方推荐使用默认的AOF策略即每秒同步,且在redis停掉后,要在ttl时间后再重启。 缺点就是ttl时间内redis无法对外提供服务。
redisson对于红锁的实现已经非常完善,通过其内部提供的api既可以完成红锁的操作。
1. 新建配置类
@Configuration
public class RedissonRedLockConfig {
public RedissonRedLock initRedissonClient(String lockKey){
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://192.168.200.150:7000").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://192.168.200.150:7001").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://192.168.200.150:7002").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
Config config4 = new Config();
config4.useSingleServer().setAddress("redis://192.168.200.150:7003").setDatabase(0);
RedissonClient redissonClient4 = Redisson.create(config4);
Config config5 = new Config();
config5.useSingleServer().setAddress("redis://192.168.200.150:7004").setDatabase(0);
RedissonClient redissonClient5 = Redisson.create(config5);
RLock rLock1 = redissonClient1.getLock(lockKey);
RLock rLock2 = redissonClient2.getLock(lockKey);
RLock rLock3 = redissonClient3.getLock(lockKey);
RLock rLock4 = redissonClient4.getLock(lockKey);
RLock rLock5 = redissonClient5.getLock(lockKey);
RedissonRedLock redissonRedLock = new RedissonRedLock(rLock1,rLock2,rLock3,rLock4,rLock5);
return redissonRedLock;
}
}
2. 新建测试类,完成加锁与解锁操作
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedLockTest {
@Autowired
private RedissonRedLockConfig redissonRedLockConfig;
@Test
public void easyLock(){
//模拟多个10个客户端
for (int i=0;i<10;i++) {
Thread thread = new Thread(new RedLockTest.RedLockRunnable());
thread.start();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
private class RedLockRunnable implements Runnable {
@Override
public void run() {
RedissonRedLock redissonRedLock = redissonRedLockConfig.initRedissonClient("demo");
try {
boolean lockResult = redissonRedLock.tryLock(100, 10, TimeUnit.SECONDS);
if (lockResult){
System.out.println("获取锁成功");
TimeUnit.SECONDS.sleep(3);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
redissonRedLock.unlock();
System.out.println("释放锁");
}
}
}
}
redissonRedLock加锁源码分析
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
/**
* 1. 允许加锁失败节点个数限制(N-(N/2+1)),当前假设五个节点,则允许失败节点数为2
*/
int failedLocksLimit = failedLocksLimit();
/**
* 2. 遍历所有节点执行lua加锁,用于保证原子性
*/
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
/**
* 3.对节点尝试加锁
*/
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
// 抛出异常表示获取锁失败
lockAcquired = false;
}
if (lockAcquired) {
/**
*4. 如果获取到锁则添加到已获取锁集合中
*/
acquiredLocks.add(lock);
} else {
/**
* 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
* 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
* 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
*/
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
/**
* 6.计算从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则申请锁失败,返回false
*/
if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
/**
* 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
*/
return true;
}
阅读量:2012
点赞量:0
收藏量:0