📚笔记整理自黑马程序员Redis入门到实战教程
共享session登录问题
集群的session共享问题
session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。
session的替代方案应该满足:
• 数据共享
• 内存存储
• key、value结构
使用Redis实现共享session登录
- 当我们发送验证码时,以手机号为key,存储验证码(String)
- 登录验证通过后,以随机token为key,存储用户数据(Hash)
发送验证码
public Result sedCode(String phone, HttpSession session) {
//1. 校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
//2.如果不符合,返回错误信息
return Result.fail("手机号格式错误");
}
//3. 符合,生成验证码
String code = RandomUtil.randomNumbers(6);
//4. 保存验证码到session
// session.setAttribute("code",code);
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
//5. 发送验证码
log.debug("发送短信验证码成功,验证码:{}",code);
//返回ok
return Result.ok();
}
登录验证
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1. 校验手机号
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误");
}
//2. 校验验证码
String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY+phone);
// Object cacheCode = session.getAttribute("code");
String code = loginForm.getCode();
if (cacheCode == null || !cacheCode.equals(code)){
//3. 不一致,报错
return Result.fail("验证码错误");
}
//4.一致,根据手机号查询用户
User user = query().eq("phone", phone).one();
//5. 判断用户是否存在
if (user == null){
//6. 不存在,创建新用户
user = createUserWithPhone(phone);
}
String token = UUID.randomUUID().toString();
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
String tokenkey=LOGIN_USER_KEY+token;
stringRedisTemplate.opsForHash().putAll(tokenkey,userMap);
stringRedisTemplate.expire(tokenkey,LOGIN_USER_TTL,TimeUnit.MINUTES);
return Result.ok(token);
}
登录拦截器
每次发送请求时,需要进行身份校验,并且还需要刷新token有效期,这些步骤适合放在登录拦截器中处理
流程分析:
-
获取token
-
查询Redis的用户
- 不存在,拦截
- 存在,继续
-
保存到ThreadLocal
-
刷新token有效期
-
放行
具体代码:
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)) {
return true;
}
// 2.基于TOKEN获取redis中的用户
String key = LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
// 3.判断用户是否存在
if (userMap.isEmpty()) {
return true;
}
// 5.将查询到的hash数据转为UserDTO
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 6.存在,保存用户信息到 ThreadLocal
UserHolder.saveUser(userDTO);
// 7.刷新token有效期
stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 8.放行
return true;
}
注意这里之所以要保存信息到ThreadLocal中,是为了后面在service中方便拿出使用,因为存在线程不安全问题,所以使用ThreadLocal保存
商户查询缓存
店铺类型在很多地方都用到,为了提高查询效率,添加查询缓存,但与此同时,因为更新缓存和更新数据库的操作不是原子性的,可能会导致缓存和数据不一致问题
业务场景:
-
低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存 (设置过期时间)
-
高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存(使用更新策略)
缓存更新策略
主动更新策略
-
Cache Aside Pattern
由缓存的调用者,在更新数据库的同时更新缓存
-
Read/Write Through Pattern
缓存与数据库整合为一个服务,由服务来维护一致性。
调用者调用该服务,无需关心缓存一致性问题 -
Write Behind Caching Pattern
调用者只操作缓存,由其它线程异步的将缓存数据持久化到数据库,保证最终一致。
综合各种考虑,我们这里的更新策略选择第一种,可以更好的与当前业务结合。
更新顺序不一致导致的问题
根据删除缓存和更新数据库的顺序不同会有不同的问题:
-
先删除缓存,后更新数据库:
会导致缓存中存放的是旧值
-
先更新数据库,后删除缓存
更新数据库后,未删除缓存时,存在短暂数据不一致情况
先更新数据库值,后删除缓存的方案在等待缓存删除完成期间会有短暂的不一致数据存在。但对于商铺详情信息来说,可以接受。
@Transactional
public Result update(Shop shop) {
Long id = shop.getId();
if(id == null){
return Result.fail("店铺id不能为空");
}
updateById(shop);
stringRedisTemplate.delete(CACHE_SHOP_KEY+id);
return Result.ok();
}
缓存穿透问题
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
常见的解决方案有两种:
- 缓存空对象
◆ 优点:实现简单,维护方便
◆ 缺点:
• 额外的内存消耗
• 可能造成短期的不一致
-
布隆过滤
◆ 优点:内存占用较少,没有多余key
◆ 缺点:
• 实现复杂
• 存在误判可能
这里我们选择缓存空对象来解决
缓存空对象
流程分析:
代码实现:
private Shop queryWithPassThrough(Long id){
String key = CACHE_SHOP_KEY+id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
//不为null和“”
if(StrUtil.isNotBlank(shopJson)){
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return shop;
}
if (shopJson!=null && shopJson.length()==0){
//命中的是空值“”
return null;
}
Shop shop = getById(id);
if(shop==null){
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL,TimeUnit.MINUTES);
return shop;
}
缓存雪崩
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
◆ 给不同的Key的TTL添加随机值
◆ 利用Redis集群提高服务的可用性
◆ 给缓存业务添加降级限流策略
◆ 给业务添加多级缓存
缓存击穿
概念
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问 会在瞬间给数据库带来巨大的冲击。
常见的解决方案有两种:
- 互斥锁
- 逻辑过期
互斥锁
如果热点key失效,只运行一个线程去更新缓存,其他线程等更新好后再来获取
流程分析:
代码实现:
// 获取锁
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
public <R, ID> R queryWithMutex(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3.存在,直接返回
return JSONUtil.toBean(shopJson, type);
}
// 判断命中的是否是空值
if (shopJson != null) {
// 返回一个错误信息
return null;
}
// 4.实现缓存重建
// 4.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
R r = null;
try {
boolean isLock = tryLock(lockKey);
// 4.2.判断是否获取成功
if (!isLock) {
// 4.3.获取锁失败,休眠并重试
Thread.sleep(50);
return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);
}
// 4.4.获取锁成功,根据id查询数据库
r = dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
}
// 6.存在,写入redis
this.set(key, r, time, unit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
// 7.释放锁
unlock(lockKey);
}
// 8.返回
return r;
}
逻辑锁过期
没有设置过期时间,但会为每个key设置一个逻辑过期时间,当发现超过逻辑过期时间后,会使用单独线程去构建缓存。
流程分析:
代码实现:
public <R, ID> R queryWithLogicalExpire(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(json)) {
//热点key会进行预热,即会提前传入缓存中,
// 如果缓存中都没有,则表明这个数据也不在数据库,没有必要继续查询数据库了
// 3.不存在,直接返回
return null;
}
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())) {
// 5.1.未过期,直接返回店铺信息
return r;
}
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if (isLock){
// 6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 查询数据库
R newR = dbFallback.apply(id);
// 重建缓存
this.setWithLogicalExpire(key, newR, time, unit);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
// 释放锁
unlock(lockKey);
}
});
}
// 6.4.返回过期的商铺信息
return r;
}
总结
优惠劵秒杀
全局唯一ID
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题 :
- id的规律性太明显
- 受单表数据量的限制,分布式场景下无法使用
我们可以借助redis的incr指令来做自增
全局ID生成器
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:
-
符号位:1bit,永远为0
-
时间戳:31bit,以秒为单位,可以使用69年
-
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID(使用redis实现)
@Component
public class RedisIdWorker {
/**
* 开始时间戳
*/
private static final long BEGIN_TIMESTAMP = 1640995200L;
/**
* 序列号的位数
*/
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId(String keyPrefix) {
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 2.生成序列号
// 2.1.获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 2.2.自增长
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3.拼接并返回
return timestamp << COUNT_BITS | count;
}
}
这里的redis的key是
"icr:" + keyPrefix + ":" + date
,这样就可以每天一个key,方便统计订单量
实现优惠劵秒杀下单
下单时需要判断两点:
• 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
• 库存是否充足,不足则无法下单
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
//创建订单
return createVoucherOrder(voucherId);
}
超卖问题
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁
悲观锁
认为线程安全问题一定会发生,因此 在操作数据之前先获取锁,确保线程 串行执行。这里使用synchronized来加锁
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}
// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderId);
}
}
优点:简单粗暴
缺点:性能一般
乐观锁
认为线程安全问题不一定会发生,因 此不加锁,只是在更新数据时去判断 有没有其它线程对数据做了修改。
◆ 如果没有修改则认为是安全的,自 己才更新数据。
◆ 如果已经被其它线程修改说明发生 了安全问题,此时可以重试或异常。
-
版本号法
需要增加一个版本号字段,如果修改了的话就增加1
-
CAS法
扣减前获取库存量,调用sql扣减时判断此时库存是否等于前面查到的库存,如果没有变化就是没有人修改过
@Transactional public Result createVoucherOrder(Long voucherId) { // 5.一人一单 Long userId = UserHolder.getUser().getId(); // 创建锁对象 RLock redisLock = redissonClient.getLock("lock:order:" + userId); // 尝试获取锁 boolean isLock = redisLock.tryLock(); // 判断 if(!isLock){ // 获取锁失败,直接返回失败或者重试 return Result.fail("不允许重复下单!"); } try { // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 return Result.fail("用户已经购买过一次!"); } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 .update(); if (!success) { // 扣减失败 return Result.fail("库存不足!"); } // 7.创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 7.1.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 7.2.用户id voucherOrder.setUserId(userId); // 7.3.代金券id voucherOrder.setVoucherId(voucherId); save(voucherOrder); // 7.返回订单id return Result.ok(orderId); } finally { // 释放锁 redisLock.unlock(); } }
性能好,但适合读多写少的情况,这种秒杀的情况,读跟写差不多的话使用CAS就回导致成功率低
mysql的排他锁
只需要在更新时判断此时库存是否大于0
update goods set stock = stock - 1 WHERE id = 1001 and stock > 0
使用update时会加排他锁,这一行不能被任何其他线程修改和读写
排他锁又称为写锁,简称X锁,顾名思义,排他锁就是不能与其他所并存,如一个事务获取了一个数据行的排他锁,其他事务就不能再获取该行的其他锁,包括共享锁和排他锁,但是获取排他锁的事务是可以对数据就行读取和修改。
一人一单
描述
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
流程分析:
存在的并发问题:
并发情况下,他们同时查询到订单中没有他们的,于是他们都认为自己是第一次购买,导致一人多单情况
单机情况下
可以使用锁来保证并发安全
具体代码实现:
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}
// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回订单id
return Result.ok(orderId);
}
}
这里锁住的是
userId.toString().intern()
,通过intern()可以将字符串放入常量池中,对于一个JVM实例中,常量池中的常量唯一,这样多个线程就可以基于这个唯一常量加锁。
分布式下
分布式锁是由共享存储系统维护的变量,多个客户端可以向共享存储系统发送命令进行加 锁或释放锁操作。Redis 作为一个共享存储系统,可以用来实现分布式锁。
基于Redis实现分布式锁
对于加锁操作,我们需要满足三个条件
- 加锁包括了读取锁变量、检查锁变量值和设置锁变量值三个操作,但需要以原子操作的 方式完成,所以,我们使用 SET 命令带上 NX 选项来实现加锁;
- 锁变量需要设置过期时间,以免客户端拿到锁后发生异常,导致锁一直无法释放,所 以,我们在 SET 命令执行时加上 EX/PX 选项,设置其过期时间;
- 锁变量的值需要能区分来自不同客户端的加锁操作,以免在释放锁时,出现误释放操 作,所以,我们使用 SET 命令设置锁变量值时,每个客户端设置的值是一个唯一值,用 于标识客户端。
对于释放锁操作,需要注意
释放锁包含了读取锁变量值、判断锁变量值和删除锁变量三个操作,我们无法使用单个命令来实现,所以,我们可以采用 Lua 脚本执行释放锁操作,通过 Redis 原子性地执行 Lua 脚本,来保证释放锁操作的原子性。
加锁操作,需要加入线程标识
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
释放锁,调用lua脚本
-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
return redis.call('del', KEYS[1])
end
return 0
分布式锁-Redission
基于setnx实现的分布式锁存在下面的问题
- 不可重入 : 同一个线程无法多次获取同一把
- 不可重试 : 获取锁只尝试一次就返回 false,没有重试机制
- 超时释放 : 锁超时释放虽然可以避免 死锁,但如果是业务执行 耗时较长,也会导致锁释 放,存在安全隐患
- 主从一致性 : 如果Redis提供了主从集群, 主从同步存在延迟,当主 宕机时,如果从并同步主 中的锁数据,则会出现锁 实现
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布 式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。使用Redission可以解决上面提到的4个问题。
Redis优化秒杀
我们之前的操作都是基于数据库的,但是操作数据库的性能是比较慢的,我们可以将判断秒杀库存跟校验一人一单的操作放到Redis缓存中,然后再开启另外一个线程去处理数据库相关的步骤。
优化思路:
① 先利用Redis完成库存余量、一人一单判断,完成抢单业务
② 再将下单业务放入阻塞队列,利用独立线程异步下单
步骤分析:
① 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
② 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
③ 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
④ 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
代码实现:
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 2.2.为0 ,有购买资格,把下单信息保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
// 2.3.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 2.4.用户id
voucherOrder.setUserId(userId);
// 2.5.代金券id
voucherOrder.setVoucherId(voucherId);
// 2.6.放入阻塞队列
orderTasks.add(voucherOrder);
// 3.返回订单id
return Result.ok(orderId);
}
//处理阻塞线程中的订单,保存到数据库中
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while (true){
try {
// 1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
// 2.创建订单
createVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
这种基于java的阻塞队列的异步秒杀存在哪些问题?
- 内存限制问题
- 数据安全问题
Redis消息队列实现异步秒杀
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
Redis提供了三种不同的方式来实现消息队列:
◆ list结构:基于List结构模拟消息队列
◆ PubSub:基本的点对点消息模型
◆ Stream:比较完善的消息队列模型
基于List结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟 出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息 。
因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
基于List的消息队列有哪些优缺点?
优点:
• 利用Redis存储,不受限于JVM内存上限
• 基于Redis的持久化机制,数据安全性有保证
• 可以满足消息有序性
缺点:
• 无法避免消息丢失
• 只支持单消费者
基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者 向对应channel发送消息后,所有订阅者都能收到相关消息。
◼ SUBSCRIBE channel [channel] :订阅一个或多个频道
◼ PUBLISH channel msg :向一个频道发送消息
◼ PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
基于PubSub的消息队列有哪些优缺点?
优点:
• 采用发布订阅模型,支持多生产、多消费
缺点:
• 不支持数据持久化
• 无法避免消息丢失
• 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。 发送消息的命令
例如:
1. 基于Stream的消息队列-XREAD
读取消息的方式之一:XREAD
例如,使用XREAD读取第一个消息:
XREAD阻塞方式,读取最新的消息:
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a5PVuDHs-1652915735655)(C:/Users/wcl/AppData/Roaming/Typora/typora-user-images/image-20220518210952049.png)]
当我们指定起始ID为$时,代表读取最新的消息,如果我们处 理一条消息的过程中,又有超过1条以上的消息到达队列,则 下次获取时也只能获取到最新的一条,会出现漏读消息的问题
STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
2. 基于Stream的消息队列-消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
(1)消息分流
队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
(2)消息标示
消费者组会维护一个标示, 记录最后一个被处理的消息, 哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。
(3)消息确认
消费者获取消息后,消息处于 pending状态,并存入一个 pending-list。当处理完成后 需要通过XACK来确认消息,标记 消息为已处理,才会从pendinglist移除
具体命令
创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
- key:队列名称
- groupName:消费者组名称
- ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
- MKSTREAM:队列不存在时自动创建队列
其它常见命令:
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS
key [key ...] ID [ID ...
-
group:消费组名称
-
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
-
count:本次查询的最大数量
-
BLOCK milliseconds:当没有消息时最长等待时间
-
NOACK:无需手动ACK,获取到消息后自动确认
-
STREAMS key:指定队列名称
-
ID:获取消息的起始ID:
-
“>”:从下一个未消费的消息开始
-
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一 个消息开始
消费者监听消息的基本思路
STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
总结
因此我们这里选择基于Redis的Stream机构作为消息队列,实现异步秒杀下单
思路分析:
① 创建一个Stream类型的消息队列,名为stream.orders
② 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包 含voucherId、userId、orderId
③ 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
具体实现:
- 创建一个Stream类型的消息队列
-
修改lua脚本
-- 1.参数列表 -- 1.1.优惠券id local voucherId = ARGV[1] -- 1.2.用户id local userId = ARGV[2] -- 1.3.订单id local orderId = ARGV[3] -- 2.数据key -- 2.1.库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2.订单key local orderKey = 'seckill:order:' .. voucherId -- 3.脚本业务 -- 3.1.判断库存是否充足 get stockKey if(tonumber(redis.call('get', stockKey)) <= 0) then -- 3.2.库存不足,返回1 return 1 end -- 3.2.判断用户是否下单 SISMEMBER orderKey userId if(redis.call('sismember', orderKey, userId) == 1) then -- 3.3.存在,说明是重复下单,返回2 return 2 end -- 3.4.扣库存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5.下单(保存用户)sadd orderKey userId redis.call('sadd', orderKey, userId) -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ... redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0
-
执行lua脚本
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3.返回订单id
return Result.ok(orderId);
}
- 从redis的消息队列取出订单,同步到数据库中
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
评论区