redis雪崩

雪崩是指在redis缓存中,大量key同时失效,此时会对后台数据库造成巨大压力

解决方法

对key的过期时间进行调整,防止大量key同时失效,最简单的方法对key的过期时间统一成 固定时间+random

redis击穿

缓存击穿是指一个非常 “热” 的 key(通常是被频繁访问的数据)在某个时间点过期,此时若有大量并发请求过来,这些请求发现缓存中没有数据,就会同时去数据库查询该数据,对数据库造成巨大的压力。

解决方法

加互斥锁

在高并发下,只有获取锁成功的线程才会去查询数据库,并更新缓存,其他线程则等待一段时间。

自定义锁

利用redis中的nx命令自定义实现锁

1
2
3
4
5
6
7
public Boolean trylock(String key){
Boolean falg = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.MINUTES);
return BooleanUtil.isTrue(falg);
}
public void unlock(String key){
stringRedisTemplate.delete(key);
}

击穿

当一个线程执行到这里时,会先从缓存中查询,则会先进入StrUtil.isNotBlank(s)判断,
StrUtil.isNotBlank()的作用为判断是否字符串是否不为空且不为仅包含空白字符的方法;如果为true,则说明缓存中存在且不为空,直接返回。
如果为false,则说明缓存中不存在或者值为空,即进入下一个判断,如果是s!=null为true,说明缓存中存在s,但是s的值为空所以返回null,
当s!=null为false后尝试获取锁,获取锁失败则等待重试,获取锁成功则查询数据库,如果数据库中存在则写入缓存中,如果不存在则向缓存中
写入空值,这样可以保证在高并发的情况下只有获取锁成功的线程会访问数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public Shop queryWithMutex(Long id) {
String key = RedisConstants.CACHE_SHOP_KEY + id;
//从redis中查询
String s = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(s)) {
return JSONUtil.toBean(s, Shop.class);
}
//判断是否为空值
if (s != null) {
return null;
}
//不存在尝试获取锁
String lockkey = "lock:shop:" + id;
Boolean islock = trylock(lockkey);
Shop shop=null;
try {
//获取锁不成功则等待反复尝试
if (!islock) {
Thread.sleep(50);
return queryWithMutex(id);
}
//如果不存在查询数据库
shop = getById(id);
//模拟实际
Thread.sleep(200);
if (shop != null) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL);
}
//如果数据库中没有该shop对象,则根据id设置为一个空值存入redis中,当下次查询该id时会在上一个if截至,不会重新查数据库
else {
stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL);
return null;
}

} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unlock(lockkey);
}
return shop;
}

逻辑过期

设置逻辑过期时间,这样可以保证key永远存在,只需要判断是否逻辑过期进行更形即可

缓存穿透

查询一个一定不存在的数据,由于缓存中没有该数据,所有的请求都会落到数据库上,导致数据库压力瞬间增大。如果有人利用这个漏洞不断发起这种查询,就可能使数据库因承受不住大量请求而崩溃。

解决方法

1 添加NULL值


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Shop redisPassThrought(Long id) {
String key =RedisConstants.CACHE_SHOP_KEY+id;
//从redis中查询
String s= stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(s)) {
return JSONUtil.toBean(s,Shop.class);
}
//判断是否为空值
if (s!=null){
return null;
}
//如果不存在查询数据库
Shop shop =getById(id);
if (shop != null){
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),RedisConstants.CACHE_SHOP_TTL);
}
//如果数据库中没有该shop对象,则根据id设置为一个空值存入redis中,当下次查询该id时会在上一个if截至,不会重新查数据库
else { stringRedisTemplate.opsForValue().set(key,"",RedisConstants.CACHE_NULL_TTL);
return null;
}
return shop;
}

Redis分布式锁 :

1 自定义分布式锁:

  调用该方法的时候需要new一个RedisLock的对象,并传入两个需要的参数key

原理 :

 利用Redis中的NX命令判断是否存在,不存在则创建,存在创建会失败
  注意 : 返回值是Boolean类型时,不建议直接返回对象success,自动拆箱可能会空指针

自定义分布式锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SimpleRedisLock implements ILock {
//锁的前缀
private static final String KEY_PREFIX = "lock:";
//具体业务名称,将前缀和业务名拼接之后当做Key
private String name;
//这里不是@Autowired注入,采用的是构造器注入,在创建SimpleRedisLock时,将RedisTemplate作为参数传入
private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
long threadId = Thread.currentThread().getId();
//获取锁,使用SETNX方法进行加锁,同时设置过期时间,防止死锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
//自动拆箱可能会出现null,这样写更稳妥
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
//通过DEL来删除锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

业务逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public Result seckillVoucher(Long voucherId) {
LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper<>();
//1. 查询优惠券
queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId);
SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper);
//2. 判断秒杀时间是否开始
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
return Result.fail("秒杀还未开始,请耐心等待");
}
//3. 判断秒杀时间是否结束
if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
return Result.fail("秒杀已经结束!");
}
//4. 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
return Result.fail("优惠券已被抢光了哦,下次记得手速快点");
}
Long userId = UserHolder.getUser().getId();
// 创建锁对象
SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 获取锁对象
boolean isLock = redisLock.tryLock(120);
// 加锁失败,说明当前用户开了多个线程抢优惠券,但是由于key是SETNX的,所以不能创建key,得等key的TTL到期或释放锁(删除key)
if (!isLock) {
return Result.fail("不允许抢多张优惠券");
}
try {
// 获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
redisLock.unlock();
}
}

Redis分布式锁误删情况说明

  1. 逻辑说明
    持有锁的线程1在锁的内部出现了阻塞,导致他的锁TTL到期,自动释放
    此时线程2也来尝试获取锁,由于线程1已经释放了锁,所以线程2可以拿到
    但是现在线程1阻塞完了,继续往下执行,要开始释放锁了
    那么此时就会将属于线程2的锁释放,这就是误删别人锁的情况

  2. 解决方案
    解决方案就是在每个线程释放锁的时候,都判断一下这个锁是不是自己的,如果不属于自己,则不进行删除操作。
    假设还是上面的情况,线程1阻塞,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1阻塞完了,继续往下执行,开始删除锁,但是线程1发现这把锁不是自己的,所以不进行删除锁的逻辑,当线程2执行到删除锁的逻辑时,如果TTL还未到期,则判断当前这把锁是自己的,于是删除这把锁

    解决Redis分布式锁误删问题

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
          public class RedisLock {
    private static final String KEY_PREFIX = UUID.randomUUID().toString(true)+"-";
    private String key;
    private StringRedisTemplate redisTemplate;

    public RedisLock(String key, StringRedisTemplate redisTemplate) {
    this.key = key;
    this.redisTemplate = redisTemplate;
    }


    public boolean tryLock(Long timeout) {
    //获取锁的时候将该线程标识(UUID+线程Id)存入缓存中,释放锁是判断,防止误删
    String threadId = KEY_PREFIX +Thread.currentThread().getId();
    Boolean success= redisTemplate.opsForValue().setIfAbsent(key, threadId , timeout, TimeUnit.SECONDS);
    return Boolean.TRUE.equals(success);
    }

    public void unlock() {
    String threadId = KEY_PREFIX +Thread.currentThread().getId();
    String id = redisTemplate.opsForValue().get(key);
    if(threadId.equals(id)) {
    redisTemplate.delete(key);
    }
    }
    }

    Redisson 可重入锁

    基于SETNX实现的分布式锁存在以下问题

    1. 我们编写的分布式锁只能尝试一次,失败了就返回false,没有重试机制。但合理的情况应该是:当线程获取锁失败后,他应该能再次尝试获取锁
    2. 重入问题是指获取锁的线程,可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,例如在HashTable这样的代码中,它的方法都是使用synchronized修饰的,加入它在一个方法内调用另一个方法,如果此时是不可重入的,那就死锁了。所以可重入锁的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的
    3. 我们在加锁的时候增加了TTL,这样我们可以防止死锁,但是如果卡顿(阻塞)时间太长,也会导致锁的释放。虽然我们采用Lua脚本来防止删锁的时候,误删别人的锁,但现在的新问题是没锁住,也有安全隐患
    4. 主从一致性
      如果Redis提供了主从集群,那么当我们向集群写数据时,主机需要异步的将数据同步给从机,万一在同步之前,主机宕机了(主从同步存在延迟,虽然时间很短,但还是发生了),那么又会出现死锁问题

      那么什么是Redisson呢

      Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现
      Redis提供了分布式锁的多种多样功能
  3. 可重入锁(Reentrant Lock)
  4. 公平锁(Fair Lock)
  5. 联锁(MultiLock)
  6. 红锁(RedLock)
  7. 读写锁(ReadWriteLock)
  8. 信号量(Semaphore)
  9. 可过期性信号量(PermitExpirableSemaphore)
  10. 闭锁(CountDownLatch)

需要提前导入依赖坐标

对其进行配置

1
2
3
4
5
6
7
8
9
10
11
12
13
//配置Redisson,用于分布式锁
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient RedissonClient() {
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
//创建RedissonClient对象
return Redisson.create(config);
}
}

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Resource
private RedissonClient redissonClient;

@Test
void testRedisson() throws InterruptedException {
//获取可重入锁
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,三个参数分别是:获取锁的最大等待时间(期间会重试),锁的自动释放时间,时间单位
boolean success = lock.tryLock(1,10, TimeUnit.SECONDS);
//判断获取锁成功
if (success) {
try {
System.out.println("执行业务");
} finally {
//释放锁
lock.unlock();
}
}
}

原理的话要读源码,读不懂一点好吧,太难读了。等我回头学会了再来补充。
目前只知道一点Redisson可重入的原理大概是 采用了hash结构存储锁,外层key代表锁存在,
内层key代表是线程的标识 即一个方法会先获取外层key,如果锁存在,则该方法继续获取
内层key,如果内层key和自己的线程标识相同,则获取锁成功,此时value +1,方法执行完后
也不是直接释放锁,而是value -1,直到value =0时才释放锁

认识消息队列

什么是消息队列?

字面意思就是存放消息的队列,最简单的消息队列模型包括3个角色
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息

使用队列的好处在于解耦:

举个例子,快递员(生产者)把快递放到驿站/快递柜里去(Message Queue)去,我们(消费者)从快递柜/驿站去拿快递,这就是一个异步,如果耦合,那么快递员必须亲自上楼把快递递到你手里,服务当然好,但是万一我不在家,快递员就得一直等我,浪费了快递员的时间。所以解耦还是非常有必要的
那么在这种场景下我们的秒杀就变成了:在我们下单之后,利用Redis去进行校验下单的结果,然后在通过队列把消息发送出去,然后在启动一个线程去拿到这个消息,完成解耦,同时也加快我们的响应速度
这里我们可以直接使用一些现成的(MQ)消息队列,如kafka,rabbitmq等,但是如果没有安装MQ,我们也可以使用Redis提供的MQ方案(学完Redis我就去学微服务)

redis基于List的消息队列

消息队列(Message Queue),字面意思就是存放消息的队列,而Redis的list数据结构是一个双向链表,很容易模拟出队列的效果
队列的入口和出口不在同一边,所以我们可以利用:LPUSH结合RPOP或者RPUSH结合LPOP来实现消息队列。
不过需要注意的是,当队列中没有消息时,RPOP和LPOP操作会返回NULL,而不像JVM阻塞队列那样会阻塞,并等待消息,所以我们这里应该使用BRPOP或 者BLPOP来实现阻塞效果

基于List的消息队列有哪些优缺点?

优点

  1. 利用Redis存储,不受限于JVM内存上限
  2. 基于Redis的持久化机制,数据安全性有保障
  3. 可以满足消息有序性

    缺点

  4. 无法避免消息丢失(经典服务器宕机)
  5. 只支持单消费者(一个消费者把消息拿走了,其他消费者就看不到这条消息了)

    基于PubSub的消息队列

    PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费和可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息
    SUBSCRIBE channel [channel]:订阅一个或多个频道
    PUBLISH channel msg:向一个频道发送消息
    PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道

基于PubSub的消息队列有哪些优缺点

优点:

采用发布订阅模型,支持多生产,多消费

缺点:

  1. 不支持数据持久化
    2 .无法避免消息丢失(如果向频道发送了消息,却没有人订阅该频道,那发送的这条消息就丢失了)
    3 .消息堆积有上限,超出时数据丢失(消费者拿到数据的时候处理的太慢,而发送消息发的太快)

    Stream的单消费模式

    1
    2
    3

    ## 创建名为users的队列,并向其中发送一个消息,内容是{name=jack, age=21},并且使用Redis自动生成ID
    XADD users * name jack age 21

读取队列方法之一 :XREAD

1
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

  1. [COUNT count]
    每次读取消息的最大数量
  2. [BLOCK milliseconds]
    当没有消息时,是否阻塞,阻塞时长
  3. STREAMS key [key …]
    要从哪个队列读取消息,key就是队列名
  4. ID [ID …]
    起始ID,只返回大于该ID的消息
    0:表示从第一个消息开始
    $:表示从最新的消息开始

注意:
当我们指定其实ID为$时,代表只能读取到最新消息,如果当我们在处理一条消息的过程中,又有超过1条以上的消息到达队列,那么下次获取的时候,也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的XREAD命令特点

  1. 消息可回溯
  2. 一个消息可以被多个消费者读取
  3. 可以阻塞读取
  4. 有漏读消息的风险

Sream的组消费模式

将多个消费者划分为一组,监听一个队列

1.消息分流

队列中消息分给组内不同的消费者,而不是重复消费者(也可以消费者重复去执行)提高效率

2. 消息标识

消费者在读取一个消息后,会给该消息添加一个标识,从而可以记录到最后被处理的消息,即使消费者宕机。重启后可以
继续从标识的地方继续读取,直到消费者执行完毕该消息,进行确认后,该消息才会被消息队列移除,确保了每个消息最少执行一遍

3。消息确认

消费者获取消息后,消息处于pending状态,并存入一个pending-list,当处理完成后,需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移除

基于Stream实现异步秒杀

异步秒杀所以需要子线程中消费队列
思路:
在子线程中一直循环,循环中该线程指定g1组的消费者c1读取Stream中的消息,然后判断读取的消息是否为空,
为空则进行下一次循环,不为空则解析消息中的数据,得到需要的对象,然后创建订单,订单完成后进行消息确认
消息队列会移除该消息,如果执行过程中出现异常,就会先捕捉异常,然后执行pending-list中的消息,因为出现异常
消息被读取加上标识,但是并未确定,所以会在pending-list中。读取消息,判断是否为空,不为空则创建订单,
确认消息,如果为空说明pending-list中没有消息,直接break。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
String queuename ="stream.order";
private class VoucherOrderHander implements Runnable{

@Override
public void run() {
while(true){
try {
//1.2 stream
//获取消息队列中的信息
List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),
//.count(1) 表示只读取一个元素, .block(Duration.ofSeconds(2))表示如果没有消息则等待两秒,
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
// ReadOffset.lastConsumed()表示从最新元素开始读取
StreamOffset.create(queuename, ReadOffset.lastConsumed()));
//判断是否获取成功
if (list ==null || list.isEmpty()){
continue;
}
//解析信息
MapRecord<String,Object,Object> mapRecord = list.get(0);
Map<Object,Object> value = mapRecord.getValue();
VoucherOrder voucherOrder= BeanUtil.fillBeanWithMap(value, new VoucherOrder(),true);
//创建订单
porxy.CreateVoucher1(voucherOrder);
//xack确认
stringRedisTemplate.opsForStream().acknowledge(queuename,"g1",mapRecord.getId());
} catch (Exception e) {
log.info("异步订单",e);
//获取消息队列中的信息
//0表示从pending-list中的第一个消息开始,如果前面都ACK了,那么这里就不会监听到消息
List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queuename,ReadOffset.from("0")));
//判断是否获取成功
if (list ==null || list.isEmpty()){
break;
}
//解析信息
MapRecord<String,Object,Object> mapRecord = list.get(0);
Map<Object,Object> value = mapRecord.getValue();
VoucherOrder voucherOrder= BeanUtil.fillBeanWithMap(value, new VoucherOrder(),true);
//创建订单
porxy.CreateVoucher1(voucherOrder);
//xack确认
stringRedisTemplate.opsForStream().acknowledge(queuename,"g1",mapRecord.getId());
}
}
}
}

主线程

主线程中的异步秒杀的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Long userId = UserHolder.getUser().getId();
long orderId= redisIdWorker.nextId("order");
//执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(), userId.toString(),String.valueOf(orderId)
);
//判断是否为0
int r = result.intValue();
if (r!=0){
return Result.fail(r==1? "库存不足" : "不能重复下单") ;
}
porxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);

Lua脚本的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
-- 订单id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 新增orderId,但是变量名用id就好,因为VoucherOrder实体类中的orderId就是用id表示的
local id = ARGV[3]
-- 优惠券key
local stockKey = 'seckill:stock:' .. voucherId
-- 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 判断库存是否充足
if (tonumber(redis.call('get', stockKey)) <= 0) then
return 1
end
-- 判断用户是否下单
if (redis.call('sismember', orderKey, userId) == 1) then
return 2
end
-- 扣减库存
redis.call('incrby', stockKey, -1)
-- 将userId存入当前优惠券的set集合
redis.call('sadd', orderKey, userId)
-- 将下单数据保存到消息队列中
redis.call("sadd", 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', id)
return 0

redis的学习就先到这里吧,时间紧任务重就要先去学微服务啦