优惠券秒杀

今天继续来学习Redis实战中的优惠券秒杀场景下碰到的问题和解决的方案

全局唯一ID

优惠券在购买后会一个购买优惠券的订单ID,这个ID是要返还给用户的。这个ID也会被用于用户退款,或者商家自己查询到底被谁买了,那么问题就来了:

  • 如果使用MySQL数据库的自增id,那么总是从1开始,这样的数字过于简短,有太明显的规律,易于被用户和商业对手猜测出购买的数量等等
  • 如果数量规模过大,要分库分表的话,不同的表又是从1开始,这样就会出现都是优惠券订单表,但是id相同的情况,这是很不应该的

所以我们就应该构造一个全局ID生成器,我们便可以利用Redis的可increment特性,再拼接一些信息(不同公司规则不同),去产生一个全局唯一ID

实现方法:

1

成部分:符号位:1bit,永远为0

时间戳:31bit,以秒为单位,可以使用69年

序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

使用Redis实现全局唯一ID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class RedisIdWorker {
/**
* 2022年1月1日0:0:0的时间戳
*/
public static final long BEGIN_TIMESTAMP = 1640995200L;
@Resource
private StringRedisTemplate stringRedisTemplate;
public long nextId(String keyPrefix){
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowTimestamp = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowTimestamp - BEGIN_TIMESTAMP;
// 2.生成序列号
String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
Long incrCount = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3.拼接并返回
return timestamp << 32 | incrCount;
}
}

在上面的代码中,使用long型的时间戳(当前时间戳与指定时间戳的差值),左移32位,再或运算上Redis中increment后生成的序列号,则可生成全局唯一ID

代码实现优惠券秒杀

这里的业务逻辑较为简单,就不多赘述,只需注意判断是否有库存即可

2

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Transactional
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券信息
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀还未开始");
}
// 3.秒杀没有开始或已结束,返回异常结果
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
// 4.秒杀正在进行,判断库存
Integer stock = seckillVoucher.getStock();
// 5.无库存,返回异常结果
if(stock < 1){
return Result.fail("该优惠券已经抢光");
}
// 6.有库存
// 6.1扣减库存
boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id",voucherId).update();
// 6.2扣减失败
if(!success){
return Result.fail("库存不足");
}
// 6.3创建订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderId);
}

库存超卖问题

超卖问题分析

我们现在来分析一下原有逻辑的问题,在下面的代码中,我们使用了<1作为库存量的判断。

假设库存还剩1件,现在有两个线程:第一个线程查到了库存还剩1,但是还未删减库存时,另一线程也查到了库存为1,这下第一个线程先-1,由于后一个线程也查到了库存可用,也会-1,就会出现库存的超卖问题。

1
2
3
4
// 5.无库存,返回异常结果
if(stock < 1){
return Result.fail("该优惠券已经抢光");
}

同样,我们使用JMeter来模拟200个人同时购买这个优惠券的情况,如图所示,会出现部分成功部分失败的情况,因为库存只有100个,只有成功的才会返回了订单ID

4

但是,并没有达到我们的预期,错误率只为45.5%而不是50%,证明还是有部分不应该成功的请求成功了

5

我们回到数据库中查看,此时的库存变成了-9,有109个人买到了库存仅为100的优惠券,这是绝对不允许发生的

6

超卖问题解决办法

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:见下图:

3

悲观锁:

悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等

乐观锁:

乐观锁:会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过,当然乐观锁还有一些变种的处理方式比如cas

之前用到的避免缓存击穿的互斥锁就是一个典型的悲观锁,所以我们尝试用乐观锁来解决炒卖问题

乐观锁解决超卖问题

乐观锁的关键是判断之前查询到的数据有没有被修改过,常见的方法有两种

77

8

在本案例中,由于是库存,我们可以直接把stock(库存)当作是版本号,就不用新加版本号了

1
2
3
4
5
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1").
eq("voucher_id",voucherId).
eq("stock",stock)//加上库存判断where stock = oldStock
.update();

再次执行测试,发现错误率高达92.50%,仅有200人中只有15人成功了!!

9

这是乐观锁的一个弊端,在并发线程中,同时访问到相同库存的,只有第一个可以改,其他的都发现库存被改过了,然后就不能去更改,相当于直接取消了,所以失败率大大升高

1
2
//eq("stock",stock)
gt("stock",0)//去掉判断stock和原来相等,取而代之的是stock>0

此时刚好库存用完,成功率100/200=0.5,达到预期,一张不超一张不少,且前100个请求成功,符合逻辑

10

当然还有其他乐观锁的解决方案,但是这里的库存比较简单,就没有用上。此方案还是查询到了数据库,对于高高高并发的场景可能还会出现问题,继续学习吧,方法总比困难多。

一人一单

前面我们并没有对用户进行限制,而且为了方便,也是拿了一个token(用户)进行的测试。

实际情况中,秒杀情况正常下不允许一人买多单,出现囤货情况,避免黄牛等等

  • 最简单的方法,我们判断这个userId和voucherId是否在order表中出现
1
2
3
4
5
6
7
8
9
// 5.一人一单逻辑
// 5.1.用户id
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

代码完成后执行,还是存在问题:并发查询数据库都显示不存在购买记录的情况,所以我们应该给他加上一个锁

乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作

注意:在这里提到了非常多的问题,我们需要慢慢的来思考,首先我们的初始方案是封装了一个createVoucherOrder方法,同时为了确保他线程安全,在方法上添加了一把synchronized 锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {

Long userId = UserHolder.getUser().getId();
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}

但是这样添加锁,锁的粒度太粗了,在使用锁过程中,控制锁粒度 是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会锁住,所以我们需要去控制锁的粒度。

以下这段代码需要修改为:
intern() 这个方法是从常量池中拿到数据,如果我们直接使用userId.toString() 他拿到的对象实际上是不同的对象,new出来的对象,我们使用锁必须保证锁必须是同一把,所以我们**需要使用intern()**方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()){
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}
}

但是以上代码还是存在问题,问题的原因在于当前方法被spring的事务控制。如果在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致问题。所以我们选择将当前方法整体包裹起来,确保事务不会出现问题:如下:

在seckillVoucher 方法中,添加以下逻辑,这样就能保证事务的特性,同时也控制了锁的粒度

1
2
3
4
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
return createVoucherOrder(voucherId);
}

但是以上做法依然有问题,因为你调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务

  • 引入依赖
1
2
3
4
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

给启动类加上@EnableAspectJAutoProxy

1
@EnableAspectJAutoProxy(exposeProxy = true)

在IVoucherOrderService接口中声明方法

1
Result createVoucherOrder(Long voucherId);

最后修改业务代码

1
2
3
4
5
6
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 获取与事务相关的代理对象
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}

运行测试,同ID只有一个请求成功,功能实现

11

集群环境下的并发问题

通过加syn锁可以解决在单机情况下的一人一单,但是到集群就不行了

  • 在IDEA中开启两个服务
12
  • 在nginx中开启负载均衡,请求到后端两台服务器上
1
2
3
4
5
6
         proxy_pass http://backend;

upstream backend {
server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
}

然后我们使用Apifox请求两次8080前端端口,请求会分别发送到8081服务器和8082服务器,会出现一人两单的情况

原因分析:

由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的JVM,JVM中有自己的锁监视器。

那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的。

但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是集群环境下,syn锁失效的原因。

在这种情况下,我们就需要使用分布式锁来解决这个问题,实现不同JVM使用同一个锁

13

分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

分布式锁的基本原理

synchronized只能保证当前JVM中的线程互斥,而没有办法让集群下的多个JVM中线程互斥

分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路

分布式锁的实现方式

常见的分布式锁有三种

Mysql:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见

Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁

Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述

14

Redis分布式锁的核心思路

实现分布式锁时需要实现的两个基本方法:

  • 获取锁:

    • 互斥:确保只能有一个线程获取锁
    • 非阻塞:尝试一次,成功返回true,失败返回false(阻塞式会浪费CPU资源)
  • 释放锁:

    • 手动释放
    • 超时释放:获取锁时添加一个超时时间(兜底,避免宕机没有完成手动释放)

还是基于Redis的SetNX命令来获取锁,DEL命令来释放锁

实现初级Redis分布式锁

定义ILock接口,定义两个抽象方法:tryLock()方法和unlock()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @Author JunWei Li
* @Date 2024-07-24 9:56
*/
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁的过期时间,过期后自动释放
* @return true表示获取锁成功,false表示获取锁失败
*/
boolean tryLock(long timeoutSec);

/**
* 释放锁
*/
void unlock();
}

定义ILock接口的实现类SimpleRedisLock,其中key为lock:order:userId,value为当前线程ID

利用setnx方法进行加锁,同时增加过期时间,防止死锁,此方法可以保证加锁和增加过期时间具有原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* @Author JunWei Li
* @Date 2024-07-24 9:58
*/
public class SimpleRedisLock implements ILock{
private final String name;
private final StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
public static final String LOCK_PREFIX = "lock:";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程
long threadId = Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + name,String.valueOf(threadId), timeoutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(success);
}

@Override
public void unlock() {
// 释放锁
stringRedisTemplate.delete(LOCK_PREFIX + name);
}
}

修改业务代码,取消synchronized锁,取而代之的是我们自己创建的Redis分布式锁对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 获取锁
boolean isLock = lock.tryLock(20000);
if(!isLock){
return Result.fail("此优惠券限购一单");
}
try {
// 获取与事务相关的代理对象
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
lock.unlock();
}

至此,我们就完成了初级的Redis分布式锁实现

Redis分布式锁误删情况说明

逻辑说明:

持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放。这时其他线程,线程2来尝试获得锁,就拿到了这把锁。然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况

15

解决方案:每个线程释放锁的时候,去判断一下当前这把锁是否属于自己。如果不属于自己,则不进行锁的删除。

  • 存入线程标识

16

解决Redis分布式锁误删问题

改造tryLock()方法和unlock()方法

  • 获取锁前存入线程标识(原先使用ThreadId,但是问题是不同JVM中线程都是自增的,所以会有重复,可以使用UUID结合一下)
  • 释放锁时现判断锁是不是自己的

使用UUID生成一个不带“-”(传入true参数)的随机值

1
public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

修改tryLock()方法

1
2
3
4
5
6
7
public boolean tryLock(long timeoutSec) {
// 获取线程标识,拼上UUID
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + name,threadId, timeoutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(success);
}

修改unlock()方法

1
2
3
4
5
6
7
8
9
10
11
public void unlock() {
// 获取锁
String threadId = ID_PREFIX + Thread.currentThread().getId();
String id = stringRedisTemplate.opsForValue().get(LOCK_PREFIX + name);
// 判断锁是否是自己的
if(!threadId.equals(id)){
return;
}
// 释放锁
stringRedisTemplate.delete(LOCK_PREFIX + name);
}

至此,误删问题就可以得到解决

分布式锁的原子性问题

更为极端的误删逻辑说明:

线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的正准备删除锁,但是此时他的锁到期了。那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题。

之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的

17

Lua脚本解决多条命令原子性问题

我们想要比较锁和删除锁的操作具有原子性,就需要用到Lua脚本

Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html

这里不对Lua语言进行深究,只学习怎么写Lua去操作Redis,保证原子性

  • Redis提供的调用函数
1
redis.call('命令名称', 'key', '其它参数', ...)
  • 在Redis中使用EVAL调用脚本,例如下面的脚本可以添加name-Rose,age-12两个key-value
1
EVAL "return redis.call('set',KEYS[2],ARGV[2])" 2 name age Rose 12
  • key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数
  • 调用函数和参数类型间的数字代表脚本想要的key类型的参数个数

回顾释放锁的业务流程:

  1. 获取锁中的线程标识

  2. 判断是否与指定的标示(当前线程标示)一致

  3. 如果一致则释放锁(删除)

  4. 如果不一致则什么都不做

对应的Lua脚本

1
2
3
4
if(redis.call('get',KEYS[1]) == ARGV[1]) then
return redis.call('del',KEYS[1])
end
return 0

利用Java代码调用Lua脚本改造分布式锁

在RedisTemplate中有这样一个方法,对应着EVAL命令

1
2
3
4
5
6
7
8
/*
* (non-Javadoc)
* @see org.springframework.data.redis.core.RedisOperations#execute(org.springframework.data.redis.core.script.RedisScript, java.util.List, java.lang.Object[])
*/
@Override
public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {
return scriptExecutor.execute(script, keys, args);
}

在IDEA中下载EmmyLua插件方便写Lua脚本代码,并插件unlock.lua文件,写入代码

18

Lua脚本为一个文件,到用的时候才初始化读取是不应该的,我们应该事先读取好,所以可以使用static在类加载时静态初始化

1
2
3
4
5
6
7
8
public static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
// Lua脚本初始化
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// 到resource文件夹中读取文件
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

修改unlock()方法,取代原先代码,可以看到使用Lua脚本就只有一行代码,很好的保证了原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    public void unlock() {
// 调用Lua脚本,不需要管返回值,成功与否由系统判定
stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(LOCK_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
// // 获取锁
// String threadId = ID_PREFIX + Thread.currentThread().getId();
// String id = stringRedisTemplate.opsForValue().get(LOCK_PREFIX + name);
// // 判断锁是否是自己的
// if(!threadId.equals(id)){
// return;
// }
// // 释放锁
// stringRedisTemplate.delete(LOCK_PREFIX + name);
}
}

到此,我们就实现了一个生产可用、相对完善的锁了。

小总结

基于Redis的分布式锁实现思路:

  • 利用set nx ex获取锁,设置过期时间,保存线程标识
  • 释放锁现判断线程标识是否与自己的一致,避免锁误删

特性:

  • 利用set nx满足互斥性
  • 利用set ex保证故障时锁仍能释放,避免死锁
  • 利用Lua脚本保证释放锁的原子性
  • 利用Redis集群保证高可用和高并发

分布式锁-Redisson

Redisson: Easy Redis Java client and Real-Time Data Platform是 Redis Java 客户端和实时数据平台。

Redisson 对象提供了关注点分离,使您可以专注于数据建模和应用程序逻辑。

基于setnx实现的分布式锁存在的问题

重入问题:获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。

不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。

超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患

主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

使用现成的分布式锁工具Redisson可以很好的解决这些问题

19

Redisson快速入门

在pom中引入Redisson依赖

1
2
3
4
5
6
<!-- Redission-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.33.0</version>
</dependency>

创建RedissonConfig类,完成Redisson中redis的连接配置

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* @Author JunWei Li
* @Date 2024-07-25 9:38
*/
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.228.128:6379").setPassword("junwei.com");
return Redisson.create(config);
}
}

使用Redisson创建的锁代替我们写的SimpleRedisLock

1
2
@Resource
private RedissonClient redissonClient;
1
2
3
4
5
6
// 创建锁对象
// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 获取锁
// boolean isLock = lock.tryLock(20000);
boolean isLock = lock.tryLock();

其中Redisson的tryLock()方法中可以接收三个参数,分别为获取锁的最大等待时间锁自动释放时间时间单位

20

Redisson可重入锁原理

在分布式锁中,他采用hash结构用来存储锁,其中key表示表示这把锁是否存在,用field表示当前这把锁被哪个线程持有

流程图如下,需要注意的是值为0才可以释放锁,不为0时要刷新时间

21

获取锁的Lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
local key = KEYS[1]; --锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists',key) == 0) then
-- 不存在,获取锁
redis.call('hset',key,threadId,'1');
-- 设置有效期
redis.call('expire',key,releaseTime);
-- 返回结果
return 1;
end
-- 锁已存在,判断threadId是否是自己
if(redis.call('hexists',key,threadId) == 1) then
-- 存在,获取锁,重入次数+1
redis.call('hincrby',key,threadId,1);
-- 重置有效期
redis.call('expire',key,releaseTime);
return 1;
end
-- 锁不是自己的,获取锁失败
return 0;

释放锁的Lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
local key = KEYS[1]; --锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断锁是否为自己持有
if(redis.call('hexists',key,threadId) == 0) then
-- 锁不是自己的返回nil
return nil;
end
-- 是自己的锁,则重入次数-1
local count = redis.call('hincrby',key,threadId,-1);
-- 判断重入次数是否为0
if(count > 0) then
-- 大于0说明还不能释放锁,刷新有效期后返回
redis.call('expire',key,releaseTime);
return nil;
else
-- 等于0说明可以释放锁
redis.call('del',key);
return nil
end

Redission锁重试和WatchDog机制

  • 重试:在第一次尝试锁失败以后,不会立刻失败,而是去做一个等待,去订阅和等待释放锁的消息,利用PubSub锁的机制实现等待、唤醒,在其他线程释放锁的时候会去发送一条锁已可用的消息,可以被等待的线程捕获到,那么就可以重新获取锁了,再次获取又失败了又继续等待。当然不是无限次尝试,会有一个等待的时间,如果说超过了这个时间,就不重试了。

  • 锁超时释放:在没有传入过期时间时,在获取锁成功后会由WatchDog开启一个定时任务,每隔一段时间就会去重置锁的有效时间,那么锁的时间就会重新计时

25

watchdog的默认过期时间为30秒,而规定了刷新时间为internalLockLeaseTime / 3

22

Redisson源码中的Lua脚本:

22

23

在释放锁的Lua脚本中redis.call('publish', KEYS[2], ARGV[1]); 会向订阅者发送锁已经释放的消息,那么订阅者就可以试着拿锁

如果是没有传入时间,则此时也会进行抢锁, 而且抢锁时间是默认看门狗时间 commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()

ttlRemainingFuture.onComplete((ttlRemaining, e) 这句话相当于对以上抢锁进行了监听,也就是说当上边抢锁完毕后,此方法会被调用,具体调用的逻辑就是去后台开启一个线程,进行续约逻辑,也就是看门狗线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}

// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;

此逻辑就是续约逻辑,注意看commandExecutor.getConnectionManager().newTimeout() 此方法

Method( new TimerTask() {},参数2 ,参数3 )

指的是:通过参数2,参数3 去描述什么时候去做参数1的事情,现在的情况是:10s之后去做参数一的事情

因为锁的失效时间是30s,当10s之后,此时这个timeTask 就触发了,他就去进行续约,把当前这把锁续约成30s,如果操作成功,那么此时就会递归调用自己,再重新设置一个timeTask(),于是再过10s后又再设置一个timerTask,完成不停的续约

假设我们的线程出现了宕机,因为没有人再去调用renewExpiration这个方法,所以等到时间之后自然就释放了,也不会尝试死锁问题

WatchDog

在Redisson的源码中,拿到锁之后开启watchdog(看门狗)更新有效期的主要原因是为了防止由于锁过期而导致的锁丢失问题,从而确保锁的可靠性。具体原因如下:

  1. 自动续期:当一个线程获取到分布式锁时,会设置一个初始的过期时间。如果在锁持有期间,该线程由于某些原因(例如长时间的业务处理、网络延迟等)未能及时释放锁,那么锁就有可能在过期后自动释放,从而被其他线程获取。为了避免这种情况,Redisson引入了watchdog机制。
  2. 保持锁的有效性:watchdog会定期(默认每隔30秒)检查并延长锁的过期时间,确保在锁持有期间锁不会意外释放。这样,即使业务处理时间较长,也能保证锁一直由当前线程持有,不会被其他线程误抢。
  3. 防止死锁:虽然watchdog能有效延长锁的持有时间,但也设置了一个锁的最大持有时间(默认为30分钟)。即使出现了极端情况,比如持有锁的线程发生了故障,也能通过这个机制防止死锁的发生,确保系统的健壮性。

Redisson中watchdog机制的实现原理如下:

  • 当一个线程获取到锁时,会启动一个看门狗定时任务。
  • 该定时任务会定期检查当前线程是否仍然持有锁,如果是则续期。
  • 续期操作会不断延长锁的有效期,直到锁被显式释放或者超过最大持有时间为止。

这个机制保证了在分布式环境中,锁能够更加可靠地被持有和释放,避免了由于锁过期导致的并发问题。

Redission锁的MutiLock

多个独立的Redis节点,必须在所有节点都重入锁,才算获取锁成功。简而言之,每个Redis节点都看看有没有这把锁,都有才行

  • 以主从为例:我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。
26
  • 解决方法

Redission提出来了MutiLock锁,使用这把锁就不使用主从了,每个节点的地位都是一样的。

这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

27

MutiLock(联锁)原理

multiLock 的核心思想是将多个锁组合在一起,以确保这些锁在同一个操作中被同时获取和释放。具体来说,multiLock 使用 ArrayList 存储每一个锁,并按照特定的逻辑依次尝试获取和释放这些锁。

  1. 创建 MultiLock 对象

首先,通过传入多个 RLock 对象来创建一个 RedissonMultiLock 实例。以下是一个简单的示例:

1
2
3
4
5
RLock lock1 = redisson.getLock("lock1");
RLock lock2 = redisson.getLock("lock2");
RLock lock3 = redisson.getLock("lock3");

RedissonMultiLock multiLock = new RedissonMultiLock(lock1, lock2, lock3);
  1. 存储锁对象

RedissonMultiLock 内部使用一个 ArrayList 来存储传入的锁对象。这些锁对象会在获取和释放锁时被依次处理

1
2
3
4
5
6
7
private final List<RLock> locks = new ArrayList<>();

public RedissonMultiLock(RLock... locks) {
for (RLock lock : locks) {
this.locks.add(lock);
}
}
  1. 获取锁

multiLocklock 方法会依次尝试获取所有存储在 ArrayList 中的锁。如果某一个锁获取失败会释放已经获取的所有锁,并返回获取失败的状态。

在获取锁失败的时候还会对失败次数继续判断:

1
locks.size() - acquiredLocks.size() == failedLocksLimit()

failedLocksLimit 方法可能返回一个允许的锁获取失败的数量上限。例如,如果 failedLocksLimit 返回 1,表示在获取多个锁时,允许有一个锁获取失败。这个机制的主要目的可能是为了实现某种宽松的锁策略,在某些场景下,即使部分锁获取失败,也可以继续执行后续操作,即允许某个不太重要的资源锁获取失败。

  1. 释放锁

multiLockunlock 方法会依次释放所有存储在 ArrayList 中的锁。

1
2
3
4
5
public void unlock() {
for (RLock lock : locks) {
lock.unlock();
}
}

实现细节

  1. 顺序处理:获取和释放锁时,按照锁在 ArrayList 中的顺序依次处理,确保每个锁的获取和释放都是有序的。
  2. 超时处理:在尝试获取多个锁时,会计算剩余的等待时间,确保在指定的时间内尝试获取所有锁
  3. 失败处理:如果在获取锁的过程中任意一个锁获取失败,会释放已经获取的所有锁,以避免资源的死锁和浪费。

小总结

不可重入Redis分布式锁:

  • 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示

  • 缺陷:不可重入,无法重试,锁超时失效,主从一致性问题

可重入的Redis分布式锁(Redisson):

  • 原理

    • 可重入:利用hash结构记录线程id和重入次数
    • 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
    • 超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间
    • 主从一致性:使用Redisson的multiLock,采用多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
  • 前三个容易出现缺陷:redis宕机引起锁失效问题

  • 使用Redisson的multiLock的运维成本高,实现复杂

Redis秒杀优化

之前的秒杀流程图如下,6个过程是串行执行的,并且有4个过程是需要操作到数据库的,甚至最后两个过程还是数据库的写操作

28

优化方案:我们将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池

29

  • 难点1:是我们怎么在redis中去快速校验一人一单,还有库存判断

  • 难点2:是由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时将这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。

秒杀优化实现思路

根据Lua脚本中不同的返回值完成业务的判断,开启新线程进行异步下单

库存使用String类型存储,实现自减;下单的用户id存入Set集合,具有不可重复的特性

30

Redis完成库存存储和秒杀资格判断

需求:

  • 新增秒杀优惠券的同时,将优惠券信息保存到Redis中

  • 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列

  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

新建秒杀优惠券时同步写入Redis

1
2
3
4
5
6
7
8
    @Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// …………保存秒杀优惠券到数据库
//将优惠券库存写入Redis
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY +voucher.getId(),voucher.getStock().toString());
}
}

基于Lua脚本判断库存和一人一单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- 1.参数列表
local voucherId = ARGV[1];
local userId = ARGV[2];
-- 2.数据key
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
-- 判断库存
if (tonumber(redis.call('get', stockKey)) <= 0) then
return 1;
end
-- 判断是否第一次下单
if (redis.call('SISMEMBER', orderKey, userId) == 1) then
return 2;
end
-- 扣库存
redis.call('incrby', stockKey, -1)
-- 下单
redis.call('sadd', orderKey, userId)
-- 返回0表示正常完成
return 0;

完善业务代码并进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public Result seckillVoucher (Long voucherId){
// 1.执行Lua脚本
Long result = stringRedisTemplate
.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), UserHolder.getUser().getId().toString());
// 2.判断结果 1->库存不足 2->已购买过 0->购买成功
int res = result.intValue();
if(res != 0){
return Result.fail(res == 1?"库存不足":"每个用户限购一单");
}
long orderId = redisIdWorker.nextId("order");
// TODO:保存阻塞队列
return Result.ok(orderId);
}

到Navicat中检查Redis,确认Lua脚本执行无误

31

至此,我们完成了前两步,接下来继续完成后两步

基于阻塞队列完成异步秒杀优化

我们继续实现后两步

  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列

  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

创建阻塞队列

1
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

创建异步线程池

1
public static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

定义proxy为变量,与ThreadLocal同理,拿不到代理对象,只能给值

1
private IVoucherOrderService proxy;

修改前两步未完成seckillVoucher()方法,补上放入阻塞队列的代码,并获取代理对象赋值

1
2
3
4
5
6
7
8
9
10
// 创建订单ID,封装订单信息
long orderId = redisIdWorker.nextId("order");
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 放入阻塞队列
orderTasks.add(voucherOrder);
// 获取与事务相关的代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();

@PostConstruct注解在类加载时执行,死循环一致尝试读取阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while(true){
try {
// 获取阻塞队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
// 创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常",e);
}
}
}
}

修改订单处理代码,其中的锁代码可以省略,Redis已经判断过了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void handleVoucherOrder(VoucherOrder voucherOrder) {
// 只能从voucherOrder中取用户ID,因为这个是单独子线程,拿不到ThreadLocal中的值
Long userId = voucherOrder.getUserId();
// 创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("不允许重复下单");
return;
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}

修改创建订单代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1").
eq("voucher_id", voucherOrder.getVoucherId()).
gt("stock",0)
.update();
// 6.2扣减失败
if(!success){
log.error("库存不足");
}
save(voucherOrder);
}

小总结

秒杀业务的优化思路

  • 先利用Redis完成库存余量、一人一单判断,完成抢单业务
  • 再将下单业务放入阻塞队列,利用独立线程异步下单
  • 基于阻塞队列的异步秒杀存在哪些问题?
    • 内存限制问题(手动设置了阻塞队列的长度,容易溢出)
    • 数据安全问题
      • 数据存储在Redis中,宕机后数据丢失。
      • 阻塞队列存在JVM中,JVM宕机时阻塞队列全部订单信息会丢失

Redis消息队列

消息队列:存放消息的队列

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息
32

使用队列的好处在于解耦:快递员会把快递放到菜鸟驿站,然后再由菜鸟驿站通知您的快递到达菜鸟了,而快递员无需等待,就可以去送下一批货,而不是说你不在家就一直等你。

由于这门课是学习Redis的,所以我们使用Redis实现消息队列,对于中小型企业或者不是大型项目,Redis已经足够了。

如果要求更高的话,可以使用RabbitMQ: One broker to queue them all或者Apache Kafka,功能更加强大,可以去学习2024最新SpringCloud微服务开发与实战,java黑马商城项目微服务实战开发(涵盖MybatisPlus、Docker、MQ、ES、Redis高级等)

基于List实现的消息队列

Redis的list数据结构是一个双向链表,很容易模拟出队列效果。搭配LPUSH和RPOP或者RPUSH和LPOP就好了

基于List的消息队列有哪些优缺点?
优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub实现的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。Commands | Docs (PubSub)

  • SUBSCRIBE channel [channel] :订阅一个或多个频道
  • PUBLISH channel msg:向一个频道发送消息
  • PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
    • Supported glob-style patterns:
      • h?llo subscribes to hello, hallo and hxllo
      • h*llo subscribes to hllo and heeeello
      • h[ae]llo subscribes to hello and hallo, but not hillo

基于PubSub的消息队列有哪些优缺点?
优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream的消息队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。Commands | Docs (Stream)

  • XADD发送消息

33

  • XREAD读取消息

34

注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

35

  • 创建消费者组:
1
XGROUP CREATE key groupName Id [MKSTREAM]

key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列

  • 删除指定的消费者组
1
XGROUP DESTORY key groupName
  • 给指定的消费者组添加消费者
1
XGROUP CREATECONSUMER key groupname consumername
  • 删除消费者组中的指定消费者
1
XGROUP DELCONSUMER key groupname consumername
  • 从消费者组读取消息:
1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称

  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者

  • count:本次查询的最大数量

  • BLOCK milliseconds:当没有消息时最长等待时间

  • NOACK:无需手动ACK,获取到消息后自动确认

  • STREAMS key:指定队列名称

  • ID:获取消息的起始ID:

    • “>”:从下一个未消费的消息开始
    • 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

逻辑伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
while(true){
// 读取监测实例,返回批处理消息,最长等待 2000 毫秒
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
if(msg == null){ // null说明没有消息,继续下一次
continue;
}

try {
// 处理消息,完成后一定要ACK
handleMessage(msg);
} catch(Exception e) {
while(true) {
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
if(msg == null){ // null说明没有异常消息,所有消息均已确认,结束循环
break;
}

try {
// 说明有异常消息,再次处理
handleMessage(msg);
} catch(Exception e) {
// 再次出现异常,记录日志,继续循环
continue;
}
}
}
}

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

各种方式实现消息队列的对比

36

基于Stream结构作消息队列,实现异步秒杀

实现步骤:

  • 创建一个Stream类型的消息队列,名为stream.orders
  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
  1. 在Redis中新建一个stream类型的消息队列
37
  1. 修改Lua脚本,添加两行代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- 1.参数列表
local voucherId = ARGV[1];
local userId = ARGV[2];
local orderId = ARGV[3];//**新增**
-- 2.数据key
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
-- 判断库存
if (tonumber(redis.call('get', stockKey)) <= 0) then
return 1;
end
-- 判断是否第一次下单
if (redis.call('SISMEMBER', orderKey, userId) == 1) then
return 2;
end
-- 扣库存
redis.call('incrby', stockKey, -1)
-- 下单
redis.call('sadd', orderKey, userId)
-- 发送消息到队列中
redis.call('XADD', 'stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId);//**新增**
-- 返回0表示正常完成
return 0;
  1. 修改操作Redis的代码,因为Lua中新增了将orderId需要传入,才能确保订单信息被存入消息队列时是完整的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public Result seckillVoucher (Long voucherId){
// 创建订单ID,封装订单信息
long orderId = redisIdWorker.nextId("order");
Long userId = UserHolder.getUser().getId();
// 1.执行Lua脚本
Long result = stringRedisTemplate
.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(),String.valueOf(orderId));
// 2.判断结果 1->库存不足 2->已购买过 0->购买成功
int res = result.intValue();
if(res != 0){
return Result.fail(res == 1?"库存不足":"每个用户限购一单");
}
// 获取与事务相关的代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}
  1. 修改单线程拿取消息队列中数据的代码,使用StringRedisTemplate中的opsForStream()方法操作Stream类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while(true){
try {
// 获取阻塞队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
String queueName = "stream.orders";
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed()));
// 获取消息失败
if(list == null || list.isEmpty()){
continue;
}
// 获取消息成功,可以下单
// 从消息list中拿到消息记录,获取第一条
MapRecord<String, Object, Object> record = list.get(0);
// 获取出其中的键值对Map
Map<Object, Object> value = record.getValue();
// 转化为Bean对象
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 下单业务
handleVoucherOrder(voucherOrder);
// ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理订单异常",e);
handlePendingList();
}
}
}
  1. 新增handlePendingList()方法,处理已经读取消息但未ACK的情况,从pending-list中重新尝试拿到数据写入数据库并ACK
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void handlePendingList() {
while(true){
try {
// 获取阻塞队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0 取消等待,从0开始读
String queueName = "stream.orders";
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0")));
// 获取消息失败,说明pending-list中没有消息
if(list == null || list.isEmpty()){
break;
}
// 获取消息成功,可以下单
// 从消息list中拿到消息记录,获取第一条
MapRecord<String, Object, Object> record = list.get(0);
// 获取出其中的键值对Map
Map<Object, Object> value = record.getValue();
// 转化为Bean对象
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 下单业务
handleVoucherOrder(voucherOrder);
// ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理订单异常",e);
}
}
}

执行代码检查无误,库存正常扣减,数据库写入正常,消息队列中保存了order消息

38

Redis消息队列中的订单ID与实际存入数据库中的一致

39