RabbitMQ学习
Wucheng

RabbitMQ 消息队列基础

首先使用的时候需要创建好一个新的账户和 Virtual Hosts(并绑定到这个账户),这个是用来隔离其他可能需要用到的业务的

使用初始账号密码登录后在 Admin这个选项中新建用户,切换到新建的账户,然后点击右侧边栏的 Virtual Hosts自拟名字创建虚拟路由

需要新建队列在 Queue and Streams中,选择队列绑定到想要的虚拟路由中即可

Java 客户端

1
2
3
4
5
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

1
2
3
4
5
6
7
spring:
rabbitmq:
host: localhost
port: 5672
username:
password:
virtual-host:

生产者

生产者使用注解 @rabbitTemplate即可

1
2
3
4
5
6
7
8
9
10
11
12
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
void testSendMessage2Queue(){
String queueName = "simple.queue";
String msg = "hello, amqp!";
rabbitTemplate.convertAndSend(queueName,msg);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
public class Mqlistener {

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
System.out.println("消费者收到了simple.queue的消息【" + msg+ "】");
}
}

消费者使用 @RabbitListener并在参数中指定队列名字即可监听

Work 模型

在单个线程无法处理过来时,可以多个消费者同时绑定一个队列,从而加快处理速度,不过Work模型是自带的,所以直接监听就好了

如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class Mqlistener {

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg){
System.out.println("消费者1 收到了simple.queue的消息【" + msg+ "】");
}

@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
Thread.sleep(1000); // 模拟处理时间
System.err.println("消费者2 收到了simple.queue的消息【" + msg+ "】");
}
}

但是需要注意,MQ 投递消息的时候是一次性全部投放给消费者,并且是均等份的直接投放。这样会造成的后果是,两个消费者处理的速度不同时,一个处理完另一个空闲,会造成资源的浪费。

所以还需要开启配置

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完才能获取下一条消息

这样就能最大化的利用消费者的消息处理

交换机

交换机有三种不同的类型,创建交换机的时候可以选择需要的类型。

队列绑定到交换机的时候,先创建好需要绑定的队列,在 Exchanges中进入需要绑定的交换机然后将队列名添加进去即可

Fanout(广播)

Fanout 交换机会将接收到的消息广播到每一个和他绑定的队列,所以在业务上 Fanout 负责的是把通知发送到每个绑定了自己队列的微服务。

在向 Fanout 交换机发送消息的时候消费者依然使用 convertAndSend ,只是第二个参数需要设置为 null

1
2
3
4
5
void testSendFanout() {
String exchangeName = "amq.fanout";
String msg = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName,null,msg);
}

参数

Direct(定向)

在 Fanout 交换机的基础上,添加绑定的路由,消息只会投递给路由下的队列。

在添加队列的时候,添加路由名字即可,一个队列可以绑定多个路由

在消费者代码中,只需要把上一段的代码中的 null 改为路由即可

1
2
3
4
5
void testSendFanout() {
String exchangeName = "amq.fanout";
String msg = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName,"blue",msg);
}

Topic(话题)

在 Direct 交换机的基础上,可以使用符号 *#来进行匹配用 .进行分割的队列,例如:test.topic

  • * 表示可以匹配 0 个或多个单词,例如:*.topic test.*
  • # 表示只能匹配一个单词

在用法上与 Direct 交换机相同