一、MQ的概述
1.1 同步调用存在的问题
微服务间基于Feign的调用就是同步方式,存在一些问题。


1.2 异步调用的优缺点
异步调用最常见的就是事件驱动模式

优势:
- 服务解耦
- 性能提升,吞吐量提升
- 故障隔离
- 流量削峰
缺点:
- 一来与Broker的可靠性、安全性,吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理。
1.3 MQ的常见架构
MQ,中文是消息队列,字母意思就是存放消息的队列。也就是事假驱动中的Broker。

二、部署RabbitMQ
2.1 RabbitMQ的概述
RabbitMq是基于Erlang语音开发的开源消息中间件。

publisher
:生产者,也就是发送消息的一方
consumer
:消费者,也就是消费消息的一方
queue
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
exchange
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
virtual host
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
上述这些东西都可以在RabbitMQ的管理控制台来管理。
2.2 单机部署
2.2.1 下载镜像
方式一:在线拉取
1
| docker pull rabbitmq:3-management
|
方式二:从本地加载
在课前资料已经提供了镜像包:
上传到虚拟机中后,使用命令加载镜像即可:
2.2.2 安装MQ
执行下面的命令来运行MQ容器:
1 2 3 4 5 6 7 8 9
| docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
|
三、数据隔离
不用的虚拟主机之间相互隔离。
需求:在RabbitMQ的控制台完成下列操作:
- 新建一个用户hmall
- 为hmall用户创建一个virtual host
- 测试不同virtual host之间的数据隔离现象


四、常见的消息模型
MQ官方中给了5个MQDemo的基本模型,对应了几种不同的做法
- 基本消息队列(BasicQueue)
- 工作消息队列(WorkQueue)
- 发布订阅
- Fanout Exchange:广播
- Direct Exchange:路由
- Topic Exchange:主题

4.1 基本消息队列
官方的Helloworld是最基础的消息队列模型来实现,只包括三个角色。
publisher:消息发布者,将消息发送到队列queue。
queue:消息队列,负责接受并缓存消息。
consumer:订阅队列,处理消息队列中的消息。

基本消息队列发送消息的流程:
- 建立connection
- 创建channel通道
- 利用channel声明队列
- 利用channel发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Test public void testSendMessage() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.16.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null);
String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】");
channel.close(); connection.close();
}
|
基本消息队列的信息接收流程:
- 建立connection
- 创建channel通道
- 利用channel声明队列
- 定义consumer消费行为handleDelivery()
- 利用channel将消费者与队列绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.16.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null);
channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); }
|
4.2 SpringAMQP
- AMQP:高级消息队列协议,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言平台无关,更符合微服务中独立性的要求。
- SpringAMQP:是基于AMQP协议定义中的一套API规范,提供了模板来发送和接收消息。分为两部分,其中spring-ampq是基础抽象,spring-rabbit是底层的默认实现。
使用SpirngAMQP实现Helloworld中的基础消息队列功能
- 在父工程中引入spring-amqp的依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
- 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.16.128 port: 5672 username: itcast password: 123321 virtual-host: /
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void testSend2(){ String queueName = "simple.queue"; String message = "hello,spring amqp"; rabbitTemplate.convertAndSend(queueName,message); } }
|
- 在consumer服务中编写消费逻辑,绑定simple.queue这个队列
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.16.128 port: 5672 username: itcast password: 123321 virtual-host: /
|
1 2 3 4 5 6 7 8
| @Component public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue") public void listenerSimpleQueue(String msg){ System.out.println("消费者接收到simple.queue的消息:"+msg); } }
|
4.3 工作消息队列

模拟WorkQueue,实现一个队列绑定多个消费者
消息提供者
1 2 3 4 5 6 7 8 9
| @Test public void testSend2WorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello,spring amqp_"; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName,message+i); Thread.sleep(20); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Component public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue") public void listenerWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到simple.queue的消息:"+msg); Thread.sleep(20); }
@RabbitListener(queues = "simple.queue") public void listenerWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到simple.queue的消息:"+msg); Thread.sleep(200); } }
|
发现提供者提供的50条消息会平均分配,消息预取
修改application.xml文件,设置preFetch这个值,可以控制预取消息的上限。
1 2 3 4 5 6 7 8 9 10
| spring: rabbitmq: host: 192.168.16.128 port: 5672 username: itcast password: 123321 virtual-host: / listener: simple: prefetch: 1
|
4.4 发布订阅模型
发布订阅与之前案例的区别就是允许将同一消息发送给多个消费者。实现的方式是加入了exchange
(交换机)。
常见的exchange有:
- Fanout:广播
- Diret:路由
- Topic:话题
注意:exchange只负责消息路由(消息转发),而不是存储,路由失败则消息丢失
3.4.1 FanoutExchange
FanoutExchange会将接收到的消息路由(广播)到每一个
跟其绑定的queue。


- 在consumer中添加一个类,添加@Configuration注解,并声明FanoutExchange,Queue和绑定对象Binding。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Configuration public class FanoutConfig {
@Bean public Queue fanoutQueue1(){ return new Queue("itcast.queue1"); } @Bean public Queue fanoutQueue2(){ return new Queue("itcast.queue2"); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); }
@Bean public Binding binding1(FanoutExchange fanoutExchange,Queue fanoutQueue1){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); }
@Bean public Binding binding2(FanoutExchange fanoutExchange,Queue fanoutQueue2){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
|
- 在consumer服务中的SpringRabbitListener类中,声明两个方法,分别监听queue1和queue2。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component @Slf4j public class SpringRabbitListener {
@RabbitListener(queues = "itcast.queue1") public void listenerFanoutQueue1(String msg){ System.out.println("消费者接收到FanoutQueue的消息:"+msg); }
@RabbitListener(queues = "itcast.queue2") public void listenerFanoutQueue2(String msg){ System.out.println("消费者接收到FanoutQueue的消息:"+msg); } }
|
3.
1 2 3 4 5 6 7 8 9
| @Test public void testFanoutQueue(){ String exechangeName = "itcast.fanout"; String message = "hello, every one"; rabbitTemplate.convertAndSend(exechangeName,"",message); }
|
4.4.2 DirectExchange
DirectExchange会将接收到的信息根据规则路由到指定的Queue,因此成为路由模式(routes)
- 每一个Queue都与Exchange设置一个Bindingkey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Component public class SpringRabbitListener { @RabbitListener(bindings = @QueueBinding( value = @Queue("direct.queue1"), exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"blue","red"} )) public void listenerDirectQueue1(String msg){ System.out.println("消费者接收到DirectQueue的消息:"+msg); }
@RabbitListener(bindings = @QueueBinding( value = @Queue("direct.queue2"), exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"yellow","red"} )) public void listenerDirectQueue2(String msg){ System.out.println("消费者接收到DirectQueue的消息:"+msg); } }
|
1 2 3 4 5 6 7 8
| @Test public void testDirectQueue(){ String exechangeName = "itcast.direct"; String message = "hello yellow"; rabbitTemplate.convertAndSend(exechangeName,"yellow",message); }
|
4.4.3 ToicExchange
ToicExchange与DirectExchange类似 ,区别在于routingKey必须是多个单词的列表,并以.
分割。
Queue与Exchange指定BindingKey时可以使用通配符


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Component public class SpringRabbitListener { @RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue1"), exchange = @Exchange(value = "itcast.topic",type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenerTopicQueue1(String msg){ System.out.println("消费者接收到TopicQueue1的消息:"+msg); }
@RabbitListener(bindings = @QueueBinding( value = @Queue("topic.queue2"), exchange = @Exchange(value = "itcast.topic",type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenerTopicQueue2(String msg){ System.out.println("消费者接收到TopicQueue2的消息:"+msg); } }
|
1 2 3 4 5 6 7 8
| @Test public void testTopicQueue(){ String exechangeName = "itcast.topic"; String message = "hello china.news"; rabbitTemplate.convertAndSend(exechangeName,"china.news",message); }
|
4.4 声明队列和交换机
队列和交换机的声明一般在消费者那边进行,生产者不关心队列,往交换机发就行。

方式一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Configuration public class FanoutConfig {
@Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); }
@Bean public Queue fanoutQueue1(){ return new Queue("itcast.queue1"); }
@Bean public Binding binding1(FanoutExchange fanoutExchange,Queue fanoutQueue1){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } @Bean public Queue fanoutQueue2(){ return new Queue("itcast.queue2"); }
@Bean public Binding binding2(FanoutExchange fanoutExchange,Queue fanoutQueue2){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
|
方式二:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component public class SpringRabbitListener { @RabbitListener(bindings = @QueueBinding( value = @Queue("direct.queue1"), exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"blue","red"} )) public void listenerDirectQueue1(String msg){ System.out.println("消费者接收到DirectQueue的消息:"+msg); }
@RabbitListener(bindings = @QueueBinding( value = @Queue("direct.queue2"), exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT), key = {"yellow","red"} )) public void listenerDirectQueue2(String msg){ System.out.println("消费者接收到DirectQueue的消息:"+msg); } }
|
4.5 消息转换器
如果生产者发送的是一个对象,那会转成字节进行发送,如果想要变成json,需要配置消息转换器。
在提供者和消费者都替换掉消息转换器。
1 2 3 4
| <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
|
1 2 3 4
| @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
|
五、消息可靠性

消息从生产者发送到exchange,在到queue,在到消费者,有那些导致消息丢失的可能性?
- 发送时丢失
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机

5.1 发送者的消息可靠性
5.1.1发送者重连
有的时候由于网络波动,可能会出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:
1 2 3 4 5 6 7 8 9
| spring: rabbitmq: connection-timeout: 1s template: retry: enabled: true initial-interval: 1000ms multiplier: 1 max-attempts: 3
|
注意:
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
5.1.2 发送者确认
RabbitMQ提供了publisher confirm
和publisher return
两种确认机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
- 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功。
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
- 其它情况都会返回NACK,告知投递失败
结果有两种请求:
publisher-confirm
,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
publisher-return
,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败的原因。
确认机制发送消息时,需要给每一个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。

- 在publisher这个微服务的application.yaml添加配置。
1 2 3 4 5 6
| springspring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
|
- publisher-confirm-type:
- none:关闭confirm机制。
- simple:同步等待confirm结果,直到超时
- correlated:异步回调,定义confirmCallback,MQ返回结果时会调用ConfirmCallback
- publisher-returns:开启publish-return功能,同样基于callback机制,不过是定义ReturnCallback
- template.mandatory:定义消息路由失败时的策略。true则调用ReturnCallback;false则直接丢弃消息。
- 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| @Slf4j @Configuration public class CommonConfig implements ApplicationContextAware {
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemp late = applicationContext.getBean(RabbitTemplate.class); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}", replyCode,replyText,exchange,routingKey,message.toString()); });
} }
@Slf4j @Configuration @RequiredArgsConstructor public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct public void init(){ rabbitTemplate.setReturnsCallback(returned -> { log.error("监听到了消息return callback"); log.error("交换机:{}",returned.getExchange()); log.error("routingKey:{}",returned.getRoutingKey()); log.error("message:{}",returned.getMessage()); log.error("replyCode:{}",returned.getReplyCode()); log.error("replyText:{}",returned.getReplyText()); }); } }
|
- 发送消息,指定消息id,消息ConfirmCallback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Slf4j @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testSendMessage2SimpleQueue() throws InterruptedException { String message = "hello, spring amqp!"; CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(confirm -> { if(confirm.isAck()){ log.info("消息成功投递到交换机"); }else{ log.error("消息投递到交换机失败!消息id:{}",correlationData.getId()); } }, throwable -> { log.error("消息发送失败!",throwable); }); rabbitTemplate.convertAndSend("amq.topic", "simple.test", message,correlationData); } }
|
消息正常发送,返回ack。
发送到交换机,但是路由失败,返回ack,然后执行ReturnCallback的回调。
消息没发送到交换机,或消息发送到了队列但是没有被持久化,返回nack。
5.2 MQ的可靠性
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
5.2.1 消息持久化
MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。
- 交换机持久化,默认是持久化的。

- 队列持久化,默认是持久化的。

- 消息持久化,SpringAMQP中的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定。
对于消息来说,不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。
持久化的消息会同时写入磁盘和内存(加快读取速度),非持久化消息会在内存不够用时,将消息写入磁盘(一般重启之后就没有了)

5.2.2 Lazy Queue
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。
惰性队列的特征如下:
- 接收到消息后直接存入磁盘,不再存储到内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Bean public Queue lazyQueue(){ return new QueueBuilder.durable("lazy.queue").lazy().build(); }
@RabbitListener(queuesToDeclare = @Queue( name = "lazy.queue", durable = "true", arguments = @Argument(name="x-queue-mode",value = "lazy") )) public void listenLazyQueue(String msg){ log.info("消费者收到lazy.queue的消息:{}",msg); }
|
5.3 消费者的可靠性
5.3.1 消费者消息确认
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。


而SpringAMQP则允许配置三种确认模式:
none
:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
manual
:手动ack,需要在业务代码结束后,调用api发送ack
auto
:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack
;抛出异常则返回nack。推荐
- 如果是业务异常,会自动返回nack
- 如果是消息处理或校验异常,返回reject

5.3.2 失败重试机制

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

消费者失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要MessageRecoverer
接口来处理,他包含三种实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。


如何保证RabbitMQ的消息可靠性。

业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的: f(x)=f(f(x))。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。

方案一:是给每个消息都设置一个唯一id,利用id区分是否是重复消息:(不推荐)

1 2 3 4 5 6 7 8 9 10 11 12 13
| @Bean public MessageConverter jacksonMessageConverter(){ Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); converter.setCreateMessageIds(true); return converter; }
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(Message message){ log.info("消费者收到simple.queue的消息:{}",new String(message.getBody())); log.info("消费者收到simple.queue的消息ID:{}",message.getMessageProperties().getMessageId()); }
|

六、延迟消息
延迟消息∶发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才执行任务。
6.1 死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter) :

如果该队列配置了dead-letter-exchange
属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机 (Dead Letter Exchange,简称DLX)

6.2 TTL
TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:
- 消息所在队列设置了存活时间。
- 消息本身设置了存活时间。

声明一组死信交换机额队列,基于注解方式。

要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:

发送消息时,给消息本身设置超时时间。

1 2 3 4 5 6 7 8
| @Test void testSendDelayMessage(){ rabbitTemplate.convertAndSend("normal,.direct","hi","hello",message -> { message.getMessageProperties().setExpiration("10000"); return message; }); }
|
如果队列和消息都设置了超时时间,则以较短的为准。
6.3 延迟队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为**延迟队列(DelayQueue)**模式。
延迟队列的使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
RabbitMQ部署指南
SpringAMQP使用延迟队列插件
这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

基于逐渐的方式

基于Bean的方式

发消息:

1 2 3 4 5 6 7 8
| @Test void testSendDelayMessage(){ rabbitTemplate.convertAndSend("normal,.direct","hi","hello",message -> { message.getMessageProperties().setDleay(10000); return message; }); }
|

七、惰性队列
消息堆积问题
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。
解决消息堆积有三种种思路:
- 增加更多消费者,提高消费速度。
- 在消费者内开启线程池加快消息处理速度。
- 扩大队列容积,提高堆积上限。
惰性队列
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念也就是惰性队列。
惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存。
- 消费者要消费消息时才会从磁盘中读取并加载到内存。
- 支持数百万条的消息存储。

