baby sword‘s blog baby sword‘s blog
首页
  • java基础
  • java进阶
大数据
  • mysql

    • mysql索引
    • mysql日志
  • redis

    • 单机下的redis
    • 集群下的redis
  • Spring
  • springboot
  • RPC
  • netty
  • mybatis
  • maven
  • 消息队列
  • kafka
  • zookeeper
  • rocketmq
  • 七大设计原则
  • 创建型模式
  • 结构型模式
  • 行为型模式
  • SpringCloud

    • eureka
  • SpringCloud Alibaba

    • nacos
  • 计算机网络
  • 操作系统
  • 算法
  • 个人项目
  • 个人面试面经
  • 八股记忆
  • 工作积累
  • 逻辑题
  • 面试

    • 百度后端实习二面
GitHub (opens new window)

zhengjian

不敢承担失去的风险,是不可能抓住梦想的
首页
  • java基础
  • java进阶
大数据
  • mysql

    • mysql索引
    • mysql日志
  • redis

    • 单机下的redis
    • 集群下的redis
  • Spring
  • springboot
  • RPC
  • netty
  • mybatis
  • maven
  • 消息队列
  • kafka
  • zookeeper
  • rocketmq
  • 七大设计原则
  • 创建型模式
  • 结构型模式
  • 行为型模式
  • SpringCloud

    • eureka
  • SpringCloud Alibaba

    • nacos
  • 计算机网络
  • 操作系统
  • 算法
  • 个人项目
  • 个人面试面经
  • 八股记忆
  • 工作积累
  • 逻辑题
  • 面试

    • 百度后端实习二面
GitHub (opens new window)
  • mysql

  • redis

    • 单机下的redis

    • 集群下的redis

      • 集群下的redis
      • 集群下的redis2
      • redisson详解
        • 1.原来使用redis实现setnx的分布式锁有什么缺陷
        • 2.Redisson的使用
          • 引入jar包
          • 在springboot中创建客户端配置类
          • 执行流程简析
        • 3.源码分析
          • ①先看lock源码
          • ②unlock
          • ③默认非公平锁
          • ④默认可重入锁
          • ⑤boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
          • ⑥公平锁等等
        • 4.Redis主从架构锁失效问题
          • 那怎么解决呢?
        • 5.分布式锁怎么进行优化
      • raft协议浅析
      • 一致性hash与hash槽
  • MongoDB

  • 后端存储实战

  • 数据库
  • redis
  • 集群下的redis
xugaoyi
2022-12-28
目录

redisson详解

# 1.原来使用redis实现setnx的分布式锁有什么缺陷

  • 超时时间设置不当导致并发问题
  • 无法实现可重入锁
  • 自己编写setnx可能出现bug
  • 部分代码如finally中的代码无法保证原子性

锁续命,看门狗

原理:假如我们设置一把锁的过期时间为30s,那么当主线程抢占到锁后,开启一个异步线程,定时扫描过期时间和主线程,如果主线程还没有执行完,就将redis中key重新设置过期时间为30s。

实现:redisson

https://redisson.org/

# 2.Redisson的使用

# 引入jar包

<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.6.5</version>
</dependency>
1
2
3
4
5

# 在springboot中创建客户端配置类

@Bean
public Redisson redisson(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379");
        return (Redisson) Redisson.create(config);
    }
1
2
3
4
5
6

核心api

RLock lock = redisson.getLock(lockName);//获取一把锁
lock.lock();  //加锁
lock.unlock();//释放锁
lock.lock(10, TimeUnit.SECONDS); //占用多少秒后释放
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);//尝试获取锁,如果100s内没获取到则返回false,若成功获取则10s删除锁
1
2
3
4
5

# 执行流程简析

image-20221228155500908

一台机器中的线程1和另外一台机器的线程2准备抢占锁,若线程1抢占成功

  1. 线程1会开启另一个线程,每隔10s检测线程1是否还持有锁,如果持有就延长锁时间
  2. 线程2通过while+阻塞自旋间歇性尝试加锁

# 3.源码分析

# ①先看lock源码

public void lock() {
        try {
            this.lockInterruptibly();
        } catch (InterruptedException var2) {
            Thread.currentThread().interrupt();
        }

    }

public void lockInterruptibly() throws InterruptedException {
        this.lockInterruptibly(-1L, (TimeUnit)null);
    }


//核心:
//leaseTime :释放锁时间
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        
    
      //尝试获得锁:看详情1
      Long ttl = this.tryAcquire(leaseTime, unit, threadId); //重点
    
      //如果不为空,,则表示抢占锁失败
        if (ttl != null) {
            //请看详情3讲解
            RFuture<RedissonLockEntry> future = this.subscribe(threadId);
            this.commandExecutor.syncSubscription(future);

            try {
                while(true) {
                    //再次尝试获取锁
                    ttl = this.tryAcquire(leaseTime, unit, threadId);
                    if (ttl == null) {
                        return;
                    }
          
                    //获取锁失败,则进行阻塞
                    if (ttl >= 0L) {
                        //这里使用了semphere信号量机制进行线程的阻塞
                        this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        this.getEntry(threadId).getLatch().acquire();
                    }
                }
            } finally {
                //获取锁成功了,这里就取消当前线程订阅的管道
                this.unsubscribe(future, threadId);
            }
        }
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

详情1

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
    }

//返回的是个Tuture对象
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1L) {
            return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            //真正获取锁逻辑:看详情2
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            
            //给Tuture加上监听器,一旦获取结果则调用此方法
            ttlRemainingFuture.addListener(new FutureListener<Long>() {
                public void operationComplete(Future<Long> future) throws Exception {
                    
                    //如果执行成功
                    if (future.isSuccess()) {
                        Long ttlRemaining = (Long)future.getNow();
                        
                        //表示获取锁成功
                        if (ttlRemaining == null) {
                            //开启线程定期锁续命:详情3
                            RedissonLock.this.scheduleExpirationRenewal(threadId);
                        }

                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

详情2

//真正获取锁逻辑
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        this.internalLockLeaseTime = unit.toMillis(leaseTime);
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }
1
2
3
4
5
//KETS[1] : 锁的名字
//ARGV[1] : 超时时间leaseTime,其实是lockWatchDogTime 默认30s
//ARGV[2] : 线程id
if (redis.call('exists', KEYS[1]) == 0) then   //如果当前锁没有被任何线程占有
    redis.call('hset', KEYS[1], ARGV[2], 1);   //使用hash的结构创建锁 key(锁名) field(线程) value(重入次数)
  redis.call('pexpire', KEYS[1], ARGV[1]);   //设置过期时间,默认30s
  return nil;                 //设置成功则返回null
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then  //如果被占有的锁的线程是当前线程,即执行可重入了逻辑
    redis.call('hincrby', KEYS[1], ARGV[2], 1);         //可重入+1
  redis.call('pexpire', KEYS[1], ARGV[1]); return nil;  //在次设置过期时间30s
end; 
return redis.call('pttl', KEYS[1]); //如果没有成功获取锁,则返回该锁剩余的过期时间
1
2
3
4
5
6
7
8
9
10
11
12
13

详情3

锁续命逻辑:scheduleExpirationRenewal(threadId)

private void scheduleExpirationRenewal(final long threadId) {
        if (!expirationRenewalMap.containsKey(this.getEntryName())) {
            
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                //只要线程成功获取了锁,就会执行这里的逻辑
                public void run(Timeout timeout) throws Exception {
                    
                    //查看这把锁对应的线程有没有被释放或过期,没有的话,则重新设置为30s,并返回1;
                    RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
                    
                    //future监听
                    future.addListener(new FutureListener<Boolean>() {
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                            if (!future.isSuccess()) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
                            } else {
                                //如果续命成功,则通过递归的方式进行下一轮定时任务
                                if ((Boolean)future.getNow()) {
                                    RedissonLock.this.scheduleExpirationRenewal(threadId);
                                }

                            }
                        }
                    });
                }
                //这里可以看出来,每个10s执行一次
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
                task.cancel();
            }

        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

详情3

 RFuture<RedissonLockEntry> future = this.subscribe(threadId);
 this.commandExecutor.syncSubscription(future);
1
2

首先,线程没有获取锁成功,这里会根据redis中的消息队列机制,让当前一个线程去订阅一个通道channelRFuture<RedissonLockEntry> future = this.subscribe(threadId);

当通道channel中有消息时,线程可以执行特定的方法:this.commandExecutor.syncSubscription(future);

//订阅了通道chennel且channel收到数据后,就会执行以下方法 
protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
            
            //从信号量中的进行唤醒线程
            value.getLatch().release();

            //不重要
            while(true) {
                Runnable runnableToExecute = null;
                synchronized(value) {
                    Runnable runnable = (Runnable)value.getListeners().poll();
                    if (runnable != null) {
                        if (value.getLatch().tryAcquire()) {
                            runnableToExecute = runnable;
                        } else {
                            value.addListener(runnable);
                        }
                    }
                }

                if (runnableToExecute == null) {
                    return;
                }

                runnableToExecute.run();
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

以上可以看出,必须要管道中收到了消息,这里才可以唤醒AQS队列中沉睡得的线程,这就要引出unlock了

# ②unlock

public void unlock() {
        Boolean opStatus = (Boolean)this.get(this.unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + Thread.currentThread().getId());
        } else {
            if (opStatus) {
                this.cancelExpirationRenewal();
            }

        }
    }


protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//KEYS[1] 锁名字
//KEYS[2] 通道名
//

if (redis.call('exists', KEYS[1]) == 0) then //锁是否存在
    redis.call('publish', KEYS[2], ARGV[1]);  //若不存在,可能是过期了,这时需要向通道channel发布消息
return 1; 
end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then //如果锁的线程不是当前线程所占有的,表示当前线程尝试删除别人的锁,直接返回null
    return nil;
end; 
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); //让当前线程重入次数-1
if (counter > 0) then
    redis.call('pexpire', KEYS[1], ARGV[2]);  //如果锁的重入次数没有到0,表示锁继续被占有,这里进行一次锁续命
return 0; 
else redis.call('del', KEYS[1]); //表示重入次数为0,则直接删除
redis.call('publish', KEYS[2], ARGV[1]);  //向通道发送信息
return 1; 
end; 
return nil;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

从上面可以看出redisson采用订阅发布机制+AQS实现阻塞线程的唤醒。

  • 加锁:redis 中设置 hash 结构,默认过期时间为 30000ms。
  • 维持加锁:后台有一个调度任务,每10秒钟调度一次,只要客户端和key 都还存在,就会刷新当前 key 的过期时间。
  • 锁互斥:别的客户端或者别的线程再来加锁,会陷入 while(true) 的死循环中,等待。
  • 可重入锁:同一个线程可以加锁多次,每次的话,就是在 hash 结构上自增1。
  • 手动释放锁:在 hash 结构上递减1,对比剩余个数是否为0,为0则直接删除 key。
  • 宕机释放锁:当客户端宕机之后,后台的调度任务就会取消,key 的过期时间就不会在被刷新,默认30s后,key 自动消失。
  • 尝试加锁超时:在循环中,一直尝试获取锁,若时间到了之后,还没有获取到,就退出循环,返回 false。
  • 自动释放锁:在加锁的时候,设置超时时间,这样就不会有调度任务,key 会在设置的过期时间之后过期。

redisson - 可重入锁.png

# ③默认非公平锁

image-20221228195504460

调用lock方法,线程以上来就尝试获取锁,可以看出是非公平锁。当然在redisson中也有公平锁的实现

什么时候锁会释放?

  1. 宕机,看门狗机制无法执行,redis中数据自动过期
  2. 主动调用unlock

# ④默认可重入锁

从之前的lua脚本很容易看出来redisson采用hash结构来实现可重入锁

# ⑤boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);

redisson 提供了一个高阶用法,这个就是说,尝试100s,如果100s还获取不到锁就放弃;如果获取到锁,只会占有这个锁 10s , 如果 10s 还没有释放锁,就会自动进行释放。

  • 注意:看门狗和锁续命是没有手动设置过期时间、调用lock方法才会有的一个特殊功能,如果调用trylock,就可能不会使用锁续命和看门狗机制。

# ⑥公平锁等等

TODO

参考:

https://juejin.cn/post/6901951315962757134#heading-10 (opens new window)

# 4.Redis主从架构锁失效问题

为了提高锁的可用性,使用主从架构下

但是在主从架构下会有问题:

image-20221228214441952

如图所示,如果process1尝试去获取锁,并获取master锁成功,立即返回给process1。然后正准备将数据同步给slave,此时master挂了,slave当选新结点,但是其中没有锁信息,因为之前master还未同步就挂了。此时process2来获取锁,发现master可获取锁。最终导致锁失效。

其主要原因是因为redis是一种AP架构,其保证可用性,但是在一致性方面有些欠缺。

# 那怎么解决呢?

  1. 使用zookeeper作为分布式锁

zookeeper也可以做为分布式锁,其是CP架构,保证了数据的一致性,当process1向zookeeper集群中的leader请求锁时,只有当zookeeper集群中超过半数的结点同步了信息后,leader才会返回信息给process1。这样就算leader挂了,zookeeper集群也会选择同步数据最多的结点当选新leader,而新leader中一定有锁信息,process2就无法获取锁。

但是呢实际上公司大多数用redis作为分布式锁。因为zookeeper作为分布式锁,其请求锁时保证集群半数结点同步数据成功,这个过程是非常耗时间的,非常影响性能。

  1. redlock方案

其实底层也是借鉴了zookeeper的半数机制概念。

image-20221228205410925

三个redis结点是对等的。建立锁时,要超过半数以上的结点成果获取锁(相比于zookeeper效率高),才返回获取所成功信息。否则获取锁失败。

但是实际上,这种方案也是不建议使用的。

  • 高可用问题

上述的是3个单节点,那么如果超过半数以上结点挂了,整个集群就没了。所以我们会考虑主从架构问题,让每个结点再拉一个slave。

但是这是有问题的。

image-20221228215647698

如图所示,如果process1成功在master1、master2上成功加锁,当slave2还没有成功获取数据时,此时master2宕机,即选举新结点slave2作为新master,此时process2来获取锁,因为新的master2没有之前的锁信息,process2成功再master3和master2上加锁成功,固造成锁失效问题。

其实也就重现了redisson那个主从问题

那怎么办呢?不加slave,我们多加几个master吧,设置20个,总不会挂10个吧。确实可以,但是要考虑性能问题,redis加锁要加十个以上,加这么做锁不直接退化成了zookeeper低效率了吗。我们使用redis就是为了高性能,这样我们还不如使用zookeeper呢?

  • 持久化问题

通常我们aof持久化都是选用everyseconds每秒存储到aof文件。但是这样可能会丢失1s的数据。

image-20221228220210570

如图,当process1在master1和master2加锁成功,那么在master2还没有持久化时(没过1s),突然宕机,然后又重启,此时master2上是没有锁信息的,这时process2成功在master2和master3上加到数据。固锁失效

所以redlock的问题非常多,不建议高并发下使用。

至今,redis作为分布式锁还没有完全正确的方案。

# 5.分布式锁怎么进行优化

加锁本质:并行变串行

与高并发架构有违背

优化方向

  • 锁的粒度:锁的粒度越小,性能会提高

  • 使用分段锁

比如银行系统

一个用户账户有1000元。此时一个用户发起转账。

我们一般我设置一个锁user_account锁锁住这个账户,然后进行扣款,那么这肯定就是一段时间内只有一个线程可以扣款这个用户。效率可能比较低

如果使用分段锁呢?

我们将1000元 - user_account分段为

100 - user_account_1

100 - user_account_2

100 - user_account_3

………………

100 - user_account_10

这样每次扣款,就可以通过一定的算法定位线程该去获取哪一把锁,只要扣减后不小于0,就扣款成功,否则回去下一把锁,这样的情况下,在有很多次用户扣款请求下,就会有10个线程并行去运行。

当然,我们应该将数据取到内存,在将其分为100、100多分这样才可以。

比如 key value

user_account_1:100

user_account_2:100

user_account_5:66

……

每次扣减在缓存中进行,最后当业务结束,在异步进行累加后保存到数据库

image-20221228211400197

类似ConcurrentHashMap底层就有分段锁,可以参考下1.7,1.8锁粒度更细

锁优化的参考:

https://zhuanlan.zhihu.com/p/86935799

https://blog.csdn.net/hbtj_1216/article/details/77161198

编辑 (opens new window)
上次更新: 2024/02/22, 14:03:19
集群下的redis2
raft协议浅析

← 集群下的redis2 raft协议浅析→

最近更新
01
spark基础
02-22
02
mysql读写分离和分库分表
02-22
03
数据库迁移
02-22
更多文章>
Theme by Vdoing | Copyright © 2019-2024 Evan Xu | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式