RabbitMQ 消息队列基础
首先使用的时候需要创建好一个新的账户和 Virtual Hosts(并绑定到这个账户),这个是用来隔离其他可能需要用到的业务的
使用初始账号密码登录后在 Admin这个选项中新建用户,切换到新建的账户,然后点击右侧边栏的 Virtual Hosts自拟名字创建虚拟路由
需要新建队列在 Queue and Streams中,选择队列绑定到想要的虚拟路由中即可
Java 客户端
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
配置文件
1 2 3 4 5 6 7
| spring: rabbitmq: host: localhost port: 5672 username: password: virtual-host:
|
生产者
生产者使用注解 @rabbitTemplate即可
1 2 3 4 5 6 7 8 9 10 11 12
| @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate;
@Test void testSendMessage2Queue(){ String queueName = "simple.queue"; String msg = "hello, amqp!"; rabbitTemplate.convertAndSend(queueName,msg); } }
|
消费者
1 2 3 4 5 6 7 8 9 10
| @Slf4j @Component public class Mqlistener {
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg){ System.out.println("消费者收到了simple.queue的消息【" + msg+ "】"); } }
|
消费者使用 @RabbitListener并在参数中指定队列名字即可监听
Work 模型
在单个线程无法处理过来时,可以多个消费者同时绑定一个队列,从而加快处理速度,不过Work模型是自带的,所以直接监听就好了
如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component public class Mqlistener {
@RabbitListener(queues = "work.queue") public void listenWorkQueue1(String msg){ System.out.println("消费者1 收到了simple.queue的消息【" + msg+ "】"); }
@RabbitListener(queues = "work.queue") public void listenWorkQueue2(String msg) throws InterruptedException { Thread.sleep(1000); System.err.println("消费者2 收到了simple.queue的消息【" + msg+ "】"); } }
|
但是需要注意,MQ 投递消息的时候是一次性全部投放给消费者,并且是均等份的直接投放。这样会造成的后果是,两个消费者处理的速度不同时,一个处理完另一个空闲,会造成资源的浪费。
所以还需要开启配置
1 2 3 4 5
| spring: rabbitmq: listener: simple: prefetch: 1
|
这样就能最大化的利用消费者的消息处理
交换机
交换机有三种不同的类型,创建交换机的时候可以选择需要的类型。
队列绑定到交换机的时候,先创建好需要绑定的队列,在 Exchanges中进入需要绑定的交换机然后将队列名添加进去即可
Fanout(广播)
Fanout 交换机会将接收到的消息广播到每一个和他绑定的队列,所以在业务上 Fanout 负责的是把通知发送到每个绑定了自己队列的微服务。
在向 Fanout 交换机发送消息的时候消费者依然使用 convertAndSend ,只是第二个参数需要设置为 null
1 2 3 4 5
| void testSendFanout() { String exchangeName = "amq.fanout"; String msg = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName,null,msg); }
|
参数
Direct(定向)
在 Fanout 交换机的基础上,添加绑定的路由,消息只会投递给路由下的队列。
在添加队列的时候,添加路由名字即可,一个队列可以绑定多个路由
在消费者代码中,只需要把上一段的代码中的 null 改为路由即可
1 2 3 4 5
| void testSendFanout() { String exchangeName = "amq.fanout"; String msg = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName,"blue",msg); }
|
Topic(话题)
在 Direct 交换机的基础上,可以使用符号 *和 #来进行匹配用 .进行分割的队列,例如:test.topic
* 表示可以匹配 0 个或多个单词,例如:*.topic test.*
# 表示只能匹配一个单词
在用法上与 Direct 交换机相同