RabbitMQ(一)
消息队列RabbitMQ的黑马学习笔记
原文地址:
day06-MQ基础 - 飞书云文档 (feishu.cn)
一、介绍
不同于同步调用,消息队列是为了解决异步通讯的问题。不关心异步代码运行后的结果。不需要保证时效性。 比如我们要发送一个验证码。这时候就可以用消息队列
同步调用的方式存在下列问题:
而要解决这些问题,我们就必须用异步调用的方式来代替同步调用。
异步调用的优势包括:
-
耦合度更低
-
性能更好
-
业务拓展性强
-
故障隔离,避免级联失败
当然,异步通信也并非完美无缺,它存在下列缺点:
-
完全依赖于Broker的可靠性、安全性和性能
-
架构复杂,后期维护和调试麻烦
RabbitMQ
![1]()
其中包含几个概念:
-
publisher:生产者,也就是发送消息的一方
-
consumer:消费者,也就是消费消息的一方
-
queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
-
exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
-
virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
-
交换机没有存储消息的能力
-
创建好的队列要和交换机绑定
二、引入
Docker安装
1 2 3 4 5 6 7 8 9 10 11
| docker run \ -e RABBITMQ_DEFAULT_USER=itheima \ -e RABBITMQ_DEFAULT_PASS=123321 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ --network hm-net\ -d \ rabbitmq:3.8-management
|
三、使用
访问虚拟机地址+ 15672 即可到达RabbitMQ控制面板
[RabbitMQ Management](http://192.168.101.128:15672/
1、发送消息这里不做讲解
创建交换机->创建队列->交换机队列绑定->交换机发送消息、队列接受消息
2、数据隔离的使用这里不做讲解
管理员添加用户
-> 切换到新添加的用户
-> 在此用户下创建一个virtual host
-> 切换到对应的virtual host
四、SpringAMQP
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。
SpringAMQP提供了三个功能:
1、配置文件
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
application.yml中添加配置:
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.150.101 port: 5672 virtual-host: /hmall username: hmall password: 123
|
2、简单发送
创建一个simple.queue
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testSimpleQueue(){ String queueName = "simple.queue"; String message = "hello,spring amqp!";
rabbitTemplate.convertAndSend(queueName,message); }
|
发送成功可以在RabbitMQ控制面板看到
3、消息接收
配置好application.yaml后
1 2 3 4 5 6 7 8 9 10
| @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } }
|
4、WorkQueues模型
让多个消费者绑定到一个队列,共同消费队列中的消息,加快消息处理速度
配置能者多劳
1 2 3 4 5
| spring: rabbitmq: listener: simple: prefetch: 1
|
1 2 3 4 5 6 7 8 9 10
| @RabbitListener(queues = "work.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2。。。。。接收到消息:"+msg+ LocalTime.now()); Thread.sleep(200); } @RabbitListener(queues = "work.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:"+msg+ LocalTime.now()); Thread.sleep(20); }
|
1 2 3 4 5 6 7 8 9 10
| @Test public void testWorkQueue(){ String queueName = "work.queue"; for(int i=0;i<50;i++){ String message = "hello, spring amqp_"+i;
rabbitTemplate.convertAndSend(queueName,message); } }
|
五、引入交换机
-
Publisher:生产者,不再发送消息到队列中,而是发给交换机
-
Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
-
Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
-
Consumer:消费者,与以前一样,订阅队列,没有变化
交换机有4种:
-
Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
-
Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
-
Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
-
Headers:头匹配,基于MQ的消息头匹配,用的较少。
1、Fanout
![2]()
广播模式
在RabbitMQ控制面板分别创建
Exchange:hmall.fanout
QUEUE: fanout.queue1、 fanout.queue2
并且绑定队列与交换机
消息发送:
1 2 3 4 5 6 7 8
| @Test public void testFanoutExchange() { String exchangeName = "hmall.fanout"; String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
|
消息接收:
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); }
@RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }
|
2、Direct
-
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
-
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
-
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
-
创建两个队列: direct.queue1、direct.queue2
-
创建交换机hmall.direct
-
使用red和blue作为key,绑定direct.queue1到hmall.direct
-
使用red和yellow作为key,绑定direct.queue1到hmall.direct
消息接收:
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "direct.queue1") public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); }
@RabbitListener(queues = "direct.queue2") public void listenDirectQueue2(String msg) { System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); }
|
消息发送:
1 2 3 4 5 6 7 8 9
| @Test public void testDirectQueue(){ String exchangeName ="hmall.direct"; String message = "红色:震惊"; rabbitTemplate.convertAndSend(exchangeName,"blue",message); }
|
3、Topic
topic交换机的BindingKey由一个或多个单词组成,多个单词之间以.分割。
还支持通配符:
#:匹配一个或多个词
*:匹配一个词
-
创建一个hmall.topic交换机
-
创建topic.queue1与topic.queue2分别以china.# 和 #.news绑定
消息发送:
1 2 3 4 5 6 7 8 9
| @Test public void testTopicQueue(){ String exchangeName ="hmall.topic"; String message = "红色:震惊"; rabbitTemplate.convertAndSend(exchangeName,"china.news",message); }
|
消息接收:
1 2 3 4 5 6 7 8
| @RabbitListener(queues = "topic.queue1") public void listenTopicQueue1(String msg){ log.info("消费者1监听到topic.queue1的消息:"+msg); } @RabbitListener(queues = "topic.queue2") public void listenTopicQueue2(String msg){ log.info("消费者2监听到topic.queue2的消息:"+msg); }
|
六、声明交换机和队列
如果每次都要打开控制面板创建交换机,那也太麻烦了。所以我们可以通过代码的方式自动创建交换机和队列
1、手动创建
fanout:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Configuration public class FanoutConfiguration {
@Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("hmall.fanout"); }
@Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); }
@Bean public Binding fanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); }
@Bean public Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
|
direct: 区别主要是绑定的时候要 一个一个添加BindingKey 麻烦
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class DirectConfiguration {
@Bean public DirectExchange directExchange(){ return new DirectExchange("hmall.direct"); } @Bean public Queue directQueue1(){ return new Queue("direct.queue1"); } @Bean public Queue directQueue2(){ return new Queue("direct.queue1"); } @Bean public Binding directQueue1BindingRed(Queue directtQueue1,DirectExchange directExchange){ return BindingBuilder.bind(directtQueue1).to(directExchange).with("red"); } @Bean public Binding directQueue1BindingBlue(Queue directtQueue1,DirectExchange directExchange){ return BindingBuilder.bind(directtQueue1).to(directExchange).with("blue"); } @Bean public Binding directQueue2BindingRed(Queue directtQueue2,DirectExchange directExchange){ return BindingBuilder.bind(directtQueue2).to(directExchange).with("red"); } @Bean public Binding directQueue2BindingYellow(Queue directtQueue2,DirectExchange directExchange){ return BindingBuilder.bind(directtQueue2).to(directExchange).with("yellow"); } }
|
2、基于注解的声明
在接收的地方直接声明交换机和队列
direct模式:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), //声明队列 exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT), //声明交换机,指定交换机类型 key={"red","blue"} //声明key )) public void listendirectQueue1(String msg){ log.info("消费者1监听到direct.queue1的消息:"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT), key={"red","yellow"} ))
|
topic模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); }
|
七、消息转换器
数据传输时,我们发送的消息会被序列化转成字节发送给MQ,接收消息的时候,会把字节反序列化成java对象。
只不过默认情况下是jdk序列化,有下列几点缺点
1、配置JSON转换器
1 2 3 4 5
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
|
启动类中添加Bean
1 2 3 4 5 6 7 8
| @Bean public MessageConverter messageConverter(){ Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; }
|