主要根据黑马的教学目标进行学习,优惠券秒杀,分布式锁以及超卖问题等的具体实践。
具体包括:
全局唯一 ID 的实现
为什么要使用全局唯一 ID,直接用数据库自增 ID 不好吗?
这样会有一个问题,id 自增,那用户可以根据 id 猜测到一定规律,不够安全,被发现销量很少怎么办(doge);而且数据库表的最大存储量是有限的,用户量一多就会受限制,多开表的话由于 id 会自增,可能会造成 id 重复,也就是说某些订单的 id 可能相同,这合理吗?不合理!
如何实现全局唯一 ID?
全局 ID 生成器
要满足的特性:
- 唯一性,这个不必多说
- 高性能,如果生成一个 id 需要很久,那要你何用
- 高可用,这个系统不会挂掉
- 递增性,必须有一个递增趋势
- 安全性,不能有明显规律
使用 redis 的 incre 能满足上述特性
可以定义一个全局 ID 生成器; 下面是实现过程:
ID 的组成
Idea 中实现生成过程,带注释
package com.hmdp.utils; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.format.annotation.DateTimeFormat; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; @Component public class RedisIdWorker { @Resource private StringRedisTemplate stringRedisTemplate; private final static long BEGIN_TIME = 1672531200L; //初始时间,用于生成时间戳 private final static int COUNT_BITS = 32; //序列号的位数,可更改 /** * 生成id,前缀由于区分不同业务 * @param keyPrefix * @return */ public long nextId(String keyPrefix){ //1.生成时间戳 : 当前秒数 - 初始时间秒数 LocalDateTime now = LocalDateTime.now(); long nowTime = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowTime - BEGIN_TIME; //2.生成序列号 使用redis生成自增长序列号(每一天都对应一个不同的icrKey) //2.1 获取当前的日期,精确到天 String day = now.format(DateTimeFormatter.ofPattern("yyyyMMdd")); //2.2 redis生成自增长key的值(序列号) long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + day); //3.拼接并返回,注意这里不是字符串拼接,而是位运算得到32位 long 类型的值 return timestamp << COUNT_BITS | count ; //使用或运算,使空出的32位填充成序列号 } public static void main(String[] args) { //生成一个初始时间 LocalDateTime time = LocalDateTime.of(2023, 1, 1, 0, 0, 0); long second = time.toEpochSecond(ZoneOffset.UTC); System.out.println(second); } }
测试生成 30000 个 id
@Resource private RedisIdWorker redisIdWorker; private ExecutorService es = Executors.newFixedThreadPool(500); //线程池 @Test void testIdGenerator() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(300); //阻塞当前线程 Runnable task = ()->{ for(int i = 0; i < 100; i++){ long id = redisIdWorker.nextId("shop"); System.out.println("id = " + id); } countDownLatch.countDown(); //完成一次任务,计数器-1 }; //执行三百次任务,共30000个id long begin = System.currentTimeMillis(); for(int i = 0; i < 300;i++){ es.submit(task); } countDownLatch.await(); long end = System.currentTimeMillis(); System.out.println("共耗时:"+ (end - begin)); }
成功,但是 CPU 爆了
2134 ms
实现优惠券秒杀下单功能
- 首先要知道优惠券实体类的结构:
Voucher 实体内的属性包括特价优惠券的所有属性
特价优惠券的表用优惠券 id 与优惠券表进行关联- 业务流程
根据流程图,进行业务初步实现 (未使用 redis)
package com.hmdp.service.impl;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.User;
import com.hmdp.entity.Voucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private RedisIdWorker redisIdWorker; //生成id
@Resource
private ISeckillVoucherService secKillVoucherService;
/**
* 秒杀下单优惠券
*
* @param voucherId
* @return
*/
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher seckillVoucher = secKillVoucherService.getById(voucherId);
//2.判断是否在秒杀时间段内
if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){
//2.1未在秒杀时间内,返回错误
return Result.fail("秒杀尚未开始!");
}
if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀结束!");
}
//2.2 在秒杀时间内
//3.判断库存是否充足
Integer stock = seckillVoucher.getStock();
if(stock < 1){
return Result.fail("库存不足");
}
//4.充足,扣减库存
boolean success = secKillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
if(!success) return Result.fail("库存不足");
//5.进行订单的插入
/**
* 1.订单id,使用生成器实现
* 2.用户id
* 3.优惠券id
*/
//封装订单,其他属性有默认值,暂时不管
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("voucher_order:");
Long userId = UserHolder.getUser().getId();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
}
超卖问题
即一瞬间大量请求进来请求该接口并查询数据库,可能会导致库存数量为负数的情况,即超卖;
为什么会出现超卖
主要因为多线程时,线程 1 进行查询,发现有库存,正要去更新数据库,线程 2 可能会在线程 1 更新库存之前查询数据库导致超卖。
解决方法
可以通过加锁实现,这里引出了两种锁的概念 [[悲观锁和乐观锁]]
具体概念可跳转到具体文件
使用乐观锁的CAS方法进行实现
CAS方法,不加锁,只通过在查询库存后,更新数据库库存时,加一个判断条件,==当前库存等于查询时的库存== 可以避免超卖问题。
这种方法,会导致原本能卖出去的券卖不出去,即”少卖”的情况;
为什么呢,如果很多线程一起来,那就会出现即使库存充足,但是由于数据不同步(查询和更新时的stock)导致失败。
乐观锁的改进-提高成功率
上述判断条件会导致成功率大幅下降,所以可以放宽要求,只要==更新时的库存大于0就能进行更新==。
一人一单
即一人只能买一张优惠券
方法 :
在库存充足时,判断数据库中是否已经存在当前用户 user_id 且 voucher 都相等的订单,如果存在则返回异常,否则可以继续下单。
出现多线程安全的问题
如果只是上述判断,会出现很多请求同时请求到订单数为 0 的情况,这时由于都可以进行订单插入,就会导致一人多单。
一人多单的悲观锁解决方案
不用乐观锁是因为,没有更新操作,而是插入操作,不能通过判断某值进行锁。
%%不得不说,下面加锁和事务的过程实在太复杂,现在听不懂!🤣%%
将下单和库存判断封装起来进行加锁(针对当前用户的锁)
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher seckillVoucher = secKillVoucherService.getById(voucherId);
//2.判断是否在秒杀时间段内
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
//2.1未在秒杀时间内,返回错误
return Result.fail("秒杀尚未开始!");
}
if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀结束!");
}
//2.2 在秒杀时间内
//3.判断库存是否充足
Integer stock = seckillVoucher.getStock();
if (stock < 1) {
return Result.fail("库存不足");
}
//以用户id‘作为锁的关键字,防止每个用户都要加锁,降低效率
// intern是在字符串常量池里找相同的值,否则每次toString都会新建对象,导致不等
synchronized (UserHolder.getUser().getId().toString().intern()) {
//获取与事务有关的对象
IVoucherService proxy = (IVoucherService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
//一人一单
Long userId = UserHolder.getUser().getId();
Integer orders = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (orders > 0) return Result.fail("用户已经购买过该优惠券!");
//4.充足,扣减库存
boolean success = secKillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
if (!success) return Result.fail("库存不足");
//5.进行订单的插入
/**
* 1.订单id,使用生成器实现
* 2.用户id
* 3.优惠券id
*/
//封装订单,其他属性有默认值,暂时不管
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("voucher_order:");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
这种一人一单的解决方法在集群模式下还是不能正常实现,因为多个 Tomcat 服务器分别有一个 JVM 存储不同的字符串常量池,导致锁监控器彼此之间无法进行同步,也就无法实现锁住同一个 userId ; 所以需要用到下一部分的知识: ==分布式锁==
分布式锁
满足分布式系统或集群模式下多”==进程==”之间==可见并且互斥==的锁
可见性是很好实现的,如 redis,mysql,zookeeper 都可以
多个服务器之间执行的不同应用就是不同的进程特性:
- 高可用
- 高性能
- 安全性
使用 redis 实现分布式锁
根据 redis 互斥锁的特性, 获取锁需要具备原子性
下面是代码实现
ILock 接口
package com.hmdp.utils;
public interface ILock {
/**
* 自己实现redis锁的接口
* timeOutSec是过期时间
* 注意:这里要实现的是非阻塞式的锁,否则会影响性能
*/
boolean tryLock(long timeOutSec);
//释放锁
void unlock();
}
实现类 SimpleRedisLock
实现锁时,需要区分每个业务之间的区别,需要加 name 作为区分标识。==在调用该锁的对象使用 tryLock 时可以特别写一下业务名称==。如 redisLock. TryLock (“order”+userId, 5);
这里加上 userId 就是为了保证”一人一单”。多进程之间也不会额外创建新的订单。
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String name; //需要加锁的业务名称
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String KEY_PREFIX = "lock:";
@Override
public boolean tryLock(long timeOutSec) {
//当前线程ID
long threadId = Thread.currentThread().getId();
//1.获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + " ", timeOutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
//释放锁
@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
在下单代码中应用 redis 锁
public Result seckillVoucher(Long voucherId) {
-------省略内容-----
Long userId = UserHolder.getUser().getId();
SimpleRedisLock redisLock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
boolean locked = redisLock.tryLock(5);
if (!locked) {
//获取锁失败
return Result.fail("一个人只允许下一单");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} catch (IllegalStateException e) {
throw new RuntimeException(e);
}finally {
//结束后释放锁
redisLock.unlock();
}
/*
//以用户id‘作为锁的关键字,防止每个用户都要加锁,降低效率
// intern是在字符串常量池里找相同的值,否则每次toString都会新建对象,导致不等
synchronized (UserHolder.getUser().getId().toString().intern()) {
//获取与事务有关的对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
* */
}
改进 redis 分布式锁
当某个获取到锁的线程阻塞时,一旦过期时间达到,锁过期被删,其他线程就会趁虚而入获取锁,发现可以获取到锁从而导致又一次串行的情况。
解决方法
使用 (uuid + 线程 id) 是因为如果按照普通的线程 id,很有可能造成不同服务器之间的线程 id 冲突。
代码实现:修改锁的实现类中的方法
----省略-----
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; //作为线程id前缀
@Override
public boolean tryLock(long timeOutSec) {
//重新拼接线程ID
String threadId = ID_PREFIX + Thread.currentThread().getId();
//1.获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId , timeOutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
//释放锁
@Override
public void unlock() {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取 redis 锁的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
//判断是否一致,如果不一致则说明当前线程无权进行该锁的删除,一致则可以删
if(threadId.equals(id)) stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
你以为这就完了嘛?
并么有,由于判断和删除是两个动作,即使两者之间没有其他代码,但是由于 ==JVM GC 垃圾回收时会堵塞线程==,如果时间过长,锁由于过期时间达到而被删除,其他线程就会获取锁,一旦 GC 结束,该线程的锁就会被删除(误删)(key 一样都是 prefix + 业务名称 name)。
Redis Lua 脚本实现原子性
基于 lua 语言特性和 redis 自带的脚本命令 script ,调用 redis 相关的方法进行判断可以满足原子性; 这里只贴图,相关实现不作说明。如果遇到该问题时可以回来看看。
改进 redis 的分布式锁
使用 Redisson 实现分布式锁
基于 redis 的 setnx 实现的分布式锁有什么问题
在==高要求的情况==下,有如下四个问题:
基于 setnx 实现已经能够满足大部分的需求,如果需要的话可以使用 Redisson 实现。
Redisson 实现
Redisson 是一个基于 redis 基础上实现的,能提供一系列分布式服务包括(分布式锁,分布式对象等)的实现。
官方文档的翻译:redisson使用全解——redisson官方文档+注释(上篇)_redisson官网中文-CSDN博客
下面开始具体实现 redisson 的测试:
- 写配置类,将 redissonClent 加入 bean
package com.hmdp.config; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient(){ //1. 获取redisson的配置 Config config = new Config(); //2.配置redis 地址和密码 config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456"); //返回redissonClient对象 return Redisson.create(config); } }
- 将之前实现的 simpleLock 方法换成 redisson
tryLock () 这里可以查看参数, 分别为
- 无参: 默认获取锁失败直接返回
- 三参: 分别为获取锁失败重试时间,锁自动过期时间,时间单位
- 两参: 自动过期时间和时间单位
经过测试发现 redisson 确实可以实现安全性要求和可重入性,且更简单
额外说明一下可重入性:同一个线程内的方法可以获取一个锁两次(多次)
Redisson 实现可重入锁的原理
先上图:
大概就是,不使用 redis 的 setnx 互斥锁,而是自定义一个 hashMap 结构的值,分别==记录线程名和获取锁的次数==, 同一个线程内的方法尝试获取锁会允许获取锁,此线程的获取锁次数+1,反之,执行完业务后释放锁,判断如果是当前线程的锁,这个次数就-1,只有当次数回到 0 时,才会将该锁删除,不为 0 就重置锁的有效期,为同线程的其他业务留够时间。
跟踪源码发现确实是使用 Lua 脚本 + redis script 实现的可重入锁
后一部分是讲解锁重试机制的源码和看门狗机制,先润了,以后看
WatchDog: Redisson 内部提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是 30 秒钟,也可以通过修改 Config. LockWatchdogTimeout 来另行指定。
实战篇-20.分布式锁-Redisson的锁重试和WatchDog机制_哔哩哔哩_bilibili
实战篇-21.分布式锁-Redisson的multiLock原理_哔哩哔哩_bilibili
Redis 优化秒杀
Tip: 终于回来了>﹏<
原本的下订单,除了锁之外有四个部分是直接请求数据库的,因此并发效率不高。
优化
将判断库存和校验一人一单的部分分离出来给 redis 做,并通过异步线程完成数据库的其他调用。
如何实现 ?
- 判断库存
先将优惠券的库存缓存到 redis 中,使用优惠券 id 作为 key 和库存数量作为 value,用户下单则库存 - 1; - 实现一人一单
使用 key-set 集合 redis 数据结构,即可满足不重复性,且一张优惠券可被多个用户购买。
基本流程:
为保证多个操作时的原子性,使用 lua 脚本实现
业务实现
- 将秒杀优惠券信息缓存到 redis 中
- 编写 lua 脚本实现判断库存和一人一单
根据流程图来,lua 脚本的语法熟悉就好,主要还是 redis 操作的命令。
- 修改优惠券下单的接口:seckillOrderImpl 里的下单 seckillVoucher ()
3.1 redis 脚本 script 执行配置
3.2 剩余代码实现
执行判断库存和一人一单并将下单信息加入到堵塞队列
异步下单,开启独立线程进行对应订单的数据库插入
不好意思,看不懂 (), 先贴着
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
//redis script
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
//创建堵塞队列 //空的时候会堵塞请求的线程,有元素则会接收线程请求消息
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
//线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
private final String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try {
// 0.初始化stream
initStream();
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
handleVoucherOrder(voucherOrder);
// 4.确认消息 XACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
}
}
public void initStream(){
Boolean exists = stringRedisTemplate.hasKey(queueName);
if (BooleanUtil.isFalse(exists)) {
log.info("stream不存在,开始创建stream");
// 不存在,需要创建
stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1");
log.info("stream和group创建完毕");
return;
}
// stream存在,判断group是否存在
StreamInfo.XInfoGroups groups = stringRedisTemplate.opsForStream().groups(queueName);
if(groups.isEmpty()){
log.info("group不存在,开始创建group");
// group不存在,创建group
stringRedisTemplate.opsForStream().createGroup(queueName, ReadOffset.latest(), "g1");
log.info("group创建完毕");
}
}
private void handlePendingList() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
handleVoucherOrder(voucherOrder);
// 4.确认消息 XACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getId();
// 创建锁对象
// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 获取锁
boolean isLock = lock.tryLock();
// 判断是否获取锁成功
if(!isLock){
// 获取锁失败,返回错误或重试
log.error("不允许重复下单");
return;
}
try {
// 获取代理对象(事务)
createVoucherOrder(voucherOrder);
} finally {
// 释放锁
lock.unlock();
}
}
IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 4.返回订单id
return Result.ok(orderId);
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 5.一人一单
Long userId = voucherOrder.getUserId();
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
log.error("用户已经购买过一次!");
return;
}
// 6.扣减库存
boolean success = secKillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
log.error("库存不足!");
return;
}
// 7.创建订单
save(voucherOrder);
}
局外话
这里的秒杀如果只用堵塞队列实现下单还会有问题,大概理解上面这些秒杀的实现方式就好,老师最后讲的那部分真的复杂;
下面就看看用 redis 消息队列实现好不好理解吧。
Redis 消息队列实现异步秒杀
P 72 - P 77 redis消息队列
这里的堵塞队列或者消息队列实现很复杂,只能暂时理解过程,想实现和完全搞懂还是有些距离,期待以后什么时候能补掉吧,唉😥
暂时停更优惠券专题:date: 2023/10/4 20:11