day08-优惠券秒杀04

功能03-优惠券秒杀04

4.功能03-优惠券秒杀

4.7Redis优化秒杀

4.7.1优化分析

现在来回顾一下优惠券秒杀业务的两个主要问题:

(1)首先是对优惠券的扣减,需要防止库存超卖现象;

(2)其次,需要对每个用户下单数量进行限制,实现一人一单的功能。

处理秒杀优惠券的业务:

  1. 先根据获取到的优惠券id,先到数据库中判断是否存在,若存在;

  2. 再判断优惠券是否在设定的有效期,如果是,则进行一人一单的业务处理:

    • 2.1 利用分布式锁,key存储的是order+用户id:当同一时间,一个用户发起了多个线程请求,其中的某个线程获取到了锁,由于互斥性,无论这个用户发起了多少个请求,只有一个线程能进入接下来的业务。(不同用户发起的不同线程之间不影响)

    • 2.2 接下来,查询该用户是否已经买过这张秒杀券了,如果买过了,则不允许重复购买,如果是第一次购买,就进入到防止超卖的业务:

      • 2.2.1 到这一步可能会有多个用户的单个线程进入这个业务,为了防止超卖问题,这里使用乐观锁方案。乐观锁的关键是判断之前查询到的数据是否有被修改过,但缺点是失败率高,因此我们又使用了mysql的行锁解决。(详见day05-优惠券秒杀01)

因为整个过程有很多对数据库的操作(查询优惠券、查询订单、减库存、创建订单),因此这个业务的性能并不是很好:

优化前:

上述业务看似复杂,实际上只有两个过程:(1)对于用户资格的校验:库存够不够,该用户买没买过(一人一单)(2)然后才是真正的下单业务。

我们可以对这两个过程进行分离,别分使用两个线程进行操作:主线程负责对用户购买资格的校验,如果有购买的资格,再开启一个独立的线程,来处理耗时较久的减库存和创建订单操作。

为了提高效率,使用redis判断秒杀库存和校验一人一单,如果校验通过,则redis会记录优惠券信息、用户信息、订单信息到阻塞队列。一方面:tomcat服务器去读取这个队列的信息,完成下单。另一方面:redis给用户返回一个订单号,代表该用户抢单成功,用户可以根据这个订单号去付款。

优化后:

这样,整个秒杀流程就变为:直接在redis中判断用户的秒杀资格和库存,然后将信息保存到队列里。

秒杀业务的流程变短了,而且是基于Redis,性能得到很大的提升,整个业务的吞吐能力、并发能力可以大大提高了。

那么,如何在Redis中完成对秒杀库存的判断和一人一单的判断呢?

首先是对数据的存储:

  1. 使用String类型,key存储 业务前缀+秒杀券id,value保存优惠券对应的库存;
  2. 因为要保证一人一单,使用set类型,key保存业务前缀+秒杀券id,value保存下单的用户id,保证元素不可重复。

优化后,在Redis中需要执行的具体流程:

异步秒杀优化总结:

上述的优化操作,一方面缩短了秒杀业务的流程,从而大大提高了秒杀业务的并发;另一方面,redis的操作和数据库的操作是异步的,对数据库操作的时效性不再要求那么高了,减轻了数据库的压力。

4.7.2代码实现

改进秒杀业务,提高并发性能。需求:

  1. 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
  2. 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢占成功
  3. 如果抢占成功,将优惠券id和用户id封装后存入阻塞队列
  4. 开启线程任务,不断地从阻塞队列中获取信息,实现异步下单功能

需求1:新增秒杀优惠券的同时,将优惠券信息保存到Redis中

(1.1)修改IVoucherService

package com.hmdp.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.hmdp.dto.Result;
import com.hmdp.entity.Voucher;
/**
 * 服务类
 *
 * @author 李
 * @version 1.0
 */
public interface IVoucherService extends IService<Voucher> {
 void addSeckillVoucher(Voucher voucher);
}

(1.2)修改VoucherServiceImpl

package com.hmdp.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.entity.Voucher;
import com.hmdp.mapper.VoucherMapper;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherService;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import static com.hmdp.utils.RedisConstants.SECKILL_STOCK_KEY;
/**
 * 服务实现类
 *
 * @author 李
 * @version 1.0
 */
@Service
public class VoucherServiceImpl extends ServiceImpl<VoucherMapper, Voucher> implements IVoucherService {
 @Resource
 private ISeckillVoucherService seckillVoucherService;
 @Resource
 private StringRedisTemplate stringRedisTemplate;
 @Override
 @Transactional
 public void addSeckillVoucher(Voucher voucher) {
 // 保存优惠券到数据库
 save(voucher);
 // 保存秒杀优惠券信息到数据库
 SeckillVoucher seckillVoucher = new SeckillVoucher();
 seckillVoucher.setVoucherId(voucher.getId());
 seckillVoucher.setStock(voucher.getStock());
 seckillVoucher.setBeginTime(voucher.getBeginTime());
 seckillVoucher.setEndTime(voucher.getEndTime());
 seckillVoucherService.save(seckillVoucher);
 //保存秒杀库存到Redis中
 stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
 }
}

(1.3)修改VoucherController

package com.hmdp.controller;
import com.hmdp.dto.Result;
import com.hmdp.entity.Voucher;
import com.hmdp.service.IVoucherService;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
 * 前端控制器
 *
 * @author 李
 * @version 1.0
 */
@RestController
@RequestMapping("/voucher")
public class VoucherController {
 @Resource
 private IVoucherService voucherService;
 
 /**
 * 新增秒杀券
 * @param voucher 优惠券信息,包含秒杀信息
 * @return 优惠券id
 */
 @PostMapping("seckill")
 public Result addSeckillVoucher(@RequestBody Voucher voucher) {
 voucherService.addSeckillVoucher(voucher);
 return Result.ok(voucher.getId());
 }
}

(1.4)使用postman进行测试,返回结过显示插入成功,data为插入的秒杀券的id

数据库和Redis中也分别插入成功了:

需求2:基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢占成功

在resources目录下新建一个Lua脚本

seckill.lua:

-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1判断库存是否充足 get stockKey
if (tonumber(redis.call('get', stockKey)) <= 0) then
 -- 3.2库存不足,返回1
 return 1
end
-- 3.3库存充足,判断用户是否下过单(判断用户id是否在订单key对应的集合中)
-- sismember orderKey userId
if (redis.call('sismember', orderKey, userId) == 1) then
 -- 3.4 若存在,说明是重复下单,返回2
 return 2
end
-- 3.5 扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.6 下单(保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0

需求3:如果抢占成功,将优惠券id和用户id封装后存入阻塞队列

需求4:开启线程任务,不断地从阻塞队列中获取信息,实现异步下单功能

修改VoucherOrderServiceImpl:

  1. 请求先来到seckillVoucher()方法,该方法先调用lua脚本,尝试判断用户有没有购买资格、库存是否充足。如果有,创建订单,放到阻塞对列中。此时整个秒杀业务就结束了,用户可以得到结果。
  2. 创建阻塞队列和线程池,在类初始化的时候就执行线程池。线程池的业务就是不断地从阻塞队列中获取订单信息,然后创建订单(调用handleVoucherOrder()方法)
package com.hmdp.service.impl;
import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.*;
/**
 * 服务实现类
 *
 * @author 李
 * @version 1.0
 */
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
 @Resource
 private ISeckillVoucherService seckillVoucherService;
 @Resource
 private RedisIdWorker redisIdWorker;
 @Resource
 private StringRedisTemplate stringRedisTemplate;
 @Resource
 private RedissonClient redissonClient;
 private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
 //类一加载就初始化脚本
 static {
 SECKILL_SCRIPT = new DefaultRedisScript<>();
 SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
 SECKILL_SCRIPT.setResultType(Long.class);
 }
 //阻塞队列:当一个线程尝试从队列中获取元素时,如果队列中没有元素,那么该线程就会被阻塞,直到队列中有元素,线程才会被唤醒并获取元素
 private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
 //线程池
 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
 //在当前类初始化完毕之后就执行
 @PostConstruct
 private void init() {
 SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
 }
 //执行异步操作,从阻塞队列中获取订单
 private class VoucherOrderHandler implements Runnable {
 @Override
 public void run() {
 while (true) {
 try {
 //1.获取队列中的订单信息
 /* take()--获取和删除阻塞对列中的头部,如果需要则等待直到元素可用
 (因此不必担心这里的死循环会增加cpu的负担) */
 VoucherOrder voucherOrder = orderTasks.take();
 //2.创建订单
 handleVoucherOrder(voucherOrder);
 } catch (Exception e) {
 log.error("处理订单异常", e);
 }
 }
 }
 }
 private IVoucherOrderService proxy;
 private void handleVoucherOrder(VoucherOrder voucherOrder) {
 //获取用户(因为目前的是线程池对象,不是主线程,不能使用UserHolder从ThreadLocal中获取用户id)
 Long userId = voucherOrder.getUserId();
 //创建锁对象,指定锁的名称
 RLock lock = redissonClient.getLock("lock:order:" + userId);
 //获取锁(可重入锁)
 boolean isLock = lock.tryLock();
 //判断是否获取锁成功
 if (!isLock) {
 //获取锁失败
 log.error("不允许重复下单");
 }
 try {
 proxy.createVoucherOrder(voucherOrder);
 } finally {
 //释放锁
 lock.unlock();
 }
 }
 @Override
 public Result seckillVoucher(Long voucherId) {
 //获取用户id
 Long userId = UserHolder.getUser().getId();
 //1.执行lua脚本
 Long result = stringRedisTemplate.execute(
 SECKILL_SCRIPT,
 Collections.emptyList(),
 voucherId.toString(),
 userId.toString()
 );
 //2.判断脚本执行结果是否为0
 int r = result.intValue();
 if (r != 0) {
 //2.1如果不为0,代表没有购买资格
 return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
 }
 //2.2如果为0,代表有购买资格,将下单信息保存到阻塞对列中
 VoucherOrder voucherOrder = new VoucherOrder();
 //设置订单id
 long orderId = redisIdWorker.nextId("order");
 voucherOrder.setId(orderId);
 //设置用户id
 voucherOrder.setUserId(userId);
 //设置秒杀券id
 voucherOrder.setVoucherId(voucherId);
 //将上述信息保存到阻塞队列
 orderTasks.add(voucherOrder);
 //3.获取代理对象
 proxy = (IVoucherOrderService) AopContext.currentProxy();
 //4.返回订单id
 return Result.ok(0);
 }
 @Transactional
 public void createVoucherOrder(VoucherOrder voucherOrder) {
 //一人一单
 Long userId = voucherOrder.getUserId();
 //查询订单
 int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
 if (count > 0) {//说明已经该用户已经对该优惠券下过单了
 log.error("用户已经购买过一次!");
 return;
 }
 //库存充足,则扣减库存(操作秒杀券表)
 boolean success = seckillVoucherService.update()
 .setSql("stock = stock -1")//set stock = stock -1
 //where voucher_id =? and stock>0
 .gt("stock", 0).eq("voucher_id", voucherOrder.getVoucherId()).update();
 if (!success) {//操作失败
 log.error("秒杀券库存不足!");
 return;
 }
 //将订单写入数据库(操作优惠券订单表)
 save(voucherOrder);
 }
}

重启项目,进行测试:

(1)初始数据:

(2)使用jemeter进行测试:使用1000个不同的用户同时向服务器发送抢购秒杀券的请求

测试结果:可以看到平均响应实现为216毫秒,最小值为17毫秒,比之前平均500毫秒的响应时间缩短了一半。

4.7.3秒杀优化总结

(1)秒杀业务的优化思路是什么?

  1. 先利用Redis完成库存余量判断、一人一单判断,完成抢单业务
  2. 再将下单业务放入阻塞队列,利用独立线程异步下单

(2)基于阻塞队列的异步秒杀存在哪些问题?

  1. 内存限制问题:

    这里我们使用的是JDK里面的阻塞队列,它使用的是JVM里面的内存。如果不加以限制,在高并发的情况下,可能会有非常多的订单对象需要去创建,放入阻塞队列中,可能会导致内存溢出。虽然我们限制了队列的长度,但是如果队列存满了,再有新的订单来,就放不下了。

  2. 数据安全问题:

    现在的代码基于内存来保存订单信息,如果服务器宕机了,那么阻塞队列中的所有订单信息将会丢失

我们将在接下来的分析中对上述两个问题进行解决。

4.8Redis消息队列实现异步秒杀

要解决上面的两个问题,最佳的解决方案就是使用消息队列

4.8.1什么是消息队列

消息队列(Message Queue,简称MQ),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

4.8.2消息队列实现异步秒杀的优势

使用消息队列实现异步秒杀的优势:

  • 消息队列是JVM以外的独立服务,不受JVM内存的限制,这就解决了之前的内存限制问题
  • 消息队列不仅仅是做数据存储,它还要确保数据安全,即消息队列里的所有消息都要做持久化,这样不管是服务宕机还是重启,数据都不会丢失
  • 消息队列将消息投递给消费者之后,要求消费者做消息的确认。如果消息没有被确认,这个消息就会在队列中依然存在,下一次会再次投递给消费者,直到收到消息确认为止。

当下比较知名的消息引擎,包括:ActiveMQ、RabbitMQ、Kafka、RocketMQ、Artemis 等

这里使用Redis实现消息队列:

Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

4.8.3基于List结构模拟的消息队列

Redis的List数据结构是一个双向链表,很容易模拟出队列效果。队列是入口和出口不在一边,我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现。

不过要注意的是,当队列中没有消息时,RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

BRPOP key [key ...] timeout
summary: Remove and get the last element in a list, or block until one is available
since: 2.0.0
RPOP key
summary: Remove and get the last element in a list
since: 1.0.0

基于List的消息队列有哪些优缺点?

优点:

  1. 利用Redis存储,不受限于JVM内存上限
  2. 基于Redis的持久化机制,数据安全性有保证
  3. 可以满足消息有序性

缺点:

  1. 无法避免消息丢失
  2. 只支持单消费者

4.8.4基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或者多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  • SUBSCRIBE channel [channel]:订阅一个或者多个频道
  • PUBLISH channel msg:向一个频道发送消息
  • PSUBSCRIBE pattern [pattern]:订阅与pattern格式相匹配的所有频道

基于PubSub的消息队列有哪些优缺点?

优点:采用发布订阅模型,支持多生产、多消费

缺点:

  1. 不支持数据持久化
  2. 无法避免消息丢失
  3. 消息堆积有上限,超出时数据丢失

4.8.5基于Stream的消息队列

Stream是Redis5.0引入的一种新的数据类型,可以实现一个功能非常完善的消息队列。

(1)发送消息的命令:xadd

例如:

(2)读取消息的方式之一:xread

4.8.5.1Stream的单消费模式

XREAD阻塞方式,读取最新的消息:

在业务开发中,我们可以循环地调用XREAD阻塞方式来查询最新的消息,从而实现持续监听队列的效果,伪代码如下:

注意:当我们指定起始ID为$时,代表读取最新的消息。如果我们处理一条消息的过程中,又有超过一条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯

  • 一个消息可以被多个消费者读取

  • 可以阻塞读取

  • 有消息漏读的风险

4.8.5.2Stream的消费者组模式

消费者组(Consumer Goup):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

(1)创建消费者组:

XGROUP CREATE key groupName ID [MKSTREAM]
  • key 队列名称
  • groupName 消费者组名称
  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
  • MKSTREAM:队列不存在时自动创建队列

其他常见命令:

# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupName consumerName

(2)从消费者组内读取消息:

127.0.0.1:6379> help xreadgroup
 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
 summary: Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. Can block.
 since: 5.0.0
 group: stream
  • group:消费组名称

  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者

  • count:本次查询的最大数量

  • BLOCK milliseconds:当没有消息时最长等待时间

  • NOACK:无需手动ACK,获取到消息后自动确认(不推荐)

  • STREAMS key:指定队列名称

  • ID:获取消息的起始ID:

    • ">":从下一个未消费的消息开始
    • 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

关于ID 为 ">" 和 0 的用途:

在消费者消费消息,但未确认消息时,消息仍存在pending-list中,直到消费者通过XACK来确认消息,标记消息已处理,才会从pending-list中移除。

一般情况下,我们使用“>”号的方式,去获取所有未消费的消息;消费者拿到之后去处理,然后确认。在确认的过程中,若出现异常,该消息会因为没有确认(XACK)依然在pending-list中,在Java代码中体现出来就是出现异常,没有进行XACK。那么我们catch到异常后,可以直接到pending-list中取出(使用ID为0的方式)消息,进行确认处理。

(3)消息确认命令

127.0.0.1:6379> help XACK
 XACK key group ID [ID ...]
 summary: Marks a pending message as correctly processed, effectively removing it from the pending entries list of the consumer group. Return value of the command is the number of messages successfully acknowledged, that is, the IDs we were actually able to resolve in the PEL.
 since: 5.0.0
 group: stream

(4)到pending-list中获取消息:

127.0.0.1:6379> help XPENDING
 XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
 summary: Return information and entries from a stream consumer group pending entries list, that are messages fetched but never acknowledged.
 since: 5.0.0
 group: stream
  • 消费者监听消息的基本思路:

    消费者在指定时间内阻塞尝试获取尚未消费的消息,如果在指定阻塞时间内没有获取到,则循环尝试;

    如果获取到了,则处理消息,并进行消息确认(ACK);如果在消息确认时发生了异常,那么消息因为没有被确认,会依然处于pending-list中等待确认,这时候我们捕获异常,直接到pending-list中确认消息,直到确认成功。

STREAM类型消息队列的XREADGROUP的命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

4.8.6Redis三种消息队列的总结

如果对消息队列的要求比较高,建议还是使用更加专业的消息队列,比如RabbitMQ,RocketMQ。

因为Stream虽然支持持久化,但是这种持久化依赖于Redis本身的持久化(Redis的持久化是有丢失风险的),并且Stream的确认机制只支持消费者的确认,不支持生产者的确认机制,如果是生产者在发消息的过程中丢失了,就会造成问题。此外,还有消息的事务机制、在多消费者下的消费有序性等等。

这些问题都需要更加专业的消息队列插件来保证。

4.8.7代码实现

基于Redis的Stream结构作为消息队列,实现异步秒杀下单:

需求:

  1. 创建一个Stream类型的消息队列,名为stream.orders
  2. 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.order中添加消息,内容包括VoucherId、userId、orderId
  3. 项目启动时,开启一个任务线程,尝试获取stream.orders中的消息,完成下单

(1)创建消息队列

127.0.0.1:6379> XGROUP CREATE stream.orders g1 0 MKSTREAM
OK

(2)修改之前的Lua脚本(seckill.lua)

-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1判断库存是否充足 get stockKey
if (tonumber(redis.call('get', stockKey)) <= 0) then
 -- 3.2库存不足,返回1
 return 1
end
-- 3.3库存充足,判断用户是否下过单(判断用户id是否在订单key对应的集合中)
-- sismember orderKey userId
if (redis.call('sismember', orderKey, userId) == 1) then
 -- 3.4 若存在,说明是重复下单,返回2
 return 2
end
-- 3.5 扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.6 下单(保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.7 发送消息到队列当中 xadd stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

(3)修改VoucherOrderServiceImpl.java

(3.1)修改seckillVoucher方法将判断购买资格和发送消息到队列中两件事情合并到脚本完成,java代码中只需要获取脚本执行的结果,进行判断返回结果即可

@Override
public Result seckillVoucher(Long voucherId) {
 //获取用户id
 Long userId = UserHolder.getUser().getId();
 //获取订单id
 long orderId = redisIdWorker.nextId("order");
 //1.执行lua脚本-判断购买资格,发送信息到stream.order消息队列
 Long result = stringRedisTemplate.execute(
 SECKILL_SCRIPT,
 Collections.emptyList(),
 voucherId.toString(),
 userId.toString(),
 String.valueOf(orderId)
 );
 //2.判断结果是否为0
 int r = result.intValue();
 if (r != 0) {
 //不为0,代表没有购买资格
 return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
 }
 //3.获取代理对象
 proxy = (IVoucherOrderService) AopContext.currentProxy();
 //4.向客户返回订单id
 return Result.ok(orderId);
}

(3.2)开启一个任务线程,尝试获取stream.orders中的消息,完成下单:主要实现对消息队列的读取和确认,以及出现异常时的处理handlePendingList()

整体的逻辑:在run()方法中不断地从消息队列中读取消息,如果获取成功,就创建订单,然后进行ACK消息确认;如果在过程中出现异常,那么就会到handlePendingList()方法中,到pending-list中取出未成功的消息,进行业务操作,然后ACK确认,直到pending-list中没有消息,才返回消息队列中处理消息,以此类推。

//获取消息队列中的消息
private class VoucherOrderHandler implements Runnable {
 String queueName = "stream.orders";
 @Override
 public void run() {
 while (true) {
 try {
 //1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
 Consumer.from("g1", "c1"),
 StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
 StreamOffset.create(queueName, ReadOffset.lastConsumed())
 );
 //2.判断消息获取是否成功
 if (list == null || list.isEmpty()) {
 //获取失败,说明没有消息,继续下一次循环
 continue;
 }
 //3.如果成功,将信息转为对象
 MapRecord<String, Object, Object> record = list.get(0);
 Map<Object, Object> values = record.getValue();
 VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
 //4.创建订单
 handleVoucherOrder(voucherOrder);
 //5.消息的ACK确认 SACK stream.orders g1 消息id
 stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
 } catch (Exception e) {
 log.error("处理订单异常", e);
 handlePendingList();
 }
 }
 }
 private void handlePendingList() {
 while (true) {
 try {
 //1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0
 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
 Consumer.from("g1", "c1"),
 StreamReadOptions.empty().count(1),
 StreamOffset.create(queueName, ReadOffset.from("0"))
 );
 //2.判断消息获取是否成功
 if (list == null || list.isEmpty()) {
 //获取失败,说明pending-list中没有消息,退出循坏
 break;
 }
 //3.如果成功,将信息转为对象
 MapRecord<String, Object, Object> record = list.get(0);
 Map<Object, Object> values = record.getValue();
 VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
 //4.创建订单
 handleVoucherOrder(voucherOrder);
 //5.消息的ACK确认 SACK stream.orders g1 消息id
 stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
 } catch (Exception e) {
 log.error("处理pending-list异常", e);
 try {
 Thread.sleep(20);//休眠20ms
 } catch (InterruptedException ex) {
 ex.printStackTrace();
 }
 }
 }
 }
}

(4)测试

启动项目,数据库初始数据:

Redis初始数据:

(4.1)使用postman测试是否运行成功:

数据库&Redis:

同一个用户再次去下单,显示不能重复下单:

(4.2)使用jemeter进行秒杀业务的多人抢购测试:

1000个用户同时进行秒杀抢购:

测试结果:请求平均响应为110ms,比之前的平均响应216ms减少了一倍时间。

数据库也没有出现超卖问题。

4.9秒杀业务总结

Redis在秒杀场景下的应用:

  • 缓存
  • 分布式锁
  • 超卖问题
  • Lua脚本
  • Redis消息队列
作者:一刀一个小西瓜原文地址:https://www.cnblogs.com/liyuelian/p/17363231.html

%s 个评论

要回复文章请先登录注册