一、MQ的概述

1.1 同步调用存在的问题

微服务间基于Feign的调用就是同步方式,存在一些问题。

image-20230817232431846

image-20230817232839067

1.2 异步调用的优缺点

异步调用最常见的就是事件驱动模式

image-20230817233225717

优势:

  • 服务解耦
  • 性能提升,吞吐量提升
  • 故障隔离
  • 流量削峰

缺点:

  • 一来与Broker的可靠性、安全性,吞吐能力
  • 架构复杂了,业务没有明显的流程线,不好追踪管理。

1.3 MQ的常见架构

MQ,中文是消息队列,字母意思就是存放消息的队列。也就是事假驱动中的Broker。

image-20230818154250443

二、部署RabbitMQ

2.1 RabbitMQ的概述

RabbitMq是基于Erlang语音开发的开源消息中间件。

image-20230818161559246

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

上述这些东西都可以在RabbitMQ的管理控制台来管理。

2.2 单机部署

2.2.1 下载镜像

方式一:在线拉取

1
docker pull rabbitmq:3-management

方式二:从本地加载

在课前资料已经提供了镜像包:

image-20210423191210349

上传到虚拟机中后,使用命令加载镜像即可:

1
docker load -i mq.tar

2.2.2 安装MQ

执行下面的命令来运行MQ容器:

1
2
3
4
5
6
7
8
9
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

三、数据隔离

不用的虚拟主机之间相互隔离。

需求:在RabbitMQ的控制台完成下列操作:

  • 新建一个用户hmall
  • 为hmall用户创建一个virtual host
  • 测试不同virtual host之间的数据隔离现象

image-20250613215331588

image-20250613215312868

四、常见的消息模型

MQ官方中给了5个MQDemo的基本模型,对应了几种不同的做法

  • 基本消息队列(BasicQueue)
  • 工作消息队列(WorkQueue)
image-20230818162427583
  • 发布订阅
    • Fanout Exchange:广播
    • Direct Exchange:路由
    • Topic Exchange:主题

image-20230818162458977

4.1 基本消息队列

官方的Helloworld是最基础的消息队列模型来实现,只包括三个角色。

publisher:消息发布者,将消息发送到队列queue。

queue:消息队列,负责接受并缓存消息。

consumer:订阅队列,处理消息队列中的消息。

image-20230818162647124

基本消息队列发送消息的流程:

  1. 建立connection
  2. 创建channel通道
  3. 利用channel声明队列
  4. 利用channel发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.16.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();

// 2.创建通道Channel
Channel channel = connection.createChannel();

// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);

// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");

// 5.关闭通道和连接
channel.close();
connection.close();

}

基本消息队列的信息接收流程:

  1. 建立connection
  2. 创建channel通道
  3. 利用channel声明队列
  4. 定义consumer消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.16.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();

// 2.创建通道Channel
Channel channel = connection.createChannel();

// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);

// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}

4.2 SpringAMQP

  • AMQP:高级消息队列协议,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言平台无关,更符合微服务中独立性的要求。
  • SpringAMQP:是基于AMQP协议定义中的一套API规范,提供了模板来发送和接收消息。分为两部分,其中spring-ampq是基础抽象,spring-rabbit是底层的默认实现。

使用SpirngAMQP实现Helloworld中的基础消息队列功能

  1. 在父工程中引入spring-amqp的依赖
1
2
3
4
5
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.16.128 # rabbitMq的Ip地址
port: 5672 # 端口,5672是发送消息的,15672是控制台(页面)
username: itcast
password: 123321
virtual-host: / # 虚拟主机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSend2(){
String queueName = "simple.queue";
String message = "hello,spring amqp";
rabbitTemplate.convertAndSend(queueName,message);
}
}

  1. 在consumer服务中编写消费逻辑,绑定simple.queue这个队列
1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.16.128 # rabbitMq的Ip地址
port: 5672
username: itcast
password: 123321
virtual-host: / # 虚拟主机
1
2
3
4
5
6
7
8
@Component
public class SpringRabbitListener {

@RabbitListener(queues = "simple.queue")
public void listenerSimpleQueue(String msg){
System.out.println("消费者接收到simple.queue的消息:"+msg);
}
}

4.3 工作消息队列

image-20230818202200774


模拟WorkQueue,实现一个队列绑定多个消费者

消息提供者

1
2
3
4
5
6
7
8
9
@Test
public void testSend2WorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello,spring amqp_";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName,message+i);
Thread.sleep(20);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class SpringRabbitListener {

@RabbitListener(queues = "simple.queue")
public void listenerWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到simple.queue的消息:"+msg);
Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenerWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到simple.queue的消息:"+msg);
Thread.sleep(200);
}
}

发现提供者提供的50条消息会平均分配,消息预取

修改application.xml文件,设置preFetch这个值,可以控制预取消息的上限。

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
host: 192.168.16.128 # rabbitMq的Ip地址
port: 5672
username: itcast
password: 123321
virtual-host: / # 虚拟主机
listener:
simple:
prefetch: 1 # 每次只获取一条信息;处理完才能获取下一个

4.4 发布订阅模型

发布订阅与之前案例的区别就是允许将同一消息发送给多个消费者。实现的方式是加入了exchange(交换机)。

常见的exchange有:

  • Fanout:广播
  • Diret:路由
  • Topic:话题

注意:exchange只负责消息路由(消息转发),而不是存储,路由失败则消息丢失

3.4.1 FanoutExchange

FanoutExchange会将接收到的消息路由(广播)到每一个跟其绑定的queue。

image-20230818211616153

image-20230818211653909

  1. 在consumer中添加一个类,添加@Configuration注解,并声明FanoutExchange,Queue和绑定对象Binding。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
public class FanoutConfig {

// 声明队列1
@Bean
public Queue fanoutQueue1(){
return new Queue("itcast.queue1");
}

// 声明队列2
@Bean
public Queue fanoutQueue2(){
return new Queue("itcast.queue2");
}

// 声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}

// 绑定交换机和队列1
@Bean
public Binding binding1(FanoutExchange fanoutExchange,Queue fanoutQueue1){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

// 绑定交换机和队列2
@Bean
public Binding binding2(FanoutExchange fanoutExchange,Queue fanoutQueue2){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
  1. 在consumer服务中的SpringRabbitListener类中,声明两个方法,分别监听queue1和queue2。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
@Slf4j
public class SpringRabbitListener {

@RabbitListener(queues = "itcast.queue1")
public void listenerFanoutQueue1(String msg){
System.out.println("消费者接收到FanoutQueue的消息:"+msg);
}

@RabbitListener(queues = "itcast.queue2")
public void listenerFanoutQueue2(String msg){
System.out.println("消费者接收到FanoutQueue的消息:"+msg);
}
}

3.

1
2
3
4
5
6
7
8
9
@Test
public void testFanoutQueue(){
// 交换机名称
String exechangeName = "itcast.fanout";
// 消息
String message = "hello, every one";
// 发送消息,交换机,Rotingkey(暂时为空)、消息
rabbitTemplate.convertAndSend(exechangeName,"",message);
}

4.4.2 DirectExchange

DirectExchange会将接收到的信息根据规则路由到指定的Queue,因此成为路由模式(routes)

  • 每一个Queue都与Exchange设置一个Bindingkey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。

image-20230818221412383

image-20230818221511085

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct.queue1"),
exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"blue","red"}
))
public void listenerDirectQueue1(String msg){
System.out.println("消费者接收到DirectQueue的消息:"+msg);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct.queue2"),
exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"yellow","red"}
))
public void listenerDirectQueue2(String msg){
System.out.println("消费者接收到DirectQueue的消息:"+msg);
}
}
1
2
3
4
5
6
7
8
@Test
public void testDirectQueue(){
// 交换机名称
String exechangeName = "itcast.direct";
// 消息
String message = "hello yellow";
rabbitTemplate.convertAndSend(exechangeName,"yellow",message);
}

4.4.3 ToicExchange

ToicExchange与DirectExchange类似 ,区别在于routingKey必须是多个单词的列表,并以.分割。

Queue与Exchange指定BindingKey时可以使用通配符

  • #:代指0个或多个单词。
  • *:代指一个单词。

image-20230818224059838

image-20230818224218405

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue1"),
exchange = @Exchange(value = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenerTopicQueue1(String msg){
System.out.println("消费者接收到TopicQueue1的消息:"+msg);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue2"),
exchange = @Exchange(value = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenerTopicQueue2(String msg){
System.out.println("消费者接收到TopicQueue2的消息:"+msg);
}
}
1
2
3
4
5
6
7
8
@Test
public void testTopicQueue(){
// 交换机名称
String exechangeName = "itcast.topic";
// 消息
String message = "hello china.news";
rabbitTemplate.convertAndSend(exechangeName,"china.news",message);
}

4.4 声明队列和交换机

队列和交换机的声明一般在消费者那边进行,生产者不关心队列,往交换机发就行。

image-20240718171832938

方式一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
public class FanoutConfig {

// 声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}

// 声明队列1
@Bean
public Queue fanoutQueue1(){
return new Queue("itcast.queue1");
}

// 绑定交换机和队列1
@Bean
public Binding binding1(FanoutExchange fanoutExchange,Queue fanoutQueue1){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

// 声明队列2
@Bean
public Queue fanoutQueue2(){
return new Queue("itcast.queue2");
}

// 绑定交换机和队列2
@Bean
public Binding binding2(FanoutExchange fanoutExchange,Queue fanoutQueue2){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}

方式二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class SpringRabbitListener {
// 声明队列绑定交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct.queue1"),
exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"blue","red"}
))
public void listenerDirectQueue1(String msg){
System.out.println("消费者接收到DirectQueue的消息:"+msg);
}


@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct.queue2"),
exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"yellow","red"}
))
public void listenerDirectQueue2(String msg){
System.out.println("消费者接收到DirectQueue的消息:"+msg);
}
}

4.5 消息转换器

如果生产者发送的是一个对象,那会转成字节进行发送,如果想要变成json,需要配置消息转换器。

在提供者和消费者都替换掉消息转换器。

1
2
3
4
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
1
2
3
4
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}

五、消息可靠性

image-20230905223709753

消息从生产者发送到exchange,在到queue,在到消费者,有那些导致消息丢失的可能性?

  • 发送时丢失
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

image-20230906161711908

5.1 发送者的消息可靠性

5.1.1发送者重连

有的时候由于网络波动,可能会出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次的等待时长=initial-interval * multiplier
max-attempts: 3 #最大重试次数

注意:

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

5.1.2 发送者确认

RabbitMQ提供了publisher confirmpublisher return两种确认机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功。
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

结果有两种请求:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败的原因。

确认机制发送消息时,需要给每一个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。

image-20230906162617739

  1. 在publisher这个微服务的application.yaml添加配置。
1
2
3
4
5
6
springspring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
  • publisher-confirm-type:
    • none:关闭confirm机制。
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义confirmCallback,MQ返回结果时会调用ConfirmCallback
  • publisher-returns:开启publish-return功能,同样基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true则调用ReturnCallback;false则直接丢弃消息。

  1. 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 方式一
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

// 消息发送到了交换机,但是没路由到队列
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取rabbitTemplate对象
RabbitTemplate rabbitTemp late = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 记录日志
log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
replyCode,replyText,exchange,routingKey,message.toString());
// 如果有需要的话,重发消息
});

}
}

// 方式二
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqConfig {

private final RabbitTemplate rabbitTemplate;

/**
* 监听消息发送失败
*/
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(returned -> {
log.error("监听到了消息return callback");
log.error("交换机:{}",returned.getExchange());
log.error("routingKey:{}",returned.getRoutingKey());
log.error("message:{}",returned.getMessage());
log.error("replyCode:{}",returned.getReplyCode());
log.error("replyText:{}",returned.getReplyText());
});
}
}
  1. 发送消息,指定消息id,消息ConfirmCallback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;

// 消息没发送到交换机
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.准备消息
String message = "hello, spring amqp!";
// 2.
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(confirm -> {
if(confirm.isAck()){
// ACK
log.info("消息成功投递到交换机");
}else{
// NACK
log.error("消息投递到交换机失败!消息id:{}",correlationData.getId());
// 重发
}
}, throwable -> {
// 消息发送失败
log.error("消息发送失败!",throwable);
// 重发消息
});
// 3.发送消息
rabbitTemplate.convertAndSend("amq.topic", "simple.test", message,correlationData);
}
}

消息正常发送,返回ack。

发送到交换机,但是路由失败,返回ack,然后执行ReturnCallback的回调。

消息没发送到交换机,或消息发送到了队列但是没有被持久化,返回nack。

5.2 MQ的可靠性

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失。

  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞。

5.2.1 消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

  1. 交换机持久化,默认是持久化的。

image-20230906204445350

  1. 队列持久化,默认是持久化的。

image-20230906204545125

  1. 消息持久化,SpringAMQP中的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定。

对于消息来说,不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。

持久化的消息会同时写入磁盘和内存(加快读取速度),非持久化消息会在内存不够用时,将消息写入磁盘(一般重启之后就没有了)

image-20230906205855585

5.2.2 Lazy Queue

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘,不再存储到内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)

在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

image-20240719233948980

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 方式一
@Bean
public Queue lazyQueue(){
return new QueueBuilder.durable("lazy.queue").lazy().build();
}
// 方式二
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name="x-queue-mode",value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("消费者收到lazy.queue的消息:{}",msg);
}

5.3 消费者的可靠性

5.3.1 消费者消息确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。

image-20240719235844399

image-20240719235910010

而SpringAMQP则允许配置三种确认模式:

  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

  • manual:手动ack,需要在业务代码结束后,调用api发送ack

  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack。推荐

    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,返回reject

image-20230906222120546

5.3.2 失败重试机制

image-20230906222631340

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

image-20230906222834699

消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要MessageRecoverer接口来处理,他包含三种实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

image-20230906224015475

image-20230906224244024

如何保证RabbitMQ的消息可靠性。

image-20230906224626462

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的: f(x)=f(f(x))。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。

image-20240720112217131

方案一:是给每个消息都设置一个唯一id,利用id区分是否是重复消息:(不推荐)

image-20240720112405552

1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean
public MessageConverter jacksonMessageConverter(){
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
// 配置自动创建消息ID,用于识别不同消息,也可以在业务中基于ID判断是否重复消费。
converter.setCreateMessageIds(true);
return converter;
}

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(Message message){
log.info("消费者收到simple.queue的消息:{}",new String(message.getBody()));
log.info("消费者收到simple.queue的消息ID:{}",message.getMessageProperties().getMessageId());
}

image-20240720114515686

六、延迟消息

延迟消息∶发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。

延迟任务:设置在一定时间之后才执行任务。

6.1 死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter) :

image-20230906230825601

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机 (Dead Letter Exchange,简称DLX)

image-20230906231045305

6.2 TTL

TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:

  • 消息所在队列设置了存活时间。
  • 消息本身设置了存活时间。

image-20230906231833704

声明一组死信交换机额队列,基于注解方式。

image-20230906232033184

要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:

image-20230906232200810

发送消息时,给消息本身设置超时时间。

image-20230906233318321

1
2
3
4
5
6
7
8
@Test
void testSendDelayMessage(){
rabbitTemplate.convertAndSend("normal,.direct","hi","hello",message -> {
// 设置消息的过期时间
message.getMessageProperties().setExpiration("10000");
return message;
});
}

如果队列和消息都设置了超时时间,则以较短的为准。

6.3 延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为**延迟队列(DelayQueue)**模式。
延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

RabbitMQ部署指南

SpringAMQP使用延迟队列插件

这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

image-20240720130748889

基于逐渐的方式

image-20230906235622076

基于Bean的方式

image-20230906235807833

发消息:

image-20230906235907171

1
2
3
4
5
6
7
8
@Test
void testSendDelayMessage(){
rabbitTemplate.convertAndSend("normal,.direct","hi","hello",message -> {
// 设置消息的过期时间
message.getMessageProperties().setDleay(10000);
return message;
});
}

image-20230907000434503

七、惰性队列

消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。

解决消息堆积有三种种思路:

  • 增加更多消费者,提高消费速度。
  • 在消费者内开启线程池加快消息处理速度。
  • 扩大队列容积,提高堆积上限。

惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念也就是惰性队列

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存。
  • 消费者要消费消息时才会从磁盘中读取并加载到内存。
  • 支持数百万条的消息存储。

image-20230907001402502

image-20230907001419780