RabbitMQ(二)
RabbitMQ(二)
一、引入
有时候我们需要保证,MQ消息至少被消费者处理一次。 如果消息发送失败,没有正确的接收到,会导致数据不一致的问题。
那么问题来了:
-
我们该如何确保MQ消息的可靠性?
-
如果真的发送失败,有没有其它的兜底方案?
发送信息丢失:
-
生产者发送信息时连接MQ服务器失败
-
生产者发送消息到达MQ后未找到Exchange
-
生产者发送消息到达Exchange后未找到queue
-
消息到达MQ后处理消息的进程发生异常
MQ导致信息丢失:
-
消息到达MQ,保存到队列后,尚未消费就突然宕机
消费者处理消息时:
-
消息接收后尚未处理宕机
-
消息接收后处理过程中抛出异常
为了解决消息丢失问题,保证MQ的可靠性,我们需要从以下3个方面
-
确保生产者一定把消息发送到MQ
-
确保MQ不会将消息弄丢
-
确保消费者一定要处理消息
二、发送者的可靠性
1、生产者重试机制
确保生产者一定把消息发送到MQ
-
解决生产者发送消息时,出现网络故障
使用重试机制,当与MQ服务器断开连接时,重新连接
1 | spring: |
2、生产者确认机制
解决消息发送到MQ后,消息在MQ丢失。 比如:
-
MQ内部处理消息的进程发生了异常
-
生产者发送消息到达MQ后未找到
Exchange -
生产者发送消息到达MQ的
Exchange后,未找到合适的Queue,因此无法路由
RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种
-
当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
-
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
-
持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
-
其它情况都会返回NACK,告知投递失败
实现生产者确认机制:
1 | spring: |
-
none:关闭confirm机制 -
simple:同步阻塞等待MQ的回执 -
correlated:MQ异步回调返回回执
定义ReturnCallback(所有异常消息):
1 |
|
定义ConfirmCallback:
发送的时候指定correlationData
1 |
|

三、MQ可靠性
消息到达MQ后,需要MQ能够保存消息,不能丢失消息
1、数据持久化
默认情况下,MQ的数据保存在内存中,重启就消失了。所以我们需要配置数据持久化
包括: 交换机持久化、队列持久化、消息持久化
2、交换机持久化
我们可以在控制台Exchange界面。修改添加交换机时的Durability参数: 设置为Durable就是持久化模式,Transient就是临时模式。
3、队列持久化
同样的,修改创建队列界面的Durability参数
4、消息持久化
在控制台发送消息的时候,就可以选择消息持久化

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。
5、LazyQueue
默认情况下RabbitMQ会将接收到的消息存入内存中,但是会发生一些特殊情况导致消息积压,然后撑爆内存,触发内存预警,(Pageout)将内存消息刷到磁盘上,并且在此期间不会接收任何消息
-
消费者宕机或出现网络故障
-
消息发送量激增,超过了消费者处理速度
-
消费者处理业务发生阻塞
所以我们设计了惰性队列:接收到的消息直接存入磁盘,消费者消费消息的时候才从磁盘中读取消息、支持百万条的消息存储。
RabbitMQ 3.12版本后,默认队列格式就是LazyQueue
-
控制台配置Lazy模式
![5]()
-
代码声明Lazy模式
1
2
3
4
5
6
7
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}通过注解声明
1
2
3
4
5
6
7
8
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
} -
更新已有队列为lazy模式
基于命令行
1
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
-
rabbitmqctl:RabbitMQ的命令行工具 -
set_policy:添加一个策略 -
Lazy:策略名称,可以自定义 -
"^lazy-queue$":用正则表达式匹配队列的名字 -
'{"queue-mode":"lazy"}':设置队列模式为lazy模式 -
--apply-to queues:策略的作用对象,是所有的队列
也可以控制台配置
![6]()
-
发送消息代码例子:
1 |
|
四、消费者可靠性
消息投递给消费者不一定就被正确消费了,有可能发生以下情况
-
消息投递过程中网络断了
-
消费者收到消息后死机
-
消费者收到消息后,处理不当导致异常
1、消费者确认机制
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
-
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用 -
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活 -
auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常,会自动返回
nack; - 如果是消息处理或校验异常,自动返回
reject;
- 如果是业务异常,会自动返回
修改确认机制为auto
1 | spring: |
发送后的消息未接收到结果时,处于的状态是unacked(未确定状态)
若是消息处理异常,返回reject后会删除队列中的信息。 若是业务异常,则会保留(变为ready状态等待下次投递)
2、消费者重试机制
若消费出现异常(业务异常),则会不断重新入队,不断投递。浪费MQ资源。
我们可以设置消费消息出现异常的时候,在本地重试
1 | spring: |
3、失败处理策略
当消费者重试后处理消息依然出现异常,消息被丢掉。没法保证消息可靠性了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略。由MessageRecovery接口来定义
| 接口 | 描述 |
|---|---|
| RejectAndDontRequeueRecoverer | 重试耗尽后,直接reject,丢弃消息。 |
| ImmediateRequeueMessageRecoverer | 重试耗尽后,返回nack,消息重新入队 |
| RepublishMessageRecoverer | 重试耗尽后,将失败消息投递到指定的交换机 |
-
使用RepublishMessageRecoverer
定义一个专门存放错误消息的交换机与队列:
1 |
|
创建RepublishMessageRecoverer。关联交换机和对应的RoutingKey
1 |
|
这样。当我们的消息经过重试机制仍然处理失败后,消息将被发往error.direct交换机。最终被发向error.queue队列
4、业务幂等性
f(x) = f(f(x)) 我们要求处理一些业务,处理一次和处理多次所得到的效果是一样的。
通常这些操作天生幂等: 根据id删除数据、查询数据、新增数据
但是关于数据更新的操作不一定幂等: 取消订单(会导致库存多次增加),退款业务(多退钱)
实际业务中会出现: 表单重复提交、服务间调用的重试、MQ的重复投递…
为了保证消息处理的幂等性:
-
唯一消息id : 执行消息之前,查看此消息有没有被处理过(应该是需要自己处理)
1 |
|
-
业务状态判断:
主要是在代码中利用一些特征处理。比如说取消订单首先这个订单要没被取消。然后即可实现幂等性。
5、兜底方案
其实思想很简单:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。

通常使用定时任务。实现主动查询
五、延迟消息
1、死信交换机
消息满足下列情况,变成死信(Dead Letter)
-
消费者使用
basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false -
消息是一个过期消息,超时无人消费
-
要投递的队列消息满了,无法投递
消息成为死信,并且队列通过**dead-letter-exchange属性指定了一个交换机。那么队列中的死信会投递在这个死信交换机中。 进而投递到与死信交换机绑定的队列**中去。
死信交换机有什么作用呢?
-
收集那些因处理失败而被拒绝的消息
-
收集那些因队列满了而被拒绝的消息
-
收集因TTL(有效期)到期的消息
2、延迟消息
通过设置一个无消费者消费的队列。 给这个队列指定一个死信交换机。同时将另一个有消费者消费的队列绑定这个死信交换机。
然后给无消费者消费的队列投递一则有过期时间的消息。当时间过期后,就会投递到死信交换机,进而到有消费者消费的队列。
实现了延迟消息

注意:
RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。
1、声明队列与绑定关系
1 |
|
2、发送消息
1 |
|
3、声明消费者,指定关系
1 |
|
3、DelayExchange插件
死信交换机实现延迟消息太麻烦了。RabbitMQ社区实现了一个延迟消息插件来产生相同的结果。
rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com)
1、安装
下载后查找docker中RabbitMQ插件目录。一般指定为数据卷
1 | docker volume inspect mq-plugins |
执行结果:
1 | [ |
我们上传下载的插件到**/var/lib/docker/volumes/mq-plugins/_data**这个目录。
安装插件
1 | docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange |
2、使用
-
注解声明:delayed = “true”
1
2
3
4
5
6
7
8
public void listenDelayQueue(String msg){
log.info("消费者1监听到delay.queue的消息:"+msg);
} -
声明Bean方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class DelayExchangeConfig {
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
public Queue delayedQueue(){
return new Queue("delay.queue");
}
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
3、发送消息
1 |
|
消息过多。计时会消耗过多CPU资源。所以不建议设置延迟消息时间过长。
4、消费者接受消息
1 |
|
4、具体例子: 超时订单问题

假如订单超时支付时间为30分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为30分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。
不提供具体代码实现。

