RabbitMQ(一)

RabbitMQ(一)

消息队列RabbitMQ的黑马学习笔记

原文地址:

‍‌⁠‍⁠‬‌‌‬‬‬‍⁠‍⁠⁠‍⁠‍‌‬⁠‬‌⁠‌‍day06-MQ基础 - 飞书云文档 (feishu.cn)

一、介绍

不同于同步调用,消息队列是为了解决异步通讯的问题。不关心异步代码运行后的结果。不需要保证时效性。 比如我们要发送一个验证码。这时候就可以用消息队列

同步调用的方式存在下列问题:

  • 拓展性差

  • 性能下降

  • 级联失败

而要解决这些问题,我们就必须用异步调用的方式来代替同步调用

异步调用的优势包括:

  • 耦合度更低

  • 性能更好

  • 业务拓展性强

  • 故障隔离,避免级联失败

当然,异步通信也并非完美无缺,它存在下列缺点:

  • 完全依赖于Broker的可靠性、安全性和性能

  • 架构复杂,后期维护和调试麻烦

RabbitMQ

1

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方

  • consumer:消费者,也就是消费消息的一方

  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

  1. 交换机没有存储消息的能力

  2. 创建好的队列要和交换机绑定

二、引入

Docker安装

1
2
3
4
5
6
7
8
9
10
11
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hm-net\
-d \
rabbitmq:3.8-management
  • 15672:RabbitMQ提供的管理控制台的端口

  • 5672:RabbitMQ的消息发送处理接口

三、使用

访问虚拟机地址+ 15672 即可到达RabbitMQ控制面板

[RabbitMQ Management](http://192.168.101.128:15672/

1、发送消息这里不做讲解

创建交换机->创建队列->交换机队列绑定->交换机发送消息、队列接受消息

2、数据隔离的使用这里不做讲解

管理员添加用户

-> 切换到新添加的用户

-> 在此用户下创建一个virtual host

-> 切换到对应的virtual host

四、SpringAMQP

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

1、配置文件

1
2
3
4
5
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml中添加配置:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码

2、简单发送

创建一个simple.queue

1
2
3
4
5
6
7
8
9
10
11
12
13
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSimpleQueue(){
//1、队列名
String queueName = "simple.queue";
//2、消息
String message = "hello,spring amqp!";

//3、发送消息
rabbitTemplate.convertAndSend(queueName,message);
}

发送成功可以在RabbitMQ控制面板看到

3、消息接收

配置好application.yaml后

1
2
3
4
5
6
7
8
9
10
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}

4、WorkQueues模型

让多个消费者绑定到一个队列,共同消费队列中的消息,加快消息处理速度

配置能者多劳

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
1
2
3
4
5
6
7
8
9
10
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2。。。。。接收到消息:"+msg+ LocalTime.now());
Thread.sleep(200);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:"+msg+ LocalTime.now());
Thread.sleep(20);
}
1
2
3
4
5
6
7
8
9
10
@Test
public void testWorkQueue(){
//1、队列名
String queueName = "work.queue";
for(int i=0;i<50;i++){
String message = "hello, spring amqp_"+i;

rabbitTemplate.convertAndSend(queueName,message);
}
}

五、引入交换机

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机

  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。

  • Consumer:消费者,与以前一样,订阅队列,没有变化

交换机有4种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

1、Fanout

2

广播模式

  • 1) 可以有多个队列

  • 2) 每个队列都要绑定到Exchange(交换机)

  • 3) 生产者发送的消息,只能发送到交换机

  • 4) 交换机把消息发送给绑定过的所有队列

  • 5) 订阅队列的消费者都能拿到消息

在RabbitMQ控制面板分别创建

Exchange:hmall.fanout

QUEUE: fanout.queue1、 fanout.queue2

并且绑定队列与交换机

消息发送:

1
2
3
4
5
6
7
8
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}

消息接收:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

2、Direct

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

  1. 创建两个队列: direct.queue1、direct.queue2

  2. 创建交换机hmall.direct

  3. 使用red和blue作为key,绑定direct.queue1到hmall.direct

  4. 使用red和yellow作为key,绑定direct.queue1到hmall.direct

消息接收:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

消息发送:

1
2
3
4
5
6
7
8
9
@Test
public void testDirectQueue(){
//1、交换机名
String exchangeName ="hmall.direct";
//2、消息
String message = "红色:震惊";
//3、发送消息
rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}

3、Topic

topic交换机的BindingKey由一个或多个单词组成,多个单词之间以.分割。

还支持通配符:

#:匹配一个或多个词

*:匹配一个词

  1. 创建一个hmall.topic交换机

  2. 创建topic.queue1与topic.queue2分别以china.# 和 #.news绑定

消息发送

1
2
3
4
5
6
7
8
9
@Test
public void testTopicQueue(){
//1、交换机名
String exchangeName ="hmall.topic";
//2、消息
String message = "红色:震惊";
//3、发送消息
rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
}

消息接收:

1
2
3
4
5
6
7
8
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
log.info("消费者1监听到topic.queue1的消息:"+msg);
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
log.info("消费者2监听到topic.queue2的消息:"+msg);
}

六、声明交换机和队列

如果每次都要打开控制面板创建交换机,那也太麻烦了。所以我们可以通过代码的方式自动创建交换机和队列

1、手动创建

fanout:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
public class FanoutConfiguration {

//声明fanout交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hmall.fanout");
}

//第一个队列
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}

//绑定
@Bean
public Binding fanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//第二个队列
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}

//绑定
@Bean
public Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}

direct: 区别主要是绑定的时候要 一个一个添加BindingKey 麻烦

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class DirectConfiguration {

//声明direct交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("hmall.direct");
}
//声明队列
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
@Bean
public Queue directQueue2(){
return new Queue("direct.queue1");
}
//添加绑定关系
@Bean
public Binding directQueue1BindingRed(Queue directtQueue1,DirectExchange directExchange){
return BindingBuilder.bind(directtQueue1).to(directExchange).with("red");
}
@Bean
public Binding directQueue1BindingBlue(Queue directtQueue1,DirectExchange directExchange){
return BindingBuilder.bind(directtQueue1).to(directExchange).with("blue");
}
@Bean
public Binding directQueue2BindingRed(Queue directtQueue2,DirectExchange directExchange){
return BindingBuilder.bind(directtQueue2).to(directExchange).with("red");
}
@Bean
public Binding directQueue2BindingYellow(Queue directtQueue2,DirectExchange directExchange){
return BindingBuilder.bind(directtQueue2).to(directExchange).with("yellow");
}
}

2、基于注解的声明

在接收的地方直接声明交换机和队列

direct模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"), //声明队列
exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT), //声明交换机,指定交换机类型
key={"red","blue"} //声明key
))
public void listendirectQueue1(String msg){
log.info("消费者1监听到direct.queue1的消息:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
key={"red","yellow"}
))

topic模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

七、消息转换器

数据传输时,我们发送的消息会被序列化转成字节发送给MQ,接收消息的时候,会把字节反序列化成java对象。

只不过默认情况下是jdk序列化,有下列几点缺点

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

1、配置JSON转换器

1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>

启动类中添加Bean

1
2
3
4
5
6
7
8
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}