RabbitMQ(二)

RabbitMQ(二)

一、引入

有时候我们需要保证,MQ消息至少被消费者处理一次。 如果消息发送失败,没有正确的接收到,会导致数据不一致的问题。

那么问题来了:

  • 我们该如何确保MQ消息的可靠性

  • 如果真的发送失败,有没有其它的兜底方案?

发送信息丢失:

  1. 生产者发送信息时连接MQ服务器失败

  2. 生产者发送消息到达MQ后未找到Exchange

  3. 生产者发送消息到达Exchange后未找到queue

  4. 消息到达MQ后处理消息的进程发生异常

MQ导致信息丢失:

  1. 消息到达MQ,保存到队列后,尚未消费就突然宕机

消费者处理消息时:

  1. 消息接收后尚未处理宕机

  2. 消息接收后处理过程中抛出异常

为了解决消息丢失问题,保证MQ的可靠性,我们需要从以下3个方面

  • 确保生产者一定把消息发送到MQ

  • 确保MQ不会将消息弄丢

  • 确保消费者一定要处理消息

二、发送者的可靠性

1、生产者重试机制

确保生产者一定把消息发送到MQ

  1. 解决生产者发送消息时,出现网络故障

使用重试机制,当与MQ服务器断开连接时,重新连接

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数

2、生产者确认机制

解决消息发送到MQ后,消息在MQ丢失。 比如:

  • MQ内部处理消息的进程发生了异常

  • 生产者发送消息到达MQ后未找到Exchange

  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

实现生产者确认机制

1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
  • none:关闭confirm机制

  • simple:同步阻塞等待MQ的回执

  • correlated:MQ异步回调返回回执

定义ReturnCallback(所有异常消息):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqConfig {
private final RabbitTemplate rabbitTemplate;

@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(returned->{
log.error("监听到了消息return callback");
log.debug("exchange:{}",returned.getExchange());
log.debug("routingKey:{}",returned.getRoutingKey());
log.debug("message:{}",returned.getMessage());
log.debug("replyCode:{}",returned.getReplyCode());
log.debug("replyText:{}",returned.getReplyText());

});
}
}

定义ConfirmCallback

发送的时候指定correlationData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Test
public void testComfirmCallback() throws InterruptedException {

//0、创建correlationData,指定一个唯一的id
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
//添加回调函数
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.error("spring amqp 处理确认结果异常",ex);
}

@Override
public void onSuccess(CorrelationData.Confirm result) {
if(result.isAck()){
log.debug("收到ConfirmCallback ack,消息发送成功!");
}else{
log.debug("收到ConfirmCallback nack,消息发送失败!reason:{}", result.getReason());

}
}
});

String exchangeName = "hmall.direct";
String message = "蓝色:通知,女尸是充气的!";
//2、发送消息
rabbitTemplate.convertAndSend(exchangeName,"blue",message,cd);

Thread.sleep(2000);
}

3

三、MQ可靠性

消息到达MQ后,需要MQ能够保存消息,不能丢失消息

1、数据持久化

默认情况下,MQ的数据保存在内存中,重启就消失了。所以我们需要配置数据持久化

包括: 交换机持久化、队列持久化、消息持久化

2、交换机持久化

我们可以在控制台Exchange界面。修改添加交换机时的Durability参数: 设置为Durable就是持久化模式,Transient就是临时模式。

3、队列持久化

同样的,修改创建队列界面的Durability参数

4、消息持久化

在控制台发送消息的时候,就可以选择消息持久化

4

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

5、LazyQueue

默认情况下RabbitMQ会将接收到的消息存入内存中,但是会发生一些特殊情况导致消息积压,然后撑爆内存,触发内存预警,(Pageout)将内存消息刷到磁盘上,并且在此期间不会接收任何消息

  • 消费者宕机或出现网络故障

  • 消息发送量激增,超过了消费者处理速度

  • 消费者处理业务发生阻塞

所以我们设计了惰性队列:接收到的消息直接存入磁盘,消费者消费消息的时候才从磁盘中读取消息、支持百万条的消息存储。

RabbitMQ 3.12版本后,默认队列格式就是LazyQueue

  1. 控制台配置Lazy模式

    5

  2. 代码声明Lazy模式

    1
    2
    3
    4
    5
    6
    7
    @Bean
    public Queue lazyQueue(){
    return QueueBuilder
    .durable("lazy.queue")
    .lazy() // 开启Lazy模式
    .build();
    }

    通过注解声明

    1
    2
    3
    4
    5
    6
    7
    8
    @RabbitListener(queuesToDeclare = @Queue(
    name = "lazy.queue",
    durable = "true",
    arguments = @Argument(name = "x-queue-mode", value = "lazy")
    ))
    public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
    }
  3. 更新已有队列为lazy模式

    基于命令行

    1
    rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

    命令解读:

    • rabbitmqctl :RabbitMQ的命令行工具

    • set_policy :添加一个策略

    • Lazy :策略名称,可以自定义

    • "^lazy-queue$" :用正则表达式匹配队列的名字

    • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式

    • --apply-to queues:策略的作用对象,是所有的队列

    也可以控制台配置

    6

发送消息代码例子:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
void testSendMessage(){
//1、自定义构建消息
Message message = MessageBuilder.withBody("hello,SpingAMQP".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();

//2、发送消息
for (int i = 0; i < 1000000; i++) {
rabbitTemplate.convertAndSend("lazy.queue",message);
}
}

四、消费者可靠性

消息投递给消费者不一定就被正确消费了,有可能发生以下情况

  1. 消息投递过程中网络断了

  2. 消费者收到消息后死机

  3. 消费者收到消息后,处理不当导致异常

1、消费者确认机制

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

修改确认机制为auto

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自动ack

发送后的消息未接收到结果时,处于的状态是unacked(未确定状态)

若是消息处理异常,返回reject后会删除队列中的信息。 若是业务异常,则会保留(变为ready状态等待下次投递)

2、消费者重试机制

若消费出现异常(业务异常),则会不断重新入队,不断投递。浪费MQ资源。

我们可以设置消费消息出现异常的时候,在本地重试

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
template: #连接MQ服务器
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

3、失败处理策略

当消费者重试后处理消息依然出现异常,消息被丢掉。没法保证消息可靠性了。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略。由MessageRecovery接口来定义

接口 描述
RejectAndDontRequeueRecoverer 重试耗尽后,直接reject,丢弃消息。
ImmediateRequeueMessageRecoverer 重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer 重试耗尽后,将失败消息投递到指定的交换机
  1. 使用RepublishMessageRecoverer

定义一个专门存放错误消息的交换机与队列:

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

创建RepublishMessageRecoverer。关联交换机和对应的RoutingKey

1
2
3
4
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

这样。当我们的消息经过重试机制仍然处理失败后,消息将被发往error.direct交换机。最终被发向error.queue队列

4、业务幂等性

f(x) = f(f(x)) 我们要求处理一些业务,处理一次和处理多次所得到的效果是一样的。

通常这些操作天生幂等: 根据id删除数据、查询数据、新增数据

但是关于数据更新的操作不一定幂等: 取消订单(会导致库存多次增加),退款业务(多退钱)

实际业务中会出现: 表单重复提交、服务间调用的重试、MQ的重复投递…

为了保证消息处理的幂等性:

  1. 唯一消息id : 执行消息之前,查看此消息有没有被处理过(应该是需要自己处理)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class);
}

@Bean
public MessageConverter messageConverter(){
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setCreateMessageIds(true); //自动开启消息id

return converter;
}
}
  1. 业务状态判断:

    主要是在代码中利用一些特征处理。比如说取消订单首先这个订单要没被取消。然后即可实现幂等性。

5、兜底方案

其实思想很简单:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。

7

通常使用定时任务。实现主动查询

五、延迟消息

1、死信交换机

消息满足下列情况,变成死信(Dead Letter)

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息,超时无人消费

  • 要投递的队列消息满了,无法投递

消息成为死信,并且队列通过**dead-letter-exchange属性指定了一个交换机。那么队列中的死信会投递在这个死信交换机中。 进而投递到与死信交换机绑定的队列**中去。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息

  2. 收集那些因队列满了而被拒绝的消息

  3. 收集因TTL(有效期)到期的消息

2、延迟消息

通过设置一个无消费者消费的队列。 给这个队列指定一个死信交换机。同时将另一个有消费者消费的队列绑定这个死信交换机

然后给无消费者消费的队列投递一则有过期时间的消息。当时间过期后,就会投递到死信交换机,进而到有消费者消费的队列

实现了延迟消息

8

注意:

RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。

当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

1、声明队列与绑定关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class NormalConfiguration {

@Bean
public DirectExchange normalExchange(){
return new DirectExchange("normal.direct");
}

@Bean
public Queue normalQueue(){
return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();
}

@Bean
public Binding normalExchangeBinding(DirectExchange normalExchange,Queue normalQueue){
return BindingBuilder.bind(normalQueue).to(normalExchange).with("hi");
}

}

2、发送消息

1
2
3
4
5
6
7
8
9
10
@Test
void testSendDelayMessage(){
rabbitTemplate.convertAndSend("normal.direct", "hi", "hello", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
});
}

3、声明消费者,指定关系

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dlx.queue",durable = "true"),
exchange = @Exchange(name = "dlx.direct",type = ExchangeTypes.DIRECT),
key={"hi"}
))
public void listenDlxQueue(String msg){
log.info("消费者1监听到dlx.queue的消息:"+msg);
}

3、DelayExchange插件

死信交换机实现延迟消息太麻烦了。RabbitMQ社区实现了一个延迟消息插件来产生相同的结果。

rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com)

1、安装

下载后查找docker中RabbitMQ插件目录。一般指定为数据卷

1
docker volume inspect mq-plugins

执行结果:

1
2
3
4
5
6
7
8
9
10
11
[
{
"CreatedAt": "2024-06-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]

我们上传下载的插件到**/var/lib/docker/volumes/mq-plugins/_data**这个目录。

安装插件

1
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2、使用

  1. 注解声明:delayed = “true”

    1
    2
    3
    4
    5
    6
    7
    8
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "delay.queue",durable = "true"),
    exchange = @Exchange(name = "delay.direct",delayed = "true"),
    key={"hi"}
    ))
    public void listenDelayQueue(String msg){
    log.info("消费者1监听到delay.queue的消息:"+msg);
    }
  2. 声明Bean方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Slf4j
    @Configuration
    public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
    return ExchangeBuilder
    .directExchange("delay.direct") // 指定交换机类型和名称
    .delayed() // 设置delay的属性为true
    .durable(true) // 持久化
    .build();
    }

    @Bean
    public Queue delayedQueue(){
    return new Queue("delay.queue");
    }

    @Bean
    public Binding delayQueueBinding(){
    return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
    }

3、发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}

消息过多。计时会消耗过多CPU资源。所以不建议设置延迟消息时间过长。

4、消费者接受消息

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue",durable = "true"),
exchange = @Exchange(name = "delay.direct",delayed = "true"),
key={"hi"}
))
public void listenDelayQueue(String msg){
log.info("消费者1监听到delay.queue的消息:"+msg);
}

4、具体例子: 超时订单问题

9

假如订单超时支付时间为30分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为30分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。

不提供具体代码实现。