1. 初始Redis

Redis诞生于2009年全称是Remote Dictionary Server,远程词典服务器,是一个基于内存的键值型NoSQL数据库。

特征:

  • 键值型,value支持多种不同数据结构,功能丰富。

  • 单线程,每个命令具备原子性

  • 低延迟,速度快(基于内存、IO多路复用、良好的编码)。

  • 支持数据的持久化

  • 支持主从集群、分片集群

1.1 Redis的单机安装

  1. Redis是基于C语言编写的,因此首先需要安装Redis所需要的gcc依赖:
1
yum install -y gcc tcl
  1. 上传redis压缩包到Linux
  2. 解压缩
1
tar -zxzf redis-6.2.6.tar.gz
  1. 进入解压好的redis目录
1
cd redis-6.2.6
  1. 编译运行
1
make && make install

默认的安装路径是在 /usr/local/bin目录下:

image-20240721165840433

该目录以及默认配置到环境变量,因此可以在任意目录下运行这些命令。其中:

  • redis-cli:是redis提供的命令行客户端
  • redis-server:是redis的服务端启动脚本
  • redis-sentinel:是redis的哨兵启动脚本

1.2 启动

redis的启动方式有很多种,例如:

  • 默认启动

    这种启动属于前台启动,会阻塞整个会话窗口,窗口关闭或者按下CTRL + C则Redis停止。不推荐使用。

    1
    redis-server
  • 指定配置启动

    如果要让Redis以后台方式启动,则必须修改Redis配置文件,就在我们之前解压的redis安装包下(/usr/local/src/redis-6.2.6),名字叫redis.conf:

    1
    cp redis.conf redis.conf.bck

    修改配置

    1
    2
    3
    4
    5
    6
    # 监听的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0
    bind 0.0.0.0
    # 守护进程,修改为yes后即可后台运行
    daemonize yes
    # 密码,设置后访问Redis必须输入密码
    requirepass 123321

    其他的常见配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # 监听的端口
    port 6379
    # 工作目录,默认是当前目录,也就是运行redis-server时的命令,日志、持久化等文件会保存在这个目录
    dir .
    # 数据库数量,设置为1,代表只使用1个库,默认有16个库,编号0~15
    databases 1
    # 设置redis能够使用的最大内存
    maxmemory 512mb
    # 日志文件,默认为空,不记录日志,可以指定日志文件名
    logfile "redis.log"

    启动Redis

    1
    2
    3
    4
    # 进入redis安装目录 
    cd /usr/local/src/redis-6.2.6
    # 启动
    redis-server redis.conf

    停止服务:

    1
    2
    3
    # 利用redis-cli来执行 shutdown 命令,即可停止 Redis 服务,
    # 因为之前配置了密码,因此需要通过 -u 来指定密码
    redis-cli -u 123321 shutdown
  • 开机自启

    新建一个系统服务文件:

    1
    vi /etc/systemd/system/redis.service

    内容如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    [Unit]
    Description=redis-server
    After=network.target

    [Service]
    Type=forking
    ExecStart=/usr/local/bin/redis-server /usr/local/src/redis-6.2.6/redis.conf
    PrivateTmp=true

    [Install]
    WantedBy=multi-user.target

    然后重载系统服务:

    1
    systemctl daemon-reload

    现在,我们可以用下面这组命令来操作redis了:

    1
    2
    3
    4
    5
    6
    7
    8
    # 启动
    systemctl start redis
    # 停止
    systemctl stop redis
    # 重启
    systemctl restart redis
    # 查看状态
    systemctl status redis

    执行下面的命令,可以让redis开机自启:

    1
    systemctl enable redis

1.3 Redis客户端

安装完成Redis,我们就可以操作Redis,实现数据的CRUD了。这需要用到Redis客户端,包括:

  • 命令行客户端
  • 图形化桌面客户端
  • 编程客户端

命令行客户端

Redis安装完成后就自带了命令行客户端:redis-cli,使用方式如下:

1
redis-cli [options] [commonds]

其中常见的options有:

  • -h 127.0.0.1:指定要连接的redis节点的IP地址,默认是127.0.0.1
  • -p 6379:指定要连接的redis节点的端口,默认是6379
  • -a 123321:指定redis的访问密码

其中的commonds就是Redis的操作命令,例如:

  • ping:与redis服务端做心跳测试,服务端正常会返回pong

不指定commond时,会进入redis-cli的交互控制台:

image-20240721172306463

不指定输入密码时,可以进入redis交换控制台输入auth输入密码也可以

1
2
127.0.0.1:6379> auth 123321
OK

2. Redis命令

2.1 数据结构介绍

Redis是一个key-value的数据库,key一般是String类型,不过value的类型多种多样:

image-20240721181551128

2.2 通用命令

查看帮助文档

1
2
3
4
# 查看数据类型
help @string
# 查看命令
help keys

keys

查看所有符合节点的key,不建议在生产环境设备上使用

1
2
3
4
# 查出所有key
keys *
# 查出a开头的key
keys a*

del

删除指定的keye。

1
2
3
4
# 删除key为name的值
del name
# 删除多个key
del k1 k2 k3 k4

exists

判断key是否存在。

1
exists name

expire

给key设置一个过期时间,有效期到期自动删除。

1
2
3
# 单位是秒
# 查看帮助文档 help expire
expire name 20

ttl

查看key剩余的有效期,-1表示用久,-2表示已被删除。

1
ttl name

2.3 String类型

String类型,也就是字符串类型,是Redis中最简单的存储类。

其value是字符串,不过根据字符串的格式不同,又可以分为3类:

  • string:普通字符串
  • int:整数类型,可以做自增、自减操作
  • float:浮点类型,可以做自增、自减操作

不管是哪种格式,底层都是字节数组形式存储,只不过是编码方式不同。字符串类型的最大空间不能超过512m。

String的常见命令

  • set:添加或修改已经存在的的string键值对。
  • get:根据key获取String类型的value。
  • mset:批量添加多个String类型键值对。
  • mget:根据多个key获取多个String 类型。
  • incr:让一个整型的key自增1。incr age -1也可实现自减的效果。
  • decr:让一个整型的key自减1。
  • incrby:让一个整型的key自增并指定步长,例如: incrby num 2让num值自增2。
  • incrbyfloat:让一个浮点类型的数字自增并指定步长。
  • setnx:添加一个string类型的键值对,前提是这个key不存在,否则不执行。这是实现分布式锁的关键命令,因为它能确保在同一时间只有一个客户端能够获得锁。set name jack nx
  • setex:添加一个String类型的键值对,并且指定有效期。set name ex 10
1
2
3
4
5
6
7
set name jack
get name
mset name lucy age 18
mget name age
incr age
incr age -1
incrby age 2

2.4 key的结构

Redis没有类1似MlyoT分不同类型的key呢?

例如,需要存储用户、商品信息到redis,有一个用户id是1,有一个商品id恰好也是1。

Redis的key允许有多个单词形成层级结构,多个单词之间用∵’隔开,格式如下:

项目名:业务名:类型:id

image-20240721204745055

2.5 Hash哈希

Hash类型,也叫散列,其value是一个无序字典,类似于Java中的HashMap结构。

String结构是将对象序列化为JSON字符串后存储,当需要修改对象某个字段时很不方便:

image-20240721213333297

Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD:

image-20240721213358631

Hash常用命令

  • hset key field value:添加或者修改hash类型key的field的值。
  • hget key field:获取一个hash类型key的field的值。
  • hmset :批量添加多个hash类型key的field的值。
  • hmget:批量获取多个hash类型key的field值。
  • hgetall:获取一个hash类型中所有的key中所有的field和value。
  • hkeys:获取一个hash类型的key中所有的field。
  • hvals:获取一个hash类型的key中所有的value。
  • hincrby:让一个hash类型key字段值自增并指定步长。
  • hsetnx:添加一个hash类型的key的field值,前提是这个field不存在,否则不执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
hset itheima:user:1 name jack
hset itheima:user:1 age 18
hget itheima:user:1 name
hget itheima:user:1 age

hmset itheima:user:2 name lucy age 18
hmget itheima:user:2 name age

hgetall itheima:user:2
hkeys itheima:user:2
hvals itheima:user:2

hincrby itheima:user:2 age 2

hsetnx itheima:user:2 sex 1

2.6 List列表

Redis中的List类型与Java中的LinkedList类似,可以看做是一个双向链表结构。既可以支持正向检索和也可以支持反向检索。

  • 有序
  • 元素可以重复
  • 插入和删除快
  • 查询速度一般

List的常用命令

  • lpush key element …:向列表左侧插入一个或多个元素。
  • lpop key:移除并返回列表左侧的第一个元素,没有则返回nil。
  • rpush key element …:向列表右侧插入一个或多个元素。
  • rpop key:移除并返回列表右侧的第一个元素,没有则返回nil。
  • lrange key star end:返回一段角标范围内的所有元素。
  • blpop和brpop:与lpop和rpop类似,只不过在没有元素时等待指定时间,而不是直接返回nil。
1
2
3
4
lpush users 1 2 3
rpush users 4 5 6
# 从左边移除1个元素
lpop users 1

2.7 Set类型集合

Redis的Set结构与Java中的HashSet类似,可以看做是一个value为null的HashMap。因为也是一个hash表,因此具备与HashSet类似的特征:

  • 无序。
  • 元素不可重复。
  • 查找快。
  • 支持交集、并集、差集等功能。

Set常见命令

  • sadd key member:向set中添加一个或多个元素。
  • srem key member:移除set中的指定元素
  • scard key:返回set中的元素个数
  • sismember key member:判断一个元素是否存在于set中
  • smembers:返回set中的所有元素
  • sinter key1 key2:求key1和key2的交集
  • sdiff key1 key2:求key1和key2的差集
  • sunion key1 key2:求key1和key2的并集
1
2
3
4
5
sadd s1 a b c
smembers s1
scard s1
# 存在返回1,失败返回0
sismember s1 a
1
2
sadd s2 b c d
sinter s1 s2

练习:

将下列数据用Redis的Set集合来存储:

张三的好友有:李四、王五、赵六

李四的好友有:王五、麻子、二狗

利用Set的命令实现下列功能:

  • 计算张三的好友有几人
  • 计算张三和李四有哪些共同好友
  • 查询哪些人是张三的好友却不是李四的好友
  • 查询张三和李四的好友总共有哪些人
  • 判断李四是否是张三的好友
  • 判断张三是否是李四的好友
  • 将李四从张三的好友列表中移除
1
2
3
4
5
6
7
8
9
10
sadd zs lisi wangwu zhaoliu	
sadd ls wangwu mazi ergou
scard zs
sinter zs ls
sdiff zs ls
sunion zs ls
sismember zs lisi
sismember ls zhangsan
srem zs lisi
smembers zs

2.8 SortedSet有序集合

Redis的SortedSet是一个可排序的set集合,与Java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以基于score属性对元素排序,底层的实现是一个跳表(SkipList)加 hash表。SortedSet具备下列特性:

  • 可排序
  • 元素不重复
  • 查询速度快

因为SortedSet的可排序特性,经常被用来实现排行榜这样的功能。

SortedSet的常用命令

  • zadd key score member:添加一个或多个元素到sorted set中,如果已经存在则更新其score值。
  • zrem key member:删除sorted set中的一个指定元素。
  • zscore key member:获取sorted set中指定元素的score值。
  • zrank key member:获取sorted set中指定元素的排名。
  • zcard key:获取取sorted set中指定元素个数。
  • zcount key min max:统计score值在范围内的所有元素的个数。
  • zincrby key increment member:让sorted set中的指定元素自增,步长为指定的increment值。
  • zrange key min max:按照score排序后,获取指定score排名范围内的元素。
  • zrangebyscore key min max:按照score排序后,获取指定score范围内的元素。
  • zdiff、zinter、zunion:求差集,交集,并集。

注意:所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可。

练习

将班级的下列学生得分存入Redis的SortedSet中:

Jack 85,Lucy 89,Rose 82,Tom 95, Jerry 78,Amy 92,Miles 76。

并实现下列功能:

  • 删除Tom同学
  • 获取Amy同学的分数
  • 获取Rose同学的排名
  • 查询80分以下有几个学生
  • 给Amy同学加2分
  • 查出成绩前3名的同学
  • 查出成绩80分以下的所有同学
1
2
3
4
5
6
7
8
zadd stus 85 Jack 89 Lucy 82 Rose 95 Tom 78 Jerry 92 Amy 76 Miles
zrem stus Tom
zscore stus Amy
zrevrank stus Rose
zcount stus 0 80
zincrby stus 2 Amy
zrevrange stus 0 2
zrangebyscore stus 0 80

2.9 BitMap位图

3. Java客户端

3.1 Jedis

  1. 引入依赖
1
2
3
4
5
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.8.0</version>
</dependency>
  1. 建立链接,设置密码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Jedis jedis;

@BeforeEach
public void setUp(){
// 建立链接
jedis = new Jedis("192.168.200.100",6379);
// 设置密码
jedis.auth("123321");
// 选择库
jedis.select(0);
}

// 关闭链接,释放资源
@AfterEach
public void closeConn(){
if (jedis != null){
jedis.close();
}
}
  1. 测试
1
2
3
4
5
6
7
@Test
public void testString(){
String result = jedis.set("name", "lucy");
System.out.println("result:"+result);
String name = jedis.get("name");
System.out.println("name:"+name);
}

连接池方式

Jedis本身是线程不安全的,并且频繁的创建和销毁链接会有性能损耗,因此我们推荐大家使用Jedis链接池代替Jedis的直连方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class JedisConnectionFactory {

private static final JedisPool JEDIS_POOL;

static{
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// 最大链接
jedisPoolConfig.setMaxTotal(8);
// 最大空闲链接
jedisPoolConfig.setMaxIdle(8);
// 最小空闲链接
jedisPoolConfig.setMinIdle(0);
// 设置最长等待时长
jedisPoolConfig.setMaxWait(Duration.ofMillis(200));
JEDIS_POOL = new JedisPool(jedisPoolConfig,"192.168.200.100",6379,1000,"123321");
}

public static Jedis getJedis(){
return JEDIS_POOL.getResource();
}
}
1
2
3
4
5
6
7
8
9
10
@Test
public void testJedisPool(){
Jedis jedis = JedisConnectionFactory.getJedis();
HashMap<String, String> map1 = new HashMap<>();
map1.put("name","jack");
map1.put("age","21");
jedis.hmset("stus1",map1);
List<String> hmget = jedis.hmget("stus1", "name", "age");
System.out.println(hmget);
}

3.2 SpringDataRedis

SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis。

  • 提供对不同Redis客户端的整合(Lettuce和Jedis)
  • 提供了RedisTemplate统一API来操作Redis
  • 支持Redis的发布订阅模型
  • 支持Redis哨兵和Redis集群
  • 支持基于Lettuce的响应式编程
  • 支持基于JDK,JSON,字符串,Spring对象数据序列化及反序列化。
  • 支持基于Redis的JDKCollection实现

SpringDataRedis中提供了RedisTemplate工具类,其中封装了各种对Redis的操作。并且将不同数据类型的操作API封装到了不同的类型中:

image-20240723112616341

快速入门

  1. 引入依赖
1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
  1. 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
redis:
host: 192.168.200.100
port: 6379
password: 123321
database: 0 #指定库
lettuce:
pool:
enabled: true
max-active: 8
max-idle: 8
min-idle: 0
max-wait: 100ms
  1. 注入RedisTemplate,并进行测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest
class SpringDataRedisApplicationTests {

@Autowired
private RedisTemplate redisTemplate;

@Test
void testString() {
ValueOperations valueOperations = redisTemplate.opsForValue();
valueOperations.set("name","cjz");
Object name = valueOperations.get("name");
System.out.println(name);
}
}

发现问题?

RedisTemplate可以接收任意Object作为值写入Redis,只不过写入前会把Object序列化为字节形式,默认是采用JDK序列化,得到的结果是这样的:

image-20240723114856595

  • 可读性差
  • 内存占用较大
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
public class RedisConfig {

@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory connectionFactory){
// 创建RedisTemplate对象
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
// 设置连接工厂
redisTemplate.setConnectionFactory(connectionFactory);
// 创建JSON序列化工具
GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
// 设置key的序列化
redisTemplate.setKeySerializer(RedisSerializer.string());
redisTemplate.setHashKeySerializer(RedisSerializer.string());
// 设置value的序列化
redisTemplate.setValueSerializer(jsonRedisSerializer);
redisTemplate.setHashValueSerializer(jsonRedisSerializer);
//返回
return redisTemplate;
}
}

测试:

1
2
3
4
5
6
7
@Test
void testSaveUser(){
ValueOperations<String,User> valueOperations = redisTemplate.opsForValue();
valueOperations.set("user:100",new User("cjz","23"));
User user = valueOperations.get("user:100");
System.out.println(user);
}

image-20240723120844947

发现问题?

为了在反序列化时知道对象的类型,JSON序列化器会将类的class类型写入json结果中,存入Redis,会带来额外的内存开销。

image-20240723121651786

为了在反序列化时知道对象的类型,JSON序列化器会将类的class类型写入json结果中,存入Redis,会带来额外的内存开销。

StringRedisTemplate

为了节省内存空间,我们并不会使用JSON序列化器来处理value,而是统一使用String序列化器,要求只能存储String类型的key和value。当需要存储Java对象时,手动完成对象的序列化和反序列化

Spring默认提供了一个StringRedisTemplate类,它的key和value的序列化方式默认就是String方式。省去了我们自定义RedisTemplate的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Autowired
private StringRedisTemplate stringRedisTemplate;

@Test
void testStringTemplate(){
ValueOperations<String, String> stringValueOperations = stringRedisTemplate.opsForValue();
User user = new User("yyds", "123");
// 手动序列化
String jsonStr = JSONUtil.toJsonStr(user);
stringValueOperations.set("user:100",jsonStr);

String value = stringValueOperations.get("user:100");
// 手动反序列化
User parseUser = JSONUtil.toBean(value, User.class);
System.out.println(parseUser);
}

image-20240723123344473

操作hash类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void testHash(){
HashOperations<String, Object, Object> hashTemplate = stringRedisTemplate.opsForHash();
hashTemplate.put("user:200","name","Jack");
hashTemplate.put("user:200","age","17");

Set<Object> keys = hashTemplate.keys("user:200");
System.out.println(keys);

List<Object> values = hashTemplate.values("user:200");
System.out.println(values);


Map<Object, Object> entries = hashTemplate.entries("user:200");
System.out.println(entries);
}

4. 缓存

4.1 什么是缓存?缓存更新策略

缓存就是数据交换的缓冲区(称作Cache [ kae[]),是存贮数据的临时地方,一般读写性能较高。

缓存更新策略

image-20240724112404789

操作缓存和数据库时有三个问题需要考虑:

  1. 删除缓存还是更新缓存?

    • 更新缓存:每次更新数据库都更新缓存,无效写操作较多

    • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存,一般采用该方式

  2. 如何保证缓存与数据库的操作的同时成功或失败?

    • 单体系统,将缓存与数据库操作放在一个事务
    • 分布式系统,利用TCC等分布式事务方案
  3. 先操作缓存还是先操作数据库?

    • 先删缓存,在操作数据库
    • 先操作数据库,在删缓存(推荐

最佳实践

  1. 低一致性需求:使用Redis自带的内存淘汰机制

  2. 高一致性需求:主动更新,并以超时剔除作为兜底方案

    读操作

    • 缓存命中则直接返回
    • 缓存未命中则查询数据库,并写入缓存,设定超时时间

    写操作

    • 先写数据库,然后再删除缓存
    • 要确保数据库与缓存操作的原子性

4.2 缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

image-20240724115506656

常见的解决方案:

  1. 缓存空对象
    • 优点:方便维护
    • 缺点:额外的内存消耗;可能造成短期不一致。
image-20240724115749946
  1. 布隆过滤

    • 优点:内存占用小,没有多余的key。
    • 缺点:实现复杂,存在误判可能。
    image-20240724115841475

4.3 缓存雪崩

缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

image-20240724124519755

常见的解决方案:

  • 给不同key的TTL添加随机值。
  • 利用Redis集群提高服务的可用性。
  • 给缓存业务添加降级限流策略。
  • 给业务添加多级缓存。

4.4 缓存击穿

缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

  • 互斥锁
image-20240724125632213
  • 逻辑过期
image-20240724130005843 image-20240724130036278

使用互斥锁实现

image-20240724130736464
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Override
public Result queryById(Long id) {
// 缓存穿透
// queryWithPassThrough(id);

// 互斥锁解决缓存击穿
queryWithMtex(id);
return Result.ok();
}

private Shop queryWithMtex(Long id) {
String key = RedisConstants.CACHE_SHOP_KEY + id;
String shopJson = redisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(shopJson)){
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return shop;
}
// 命中的是不是空值
if (shopJson != null){
// 返回错误信息
return null;
}
// 实现缓存重建
// 1.获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
Shop shop = null;
try {
boolean flag = tryLock(lockKey);
// 2.判断是否获取成功
if (!flag){
// 3.失败,并休眠重试
Thread.sleep(50);
return queryWithMtex(id);
}

// 4.成功,写入redis
shop = getById(id);
if (shop == null){
// 缓存空数据
redisTemplate.opsForValue().set(key, "",RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
// 写入redis
redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
// 释放互斥锁
unLock(lockKey);
}
return shop;
}

private boolean tryLock(String key){
Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.MINUTES);
return BooleanUtil.isTrue(flag);
}

private void unLock(String key){
redisTemplate.delete(key);
}

基于逻辑过期方式解决缓存击穿问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Data
public class RedisData {

private LocalDateTime expireTime;

private Object object;
}

public void saveShop2Redis(Long id,Long expireSeconds){
// 查询店铺数据
Shop shop = getById(id);
RedisData redisData = new RedisData();
redisData.setObject(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
// 写入redis
redisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}

缓存工具类的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@Component
@Slf4j
@RequiredArgsConstructor
public class CacheClient {

private final StringRedisTemplate stringRedisTemplate;

// 将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
public void set(String key, Object val, Long time, TimeUnit unit){
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(val),time,unit);
}

// 将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
public void setWithLogicalExpire(String key, Object val, Long time, TimeUnit unit){
RedisData redisData = new RedisData();
redisData.setData(val);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}

// 根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
public <R,ID> R withPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,
Long time, TimeUnit unit){
String key = keyPrefix+id;
String json = stringRedisTemplate.opsForValue().get(key);
// 1. 判断是否存在
if (StrUtil.isNotBlank(json)){
return JSONUtil.toBean(json,type);
}
// 判断命中是否是空值
if (json != null){
return null;
}
// 不存在,查询数据库
R r = dbFallback.apply(id);
if (r == null){
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
// 存在,写入redis
this.set(key,r,time,unit);
return r;
}

private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

// 根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id, Class<R> type,Function<ID,R> dbFallback,Long time, TimeUnit unit){
String key = keyPrefix+id;
String json = stringRedisTemplate.opsForValue().get(key);
// 1. 判断是否存在
if (StrUtil.isBlank(json)){
return null;
}
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
if (expireTime.isAfter(LocalDateTime.now())){
return r;
}
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
if (isLock){
CACHE_REBUILD_EXECUTOR.submit(()->{
try{
//查询数据库,写入缓存
R r1 = dbFallback.apply(id);
this.setWithLogicalExpire(key,r1,time,unit);
}catch (Exception e){
throw new RuntimeException(e);
}finally {
unLock(lockKey);
}
});
}
return r;
}

private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.MINUTES);
return BooleanUtil.isTrue(flag);
}

private void unLock(String key){
stringRedisTemplate.delete(key);
}
}

4.5 基于Redis全局唯一ID的生成

image-20240724154832178
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
public class RedisIdWorker {

// 开始时间戳
private static final long BEGIN_TIMESTAMP = 1640995200L;

// 序列号的位数
private static final int COUNT_BITS = 32;

private StringRedisTemplate stringRedisTemplate;

public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

private long nexId(String keyPrefix){
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 2.生成序列号
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3.拼接并返回
return (timestamp << COUNT_BITS) | count;
}
}

5. 分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

image-20240724211228108

5.1 分布式锁的实现

image-20240724211740758

实现分布式锁时需要实现两个基本方法:

  • 获取锁:

    • 互斥,确保只能有一个线程获取锁

    • 非阻塞:尝试一次,成功返回true,失败返回false

      1
      2
      3
      4
      5
      // 添加锁,利用SETNX的互斥特性
      SETNX lock thread1
      expire lock 5
      // 保证原子性
      set lock thread1 EX 10 NX
  • 释放锁:

    • 手动释放

    • 超时释放

      1
      2
      # 释放锁,删除即可
      DEL lock

分布式锁实现1

image-20240724214206819
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public interface ILock {

/**
* 尝试获得锁
* @param timeout 锁持有的超时时间,过期后自动释放
* @return true表示获取锁成功,false代表获取锁失败
*/
boolean tryLock(long timeout);

// 释放锁
void unlock();
}

public class SimpleRedisLock implements ILock{

private String name;
private String KEY_PREFIX = "lock:";
private final StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean tryLock(long timeout) {
// 获取线程的标识
long threadId = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeout, TimeUnit.MINUTES);
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

修改,分布式实现2

线程1因为某种原因陷入阻塞,超过了锁的过期时间,锁被释放掉了,线程二可以获取锁,这时,线程1醒了,把线程2的锁释放了。

image-20240724221821252

image-20240724221606405

image-20240724222044625

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SimpleRedisLock implements ILock{

private String name;
private String KEY_PREFIX = "lock:";
private final StringRedisTemplate stringRedisTemplate;

// 防止集群模式下线程ID重复
private static final String ID_PREFIX = UUID.randomUUID().toString(true) +"-";

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean tryLock(long timeout) {
// 获取线程的标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.MINUTES);
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
String threadId = ID_PREFIX + Thread.currentThread().getId();
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断和删除的过程不具备原子性
if (threadId.equals(id)){
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}

修改,分布式实现3

实现方式二中,判断和删除的过程不具备原子性。

image-20240724224043376

Redis的Lua脚本

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。

image-20240724225024795

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

image-20240724225128114

image-20240724225539221

image-20240724225836256

释放锁的业务流程是这样的:

  1. 获取锁中的线程标示
  2. 判断是否与指定的标示(当前线程标示)一致
  3. 如果一致则释放锁(删除)
  4. 如果不一致则什么都不做
1
2
3
4
5
6
7
-- 获取锁中的线程标识 get key
local id = redis.call('get',KEYS[1])
-- 比较线程标识与锁中是否一致
if(id == ARGV[1]) then
return redis.call('del',KEYS[1])
end
return 0

使用java代码调用Lua脚本

image-20240724231458693

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SimpleRedisLock implements ILock{

private String name;
private StringRedisTemplate stringRedisTemplate;

private static final String KEY_PREFIX = "lock:";
// 防止集群模式下线程ID重复
private static final String ID_PREFIX = UUID.randomUUID().toString(true) +"-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}



public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean tryLock(long timeout) {
// 获取线程的标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.MINUTES);
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 调用Lua脚本
stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name),threadId);
}
}

5.2 基于Redis的分布式锁

基于setnx实现分布式锁存在以下问题:

image-20240724234316215

Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格( ln-Memory Data Grid ) 。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

image-20240724234822546

快速入门

  1. 引入依赖
1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
  1. 配置Redisson客户端
1
2
3
4
5
6
7
8
9
10
@Configuration
public class RedisConfig {

@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.200.100:6379").setPassword("123321");
return Redisson.create(config);
}
}
  1. 使用Redisson分布式锁

image-20240724235549778

Redisson可重入原理

每次获取锁,次数加1,每次释放锁,次数减1,当减到0时就可以删除锁。

image-20240725112955247


获得锁

  1. 判断锁是否存在
    • 不存在,获得锁并添加线程标识,设置有效期,执行业务。
    • 存在,判断锁标识是不是自己的。
      • 不是自己的,获取锁失败
      • 是自己的,锁计数加1,设置有效期,执行业务
  2. 执行完业务后,判断锁算不算自己的
    • 不是,锁已释放
    • 是,锁计数-1,判断计数是否为0
      • 不是,重置有效期
      • 是,释放锁

image-20240725113124572


释放锁


image-20240725113230923

Redisson分布式锁原理

image-20240725124915994

为了防止业务执行时出现阻塞而ttl到期释放,另一个线程拿到了锁导致的安全问题,需要自动续期。(保证锁是我们业务执行完成释放)

Redisson主从一致性问题

image-20240725130700014

RedLock(红锁):不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁(n/2+ 1),避免在一个redis实例上加锁。

必须在所有节点能获取锁,才能获取锁成功。

image-20240725130815581

image-20240726181603906

6. Redis消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理( Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

6.1 基于List结构模拟消息队列

而Redis的list数据结构是一个双向链表很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现。

下过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。

因此这里应该使用**BRPOP或者BLPOP**来实现阻塞效果。

优点:

  • 利用Redis存储

  • 不受限于JVM内存上限基于Redis的持久化机制

  • 数据安全性有保证可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

6.2 基于PubSub实现消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

image-20240725153038528

image-20240725153429205

image-20240725153440307

优点:

  • 采用发布订阅模型,支持多生产多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

6.3 基于Stream的消息队列

Stream是Redis 5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

image-20240725154233161

image-20240725154656290

image-20240725154612743

image-20240725154820267

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

消费者组

image-20240725155248167

7. Redis的持久化

7.1 RDB持久化

RDB全称Redis Database Backup file (Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。

快照文件成为RDB文件,默认保存在当前运行目录。

1
2
3
redis-cli
>save #由Redis主进程来执行RDB,会阻塞所有命令
>bgsave #开启子进程执行RDB,避免主进程受到影响(推荐)

Redis停机时会执行一次RDB。

image-20240725160227453

bgsave开始时会fork主进程得到子进程,子进程共享主进程的内存数据。完成fork后读取内存数据并写入RDB文件。

fork采用的是copy-on-write技术:

  • 当主进程执行读操作时,访问共享内存;
  • 当主进程执行写操作时,则会拷贝一份数据,执行写操作。

image-20240725161644847

RDB方式bgsave的基本流程?

  • fork主进程得到一个子进程,共享内存空间
  • 子进程读取内存数据并写入新的RDB文件
  • 用新RDB文件替换旧的RDB文件。

RDB会在什么时候执行? save 60 1000代表什么含义?

默认是服务停止时。

代表60秒内至少执行1000次修改则触发RDB

RDB的缺点是什么?

  • RDB执行间隔时间长,两次RDB之间写入数据有丢失的风险

  • fork子进程、压缩、写出RDB文件都比较耗时

7.2 AOF持久化

AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件。

image-20240725162435678

AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF

1
2
3
4
# 是否开启AOF功能,默认是no
appendonly yes
# AOF文件的名称
appendfilename "appendonly.aof"

AOF的命令记录的频率也可以通过redis.conf文件来配:

1
2
3
4
5
6
# 表示每执行一次写命令,立即记录到AOF文件
appendfsync always
# 写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案
appendfsync everysec
# 写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
appendfsync no

image-20240725163129465

因为是记录命令,AOF文件会比RDB文件大的多。而且AOF会记录对同一个key的多次写操作,但只有最后一次写操作才有意义。通过执行**bgrewriteaof**命令,可以让AO文件执行重写功能,用最少的命令达到相同效果。

image-20240725163806397

image-20240725164021547

7.3 RDB和AOF的对比

image-20240725164524270

8. Redis主从

8.1 集群架构

单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离。

image-20240729120755020

共包含3个节点,一个主节点,两个从节点。

IP PORT 角色
192.168.200.100 7001 master
192.168.200.100 7002 slave
192.168.200.100 7003 slave

8.2 准备和实例

  1. 创建3个目录,分别是7001,7002,7003
1
mkdir 7001 7002 7003
  1. 恢复原始配置

修改redis-6.2.4/redis.conf文件,将其中的持久化模式改为默认的RDB模式,AOF保持关闭状态。

1
2
3
4
5
6
7
8
# 开启RDB
# save ""
save 3600 1
save 300 100
save 60 10000

# 关闭AOF
appendonly no
  1. 拷贝配置文件到每个实例目录

然后将redis-6.2.4/redis.conf文件拷贝到三个目录中(在/tmp目录执行下列命令):

1
2
3
cp redis-6.2.4/redis.conf 7001
cp redis-6.2.4/redis.conf 7002
cp redis-6.2.4/redis.conf 7003
  1. 修改每个实例的端口、工作目录

修改每个文件夹内的配置文件,将端口分别修改为7001、7002、7003,将rdb文件保存位置都修改为自己所在目录(在/tmp目录执行下列命令):

1
2
3
sed -i -e 's/6379/7001/g' -e 's/dir .\//dir \/tmp\/7001\//g' 7001/redis.conf
sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/tmp\/7002\//g' 7002/redis.conf
sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/tmp\/7003\//g' 7003/redis.conf
  1. 需要关闭保护模式
1
2
3
4
5
6
# 绑定地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问
bind 0.0.0.0
# 保护模式,关闭保护模式
protected-mode no
# 数据库数量,设置为1
databases 1
  1. 修改每个实例的声明IP

虚拟机本身有多个IP,为了避免将来混乱,我们需要在redis.conf文件中指定每一个实例的绑定ip信息,格式如下:

1
2
# redis实例的声明 IP
replica-announce-ip 192.168.200.100

每一个目录都要改

1
2
3
4
5
6
7
# 逐一执行
sed -i '1a replica-announce-ip 192.168.200.100' 7001/redis.conf
sed -i '1a replica-announce-ip 192.168.200.100' 7002/redis.conf
sed -i '1a replica-announce-ip 192.168.200.100' 7003/redis.conf

# 或者一键修改
printf '%s\n' 7001 7002 7003 | xargs -I{} -t sed -i '1a replica-announce-ip 192.168.200.100' {}/redis.conf

8.3 启动

为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:

1
2
3
4
5
6
# 第1个
redis-server 7001/redis.conf
# 第2个
redis-server 7002/redis.conf
# 第3个
redis-server 7003/redis.conf

image-20240729124715139

8.4 此时还没有主从关系

现在三个实例还没有任何关系,要配置主从可以使用replicaof 或者slaveof(5.0以前)命令。

有临时和永久两种模式:

  • 修改配置文件(永久生效)

    • 在redis.conf中添加一行配置:slaveof <masterip> <masterport>
  • 使用redis-cli客户端连接到redis服务,执行slaveof命令(重启后失效):

    1
    slaveof <masterip> <masterport>

注意:

在5.0以后新增命令replicaof,与salveof效果一致。

方式二:

配置主从关系。

1
2
3
4
# 连接 7002
redis-cli -p 7002
# 执行slaveof
slaveof 192.168.150.101 7001
1
2
3
4
# 连接 7002
redis-cli -p 7003
# 执行slaveof
slaveof 192.168.150.101 7001

然后连接7001,查看集群状态。

1
2
3
4
# 连接 7001
redis-cli -p 7001
# 查看状态
info replication

8.5 测试

执行下列操作以测试:

  • 利用redis-cli连接7001,执行set num 123

  • 利用redis-cli连接7002,执行get num,再执行set num 666

  • 利用redis-cli连接7003,执行get num,再执行set num 888

可以发现,只有在7001这个master节点上可以执行写操作,7002和7003这两个slave节点只能执行读操作。

8.6 数据同步原理

全量同步

主从第一次同步是全量同步:

image-20240729132306821

master如何判断slave是不是第一次来同步数据?这里会用到两个很重要的概念:

  • Replication ld:简称replid,是数据集的标记,id一致则说明是同一数据集。每一个master都有唯一的replid,slave则会继承master节点的replid。

  • offset:偏移量,随着记录在repl_baklog中的数据增多而逐渐增大。slave完成同步时也会记录当前同步的offset。如果slave的offset小于master的offset,说明slave数据落后于master,需要更新。

因此slave做数据同步,必须向master声明自己的replication id和offset,master才可以判断到底需要同步哪些数据

image-20240729132816475

简述全量同步的流程?

  • slave节点请求增量同步
  • master节点判断replid,发现不一致,拒绝增量同步
  • master将完整内存数据生成RDB,发送RDB到slave
  • slave清空本地数据,加载master的RDB
  • master将RDB期间的命令记录在repl_baklog,并持续将log中的命令发送给slave
  • slave执行接收到的命令,保持与master之间的同步

增量同步

主从第一次同步是全量同步,但如果slave重启后同步,则执行增量同步。

master和slave会在这个环循环同步

image-20240729133635092

image-20240729133708844


repl_baklog大小有上限,写满后会覆盖最早的数据。如果slave断开时间过久,导致尚未备份的数据被覆盖,则无法基于log做增量同步,只能再次全量同步。

image-20240729133939148

image-20240729134025304

可以从以下几个方面来优化Redis主从就集群:

  • 在master中配置repl-diskless-sync yes启用无磁盘复制(当我要写RDB文件时,不把它写到磁盘的IO流,写到网络中去),避免全量同步时的磁盘IO。(磁盘比较慢,网络比较快的场景下使用)。

  • Redis单节点上的内存占用不要太大,减少RDB导致的过多磁盘。

  • 适当提高repl_baklog的大小,发现slave宕机时尽快实现故障恢复,尽可能避免全量同步。

  • 限制一个master上的slave节点数量,如果实在是太多slave,则可以采用主-从-从链式结构,减少master压力。

image-20240729134713308

总结

image-20240729134820270

9. Redis哨兵

slave节点宕机恢复后可以找master节点同步数据,那master节点宕机怎么办?

9.1 哨兵的作用和工作原理

哨兵的作用

Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。哨兵的结构和作用如下:

  • 监控:Sentinel会不断检查您的master和slave是否按预期工作。
  • 自动故障恢复:如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主。
  • 通知:Sentinel充当Redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端。

image-20240729135429265

服务状态监控

Sentinel基于心跳机制监测服务状态,每隔1秒向集群的每个实例发送ping命令:

主观下线:如果某sentinel节点发现某实例未在规定时间响应则认为该实例主观下线

客观下线:若超过指定数量(quorum)的sentinel都认为该实例主观下线,则该实例客观下线。quorum值最好超过Sentinel实例数量的一半。

image-20240729135719270

选举新的master

一旦发现master故障,sentinel需要在salve中选择一个作为新的master,选择依据是这样的:

  • 首先会判断slave节点与master节点断开时间长短,如果超过指定值(down-after-milliseconds*10)则会排除该slave节点。

  • 然后判断slave节点的slave-priority值,越小优先级越高,如果是0则永不参与选举。

  • 如果slave-prority一样,则判断lave节点的offset值,越大说明数据越新,优先级越高。

  • 最后是判断slave节点的运行id大小,越小优先级越高。

如何实现故障转移

当选中了其中一个slave为新的master后(例如slave1 ) ,故障的转移的步骤如下:

  1. 假如master宕机了
image-20240729140235295
  1. sentinal给备选的slavel节点发送一个slaveof no one命令,让该节点成为master。
image-20240729140414216
  1. sentinel给所有其它slave发送slaveof 192.168.200.100 7002命令,让这些slave成为新master的从节点,升始从新的master上同步数据。

    image-20240729140519933
  2. 最后,sentinel将故障节点标记为slave,当故障节点恢复后会自动成为新的master的slave节点。

    image-20240729140633274

总结

image-20240729140847320

9.2 搭建哨兵集群

首先搭建一个三节点形成的Sentinel集群,来监管之前的Redis主从集群。如图:

image-20240729141250762

三个sentinel实例信息如下:

节点 IP PORT
s1 192.168.200.100 27001
s2 192.168.200.100 27002
s3 192.168.200.100 27003

要在同一台虚拟机开启3个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。

创建三个文件夹,名字分别叫s1、s2、s3:

1
2
3
4
# 进入/tmp目录
cd /tmp
# 创建目录
mkdir s1 s2 s3

然后我们在s1目录创建一个sentinel.conf文件,添加下面的内容:

1
2
3
4
5
6
7
8
port 27001
sentinel announce-ip 192.168.200.100
sentinel monitor mymaster 192.168.200.100 7001 2
# master与slavel断开的最长超时时间
sentinel down-after-milliseconds mymaster 5000
# slavel故障恢复的超时时间
sentinel failover-timeout mymaster 60000
dir "/tmp/s1"
  • port 27001:是当前sentinel实例的端口
  • sentinel monitor mymaster 192.168.200.100 7001 2:指定主节点信息
    • mymaster:主节点名称,自定义,任意写
    • 192.168.200.100 7001:主节点的ip和端口
    • 2:选举master时的quorum值。当超过两个的sentinel决定你下线了,那就是客观下线

然后将s1/sentinel.conf文件拷贝到s2、s3两个目录中(在/tmp目录执行下列命令):

1
2
cp s1/sentinel.conf s2
cp s1/sentinel.conf s3

修改信息

1
2
sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf

启动

1
2
3
4
5
6
# 第1个
redis-sentinel s1/sentinel.conf
# 第2个
redis-sentinel s2/sentinel.conf
# 第3个
redis-sentinel s3/sentinel.conf

测试

尝试然master宕机,查看日志。

9.3 RedisTemplate的哨兵模式

  1. 引入依赖
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  1. 添加sentinel的配置
1
2
3
4
5
6
7
8
spring:
redis:
sentinel:
master: mymaster
nodes:
- 192.168.200.100:27001
- 192.168.200.100:27002
- 192.168.200.100:27003
  1. 配置主从读写分离集群
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean
public LettuceClientConfigurationBuilderCustomizer configurationBuilderCustomizer(){
return new LettuceClientConfigurationBuilderCustomizer() {
@Override
public void customize(LettuceClientConfiguration.LettuceClientConfigurationBuilder clientConfigurationBuilder) {
clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}
};
}
// lambda简化
@Bean
public LettuceClientConfigurationBuilderCustomizer configurationBuilderCustomizer(){
return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}

这里的ReadFrom是配置Redis的读取策略,是一个枚举,包括下面选择:

  • MASTER:从主节点读取。
  • MASTER PREFERRED:优先从master节点读取,master不可用才读取replica。
  • REPLICA: 从slave (replica)节点读取。
  • REPLICA_PREFERRED:优先从slave (replica)节点读取,所有的slave都不可用才读取master。

10. Redis分片集群

10.1 分片集群结构

主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决:

  • 海量数据存储问题

  • 高并发写的问题

使用分片集群可以解决上述问题,分片集群特征:

  • 集群中有多个master,每个master保存不同数据

  • 每个master都可以有多个slave节点

  • master之间通过ping监测彼此健康状态

  • 客户端请求可以访问集群任意节点,最终都会被转发到正确节点

10.2 搭建分片集群

分片集群需要的节点数量较多,这里我们搭建一个最小的分片集群,包含3个master节点,每个master包含一个slave节点,结构如下:

image-20210702164116027

这里我们会在同一台虚拟机中开启6个redis实例,模拟分片集群,信息如下:

IP PORT 角色
192.168.200.100 7001 master
192.168.150.101 7002 master
192.168.150.101 7003 master
192.168.150.101 8001 slave
192.168.150.101 8002 slave
192.168.150.101 8003 slave
  1. 将之前搭建主从+哨兵集群的文件删除
1
2
3
4
5
6
# 进入/tmp目录
cd /tmp
# 删除旧的,避免配置干扰
rm -rf 7001 7002 7003
# 创建目录
mkdir 7001 7002 7003 8001 8002 8003
  1. 在/tmp下准备一个新的redis.conf文件,内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
port 6379
# 开启集群功能
cluster-enabled yes
# 集群的配置文件名称,不需要我们创建,由redis自己维护
cluster-config-file /tmp/6379/nodes.conf
# 节点心跳失败的超时时间
cluster-node-timeout 5000
# 持久化文件存放目录
dir /tmp/6379
# 绑定地址
bind 0.0.0.0
# 让redis后台运行
daemonize yes
# 注册的实例ip
replica-announce-ip 192.168.200.100
# 保护模式
protected-mode no
# 数据库数量
databases 1
# 日志
logfile /tmp/6379/run.log
  1. 将这个文件拷贝到每个目录下:
1
2
3
4
# 进入/tmp目录
cd /tmp
# 执行拷贝
echo 7001 7002 7003 8001 8002 8003 | xargs -t -n 1 cp redis.conf
  1. 修改每个目录下面的redis.conf,将其中的6379修改为与目标目录一致:
1
2
3
4
# 进入/tmp目录
cd /tmp
# 修改配置文件
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t sed -i 's/6379/{}/g' {}/redis.conf、

启动

因为已经配置了后台启动模式,所以可以直接启动服务:

1
2
3
4
# 进入/tmp目录
cd /tmp
# 一键启动所有服务
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-server {}/redis.conf

通过ps查看状态:

1
ps -ef | grep redis

image-20240729161337243

如果要关闭所有进程,可以执行命令:

1
2
3
4
# 推荐
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-cli -p {} shutdown

ps -ef | grep redis | awk '{print $2}' | xargs kill

创建集群

虽然服务启动了,但是目前每个服务之间都是独立的,没有任何关联。

我们需要执行命令来创建集群,在Redis5.0之前创建集群比较麻烦,5.0之后集群管理命令都集成到了redis-cli中。

1)Redis5.0之前

Redis5.0之前集群命令都是用redis安装包下的src/redis-trib.rb来实现的。因为redis-trib.rb是有ruby语言编写的所以需要安装ruby环境。

1
2
3
# 安装依赖
yum -y install zlib ruby rubygems
gem install redis

然后通过命令来管理集群:

1
2
3
4
# 进入redis的src目录
cd /tmp/redis-6.2.4/src
# 创建集群
./redis-trib.rb create --replicas 1 192.168.200.100:7001 192.168.200.100:7002 192.168.200.100:7003 192.168.200.100:8001 192.168.200.100:8002 192.168.200.100:8003

2)Redis5.0以后

我们使用的是Redis6.2.4版本,集群管理以及集成到了redis-cli中,格式如下:

1
redis-cli --cluster create --cluster-replicas 1 192.168.200.100:7001 192.168.200.100:7002 192.168.200.100:7003 192.168.200.100:8001 192.168.200.100:8002 192.168.200.100:8003

命令说明:

  • redis-cli --cluster或者./redis-trib.rb:代表集群操作命令
  • create:代表是创建集群
  • --replicas 1或者--cluster-replicas 1 :指定集群中每个master的副本个数为1,此时节点总数 ÷ (replicas + 1) 得到的就是master的数量。因此节点列表中的前n个就是master,其它节点都是slave节点,随机分配到不同master

运行后的样子:

image-20240729162116838

这里输入yes,则集群开始创建:

image-20240729162240809

通过命令可以查看集群状态:

1
redis-cli -p 7001 cluster nodes

image-20240729162313633

测试

尝试连接7001节点,存储一个数据:

1
2
3
4
5
6
7
8
# 连接
redis-cli -p 7001
# 存储数据
set num 123
# 读取数据
get num
# 再次存储
set a 1

结果悲剧了:

image-20210702182343979

集群操作时,需要给redis-cli加上-c参数才可以:

1
redis-cli -c -p 7001

这次可以了:

image-20210702182602145

10.3 散列插槽

Redis会把每一个master节点映射到0~16383共16384个插槽( hash slot) 上,查看集群信息时就能看到:

image-20240729162643490

数据key不是与节点绑定,而是与插槽绑定。redis会根据key的有效部分计算插槽值,分两种情况:

  • key中包含”{}”,且“{}”中至少包含1个字符,“{}”中的部分是有效部分
  • key中不包含“{}”,整个key都是有效部分

例如:key是num,那么就根据num计算,如果是(itcast)num,则根据itcast计算。计算方式是利用CRC16算法得到一个hash值,然后对16384取余,得到的结果就是slot值。

image-20240729164304774

10.4 集群伸缩

redis-cli –cluster提供了很多操作集群的命令,可以通过下面方式查看:

1
redis-cli --cluster help

添加节点

image-20240729164736622

案例

image-20240729165012812

1
2
3
4
5
6
7
8
9
10
mkdir 7004
cp redis.conf ./7004/
sed -i s/6379/7004/g 7004/redis.conf
cat 7004/redis.conf
# 启动7004,此时还没有添加到集群中
redis-server 7004/redis.conf
# 查看进程
ps -ef | grep redis
# 添加到集群中
redis-cli --cluster add-node 192.168.200.100:7004 192.168.200.100:7001

分配插槽:

image-20240729165952654

1
redis-cli --cluster reshard 192.168.200.100:7001

image-20240729170542378

1
redis-cli -p 7001 cluster nodes

image-20240729170617508

image-20240729170655313

删除节点

先把插槽移走,在删除节点(从集群中删除)

1
redis-cli --cluster reshard 192.168.200.100:7001

image-20240729171304087

已经没有插槽了

image-20240729171427664

删除节点

1
redis-cli --cluster del-node 192.168.200.100:7004 b6f11cdda7358b9acb51f9ed3e5a63db079cd9c4

10.5 故障转移

1
2
# 监控集群状态
watch redis-cli -p 7001 cluster nodes
1
2
3
4
# 使7002的master宕机
redis-cli -p 7002 shutdown
# 启动
redis-server 7002/redis.conf

image-20240729172509178

此时7002已经成为从节点。

当集群中有一个master宕机会发生什么呢?

  1. 首先是该实例与其它实例失去连接。
  2. 然后是疑似宕机
  3. 最后是确定下线,自动提升一个slave为新的master

手动故障转移

利用cluster failover命令可以手动让集群中的某个master宕机,切换到执行cluster failover命令的这个slave节点,实现无感知的数据迁移。其流程如下:

image-20240729173017572

手动的Failover支持三种不同模式:

  • 缺省:默认的流程,如图1~6步。
  • force:省略了对offset的一致性校验。
  • takeover:直接执行第5步,忽略数据一致性、忽略master状态和其它master的意见。

让7002替换8003成为新的master。

image-20240729173547461

1
2
redis-cli -p 7002
CLUSTER FAILOVER

image-20240729173726245

10.6 RedisTmplate访问分片集群

RedisTemplate底层同样基于lettuce实现了分片集群的支持而使用的步骤与哨兵模式基本一致:

  1. 引入redis的starter的依赖
  2. 配置分片集群地址
1
2
3
4
5
6
7
8
9
10
spring:
redis:
cluster:
nodes:
- 192.168.200.100:7001
- 192.168.200.100:7002
- 192.168.200.100:7003
- 192.168.200.100:8001
- 192.168.200.100:8002
- 192.168.200.100:8003
  1. 配置读写分离
1
2
3
4
@Bean
public LettuceClientConfigurationBuilderCustomizer configurationBuilderCustomizer(){
return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}