RabbitMQ

思维导图

MQ的相关概念

MQ介绍

MQ(Message Queue)消息队列是一种基本的数据结构,遵循先进先出 (FIFO) 原则。它通过将待传递的数据(消息)存储在队列中,利用队列机制来实现消息的传递。在这个模型中,生产者生成消息并将其放入队列,而消费者则负责处理这些消息。消费者可以主动从指定队列中拉取消息,或者通过订阅特定队列来接收MQ服务端推送的消息。这种方式实现了可靠的消息传递和异步通信。

MQ的作用

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

削峰

在高并发场景下,通过消息队列实现异步业务处理,可以有效提升系统的高峰期业务处理能力,避免系统瘫痪。

如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好

解耦

当一个业务需要多个模块协同工作,或者一条消息需要被多个系统处理时,可以通过发送一条消息队列(MQ)来解耦这些模块。主业务完成后,只需发送MQ消息,其他模块可以独立地消费这些消息,从而实现业务功能,降低了模块之间的耦合度。

异步

主要业务执行完毕后,附属业务通过消息队列(MQ)以异步方式处理,以缩短业务响应时间,从而改善用户体验。

MQ 的分类

ActiveMQ

低吞吐

优点:单机吞吐量高达万级,消息时效性在毫秒级别,具备高可用性,采用主从架构提供稳健的高可用性支持,具有较低的消息可靠性风险,降低了数据丢失的概率。

缺点:当前官方社区对ActiveMQ 5.x的维护逐渐减少,因此在高吞吐量场景下的使用相对较少。

Kafka

高吞吐 队列数量和性能成反比

优点: Kafka以其高吞吐量著称,单机写入每秒可达百万条消息。它具有出色的性能表现,时效性达毫秒级,以及高可用性。作为分布式系统,Kafka能够在多个节点上保留数据的多个副本,即使个别节点宕机也不会丢失数据或导致不可用情况。消费者可以通过拉取(Pull)方式获取消息,保证消息有序性,以及确保每条消息被消费且仅被消费一次。Kafka还受益于优秀的第三方管理界面Kafka-Manager。在日志领域,Kafka应用成熟广泛,得到多家公司和开源项目的采用。它主要用于大数据领域的实时计算(例如Flink、CDC、ETL)以及日志采集,提供简单MQ功能和可靠的消息传递。

缺点:Kafka在单机支持队列/分区超过64个时,可能导致负载明显升高。增多的队列数量会降低发送消息的响应时间,因为使用短轮询方式,实时性受轮询间隔时间影响。此外,Kafka不直接支持消费失败的消息重试,这需要由消费者自行处理。尽管它支持消息顺序,但当代理宕机时可能会导致消息乱序。需要注意的是,Kafka社区更新相对缓慢,这可能影响到新功能的发布和问题修复的速度。

RocketMQ

中吞吐 功能比kafka丰富 支持语言不多

优点:RocketMQ以中等吞吐量的性能表现著称,单机吞吐量可达十万级。它拥有出色的可用性,并采用分布式架构,因此能够实现消息的高可靠性,几乎可以做到零消息丢失。RocketMQ的MQ功能相对丰富,且它是一个分布式系统,因此具备出色的扩展性,能够支持高达10亿条消息的堆积,而不会因此导致性能下降。需要指出的是,RocketMQ是使用Java实现的,尽管它在某些设计思想上可能受到了Kafka的影响。

缺点:对于支持的客户端语言,RocketMQ主要支持Java和C++,但需要指出的是C++的客户端支持相对不够成熟。此外,RocketMQ的社区活跃度一般,相对于某些其他消息队列系统,社区支持有限。此外,RocketMQ并没有在其核心中实现JMS等接口,因此在将某些系统迁移到RocketMQ时可能需要修改大量的代码。

RabbitMQ

低吞吐 功能齐全 多语言

Kafka是一个于2007年发布的消息中间件系统,它在高级消息队列协议(AMQP)的基础上开发而成,被广泛认为是当前最主流的消息中间件之一。

优点:RabbitMQ得益于Erlang语言的高并发特性,具备出色的性能。它支持每秒万级别的吞吐量,拥有完善的消息队列(MQ)功能,以及强大的健壮性和稳定性。RabbitMQ易于使用,跨平台支持多种编程语言,包括Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等。该系统提供了全面的文档支持,并且具备出色的开源管理界面,为用户提供极佳的使用体验。RabbitMQ拥有活跃的社区支持,更新频率相当高。

缺点:商业版需要收费,学习成本较高。

MQ的选择

  1. Kafka:

    特点:Kafka采用基于Pull的消息消费模式,追求高吞吐量。最初设计用于日志收集和传输,非常适合处理产生大量数据的互联网服务的数据收集业务。对于大型公司,尤其是需要日志采集功能、CDC、ETL的场景,Kafka是首选。

  2. RocketMQ:

    特点:RocketMQ天生为金融互联网领域而生,特别适用于对可靠性要求极高的场景,例如电商中的订单扣款以及业务削峰。在大量交易涌入时,RocketMQ可以保证后端能够及时处理。其稳定性在阿里双11等大型活动中得到了多次验证。如果您的业务涉及到这些高并发场景,强烈建议选择RocketMQ。

  3. RabbitMQ:

    特点:RabbitMQ结合Erlang语言本身的并发优势,具有出色的性能和微秒级的时效性。社区活跃度高,管理界面也非常便捷。如果您的数据量规模不是特别大,尤其对于中小型公司而言,建议优先选择功能比较完备的RabbitMQ。

MQ的缺点

  1. 系统可用性降低。依赖服务也多,服务越容易挂掉。需要考虑MQ瘫痪的情况

  2. 系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性

  3. 业务一致性。主业务和从属业务一致性的处理

前置

项目源码

docker快速安装

1
2
3
docker pull rabbitmq:management

docker run -di --name=rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management

RabbitMQ 的概念

RabbitMQ 是一个由 erlang 开发的 AMQP (Advanced Message Queuing Protocol) 的开源实现

生产者

产生数据发送消息的程序是生产者,产生数据后投递到交换机中

交换机

交换机(Exchange)是RabbitMQ消息传递的核心组件之一。它的主要作用是接收生产者发送的消息,并根据特定的路由规则,将这些消息路由到一个或多个队列中。

Direct exchange

直连交换机

  • 定义与用途
    • 直连交换机(Direct Exchange)是RabbitMQ中的基础交换机类型。
    • 它的主要功能是根据消息中的路由键(Routing Key)将消息准确地路由到对应的队列。
  • 路由键匹配
    • 在直连交换机下,路由键必须进行精确匹配。
    • 只有当消息的Routing Key与队列绑定的Routing Key完全一致时,消息才会被投递到该队列。
  • 队列与路由键的关系
    • 一个队列可以绑定到多个Routing Key。
    • 这为消息的灵活路由提供了可能性。
  • 操作步骤
    1. 将队列绑定到直连交换机,并为该绑定指定一个路由键。
    2. 当消息携带的路由键与某个队列的绑定路由键匹配时,交换机将消息路由到该队列。

demo

添加队列和交换机配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
public class DirectConfig {
public static final String DIRECT_EXCHANGE = "direct_exchange";
public static final String DIRECT_QUEUE = "direct.queue";
public static final String ROUTING_KEY = "queue";

/**
* 创建队列
*
* @return
*/
@Bean
public Queue directQueue() {
return new Queue(DIRECT_QUEUE);
}

/**
* 创建交换机
*
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}

/**
* 绑定队列
* @param directQueue
* @param directExchange
* @return
*/
@Bean
public Binding bindDirectQueue(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with(ROUTING_KEY);
}
}

添加队列监听

1
2
3
4
5
6
7
8
@Slf4j
@Component
public class DirectListener {
@RabbitListener(queues = DirectConfig.DIRECT_QUEUE)
public void receive(String msg) {
log.info("队列 {} 接收信息:{}", DirectConfig.DIRECT_QUEUE, msg);
}
}

单元测试

1
2
3
4
5
6
7
@Test
public void sendDirect() throws InterruptedException {
for (int i = 0; i < 2; i++) {
rabbitTemplate.convertAndSend(DirectConfig.DIRECT_EXCHANGE, DirectConfig.ROUTING_KEY, i+ "");
}
TimeUnit.SECONDS.sleep(10);
}

日志

1
2
INFO 28275 --- [ntContainer#0-1] com.wgf.demo.listener.DirectListener     : 队列 direct.queue 接收信息:0
INFO 28275 --- [ntContainer#0-1] com.wgf.demo.listener.DirectListener : 队列 direct.queue 接收信息:1

Topic exchange

主题交换机

  1. 定义与用途: 主题交换机(Topic Exchange)是RabbitMQ中一种灵活且功能丰富的交换机类型,主要用于实现复杂的消息路由规则,允许一条消息根据路由键模糊匹配,被投递到多个队列。
  2. 基本特性
    • 主题交换机是直连交换机的扩展,允许一个队列绑定多个Routing Key。
    • 它通过路由键的模糊匹配,能够将消息路由到一个或多个绑定队列。
  3. 路由键规则: 发送到主题交换机的消息的Routing Key必须是由点号(.)分隔的单词列表,如:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。这些单词没有特定要求,但整个路由键的长度不得超过255个字节。
  4. 通配符: 主题交换机在匹配路由键时,支持两种通配符:
    • *(星号):匹配路由键中的一个单词。
    • #(井号):匹配路由键中的零或多个单词。
  5. 灵活性: 通过模糊匹配和通配符的使用,主题交换机能够满足多样化和复杂的消息路由需求,展现出极高的灵活性。

  • quick.orange.rabbit 被队列 Q1Q2 接收到
  • lazy.orange.elephant 被Q1Q3 接收到
  • quick.orange.fox 被队列 Q1 接收到
  • lazy.brown.fox 被队列 Q2 接收到
  • quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃

demo

添加队列和交换机配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Configuration
public class TopicConfig {

public static final String TOPIC_EXCHANGE = "topic_exchange";
public static final String TOPIC_QUEUE_1 = "topic.queue1";
public static final String TOPIC_QUEUE_2 = "topic.queue2";
public static final String ROUTING_KEY = "topic.test";
public static final String ROUTING_KEY2 = "topic.#";

/**
* 创建队列
*
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE_1);
}

/**
* 创建队列
*
* @return
*/
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE_2);
}

/**
* 创建交换机
*
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}

/**
* 绑定队列
*
* @param topicQueue1
* @param topicExchange
* @return
*/
@Bean
public Binding bindTopicQueue1(Queue topicQueue1, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue1).to(topicExchange).with(ROUTING_KEY);
}

/**
* 绑定队列
*
* @param topicQueue1
* @param topicExchange
* @return
*/
@Bean
public Binding bindTopicQueue2(Queue topicQueue2, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with(ROUTING_KEY2);
}
}

添加队列监听

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@Component
public class TopicListener {
@RabbitListener(queues = TopicConfig.TOPIC_QUEUE_1)
public void receive(String msg) {
log.info("队列 {} 接收信息:{}", TopicConfig.TOPIC_QUEUE_1, msg);
}

@RabbitListener(queues = TopicConfig.TOPIC_QUEUE_2)
public void receive2(String msg) {
log.info("队列 {} 接收信息:{}", TopicConfig.TOPIC_QUEUE_2, msg);
}
}

单元测试

1
2
3
4
5
6
7
@Test
public void sendTopic() throws InterruptedException {
rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE, TopicConfig.ROUTING_KEY, TopicConfig.ROUTING_KEY);
rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE, "topic.hello", "topic.hello");
rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE, "topic.hi.hello", "topic.hi.hello");
TimeUnit.SECONDS.sleep(10);
}

日志

1
2
3
4
INFO 10456 --- [ntContainer#0-1] com.wgf.demo.listener.TopicListener      : 队列 topic.queue1 接收信息:topic.test
INFO 10456 --- [ntContainer#1-1] com.wgf.demo.listener.TopicListener : 队列 topic.queue2 接收信息:topic.test
INFO 10456 --- [ntContainer#1-1] com.wgf.demo.listener.TopicListener : 队列 topic.queue2 接收信息:topic.hi.hello
INFO 10456 --- [ntContainer#1-1] com.wgf.demo.listener.TopicListener : 队列 topic.queue2 接收信息:topic.hello

Fanout exchange

扇出交换机

  1. 无需路由键: 扇出交换机(Fanout Exchange)在进行消息路由时,不需要路由键(Routing Key)。即便消息中包含路由键,该键也不会影响消息的路由过程。
  2. 广播特性: 当消息发送到扇出交换机时,该交换机会将消息转发给所有绑定到它的队列,实现了一种广播的效果。
  3. 多队列绑定: 如果有N个队列绑定到某个扇出交换机上,当有消息发送给该交换机时,交换机会将消息发送给这所有的N个队列,确保每个队列都能收到消息的拷贝。

demo

添加队列交换机配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Configuration
public class FanoutConfig {
public static final String FANOUT_EXCHANGE = "fanout_exchange";
public static final String FANOUT_QUEUE_1 = "fanout.queue_1";
public static final String FANOUT_QUEUE_2 = "fanout.queue_2";

/**
* 创建队列
*
* @return
*/
@Bean
public Queue fanoutQueue1() {
return new Queue(FANOUT_QUEUE_1);
}

/**
* 创建队列
*
* @return
*/
@Bean
public Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE_2);
}

/**
* 创建交换机
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}

/**
* 绑定队列
* @param fanoutQueue1
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindFanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
// 不需要 routing_key
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

/**
* 绑定队列
* @param fanoutQueue2
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindFanoutQueue(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
// 不需要 routing_key
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}

添加队列监听

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@Component
public class FanoutListener {
@RabbitListener(queues = FanoutConfig.FANOUT_QUEUE_1)
public void receive(String msg) {
log.info("队列 {} 接收信息:{}", TopicConfig.TOPIC_QUEUE_1, msg);
}

@RabbitListener(queues = FanoutConfig.FANOUT_QUEUE_2)
public void receive2(String msg) {
log.info("队列 {} 接收信息:{}", TopicConfig.TOPIC_QUEUE_2, msg);
}
}

单元测试

1
2
3
4
5
@Test
public void sendFanout() throws InterruptedException {
rabbitTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE, null, "test");
TimeUnit.SECONDS.sleep(10);
}

日志

1
2
INFO 31660 --- [ntContainer#1-1] com.wgf.demo.listener.FanoutListener     : 队列 topic.queue1 接收信息:test
INFO 31660 --- [ntContainer#2-1] com.wgf.demo.listener.FanoutListener : 队列 topic.queue2 接收信息:test

Headers exchange

头交换机

  1. 工作原理: 头交换机(Headers Exchange)与主题交换机类似,但不是使用路由键来路由消息,而是使用消息头中的键值对来建立路由规则。通过判断消息头的键值对是否与绑定的条件相匹配,来确定消息的路由路径。
  2. x-match 参数: 头交换机有一个重要的参数:x-match,它决定了消息头的匹配规则。
    • x-match 设置为 any 时,只要消息头中的任意一个键值对满足绑定条件,就视为匹配成功。
    • x-match 设置为 all(默认设置)时,消息头中的所有键值对都必须满足绑定条件,才视为匹配成功。
  3. 性能考虑: 由于头交换机在进行匹配时需要检查消息头中的多个属性,可能会导致性能较差,因此一般不推荐在对性能要求较高的场景中使用。

demo

添加队列交换机配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Configuration
public class HeadersConfig {
public static final String HEADERS_EXCHANGE = "headers_exchange";
public static final String HEADERS_QUEUE_1 = "headers.queue1";
public static final String HEADERS_QUEUE_2 = "headers.queue2";

/**
* 创建队列
*
* @return
*/
@Bean
public Queue headersQueue1() {
return new Queue(HEADERS_QUEUE_1);
}

/**
* 创建队列
*
* @return
*/
@Bean
public Queue headersQueue2() {
return new Queue(HEADERS_QUEUE_2);
}

/**
* 创建交换机
*
* @return
*/
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange(HEADERS_EXCHANGE);
}

/**
* 绑定队列
* @param directQueue
* @param directExchange
* @return
*/
@Bean
public Binding bindHeadersQueue1(Queue headersQueue1, HeadersExchange headersExchange) {
// 消息头属性
Map<String,Object> headerMap = new HashMap<>(8);
headerMap.put("color", "black");
headerMap.put("aging", "fast");
// whereAny 其中一项匹配即可
return BindingBuilder.bind(headersQueue1).to(headersExchange).whereAny(headerMap).match();
}

/**
* 绑定队列
* @param directQueue
* @param directExchange
* @return
*/
@Bean
public Binding bindHeadersQueue2(Queue headersQueue2, HeadersExchange headersExchange) {
// 消息头属性
Map<String,Object> headerMap = new HashMap<>(8);
headerMap.put("color", "black");
headerMap.put("aging", "fast");
// whereAll 完全匹配
return BindingBuilder.bind(headersQueue2).to(headersExchange).whereAll(headerMap).match();
}
}

添加队列监听

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@Component
public class HeadersListener {
@RabbitListener(queues = HeadersConfig.HEADERS_QUEUE_1)
public void receive(String msg) {
log.info("队列 {} 接收信息:{}", TopicConfig.TOPIC_QUEUE_1, msg);
}

@RabbitListener(queues = HeadersConfig.HEADERS_QUEUE_2)
public void receive2(String msg) {
log.info("队列 {} 接收信息:{}", TopicConfig.TOPIC_QUEUE_2, msg);
}
}

单元测试

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void sendHeaders() throws InterruptedException {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("color", "black");
SimpleMessageConverter messageConverter = new SimpleMessageConverter();
Message message = messageConverter.toMessage("test1", messageProperties);
rabbitTemplate.convertAndSend(HeadersConfig.HEADERS_EXCHANGE, null, message);

messageProperties.setHeader("aging", "fast");
message = messageConverter.toMessage("test2", messageProperties);
rabbitTemplate.convertAndSend(HeadersConfig.HEADERS_EXCHANGE, null, message);
}

日志

1
2
3
INFO 4330 --- [ntContainer#3-1] com.wgf.demo.listener.HeadersListener    : 队列 topic.queue1 接收信息:test1
INFO 4330 --- [ntContainer#4-1] com.wgf.demo.listener.HeadersListener : 队列 topic.queue2 接收信息:test2
INFO 4330 --- [ntContainer#3-1] com.wgf.demo.listener.HeadersListener : 队列 topic.queue1 接收信息:test2

默认交换机

默认交换机(Default Exchange)是RabbitMQ预先声明的一个特殊的直连交换机(Direct Exchange),其名称为空字符串。这个交换机具有以下特点,使其在简单应用中非常实用:

  1. 自动绑定: 每个新创建的队列都会自动绑定到默认交换机上。绑定时使用的路由键(Routing Key)与队列的名称相同。
  2. 简化的消息发送: 由于每个队列都自动绑定到默认交换机,并使用队列名称作为路由键,因此在发送消息时,可以省略交换机的名称,直接使用队列名称作为routing key来发送消息。

队列

队列是RabbitMQ的核心组件,作为一种内部数据结构,用于存储经过RabbitMQ的消息。尽管消息在RabbitMQ和应用程序之间流动,但它们实际上只能被存储在队列中。队列的容量基本上只受到主机内存和磁盘限制的约束,因此它可以被视为一个庞大的消息缓冲区。多个生产者可以向同一个队列发送消息,同时多个消费者也可以从一个队列中接收数据。

临时队列

没有消费者订阅时,自动删除

当消费者断开连接时,队列将会被删除。自动删除队列允许的消费者没有限制,也就是说当这个队列上最后一个消费者断开连接才会执行删除

消费者

消费者通常是指等待接收并处理消息的程序,这里的“消费”一词与“接收”含义相近。值得注意的是,生产者、消费者和消息中间件常常不会部署在同一台机器上。同时,一个应用程序有可能同时充当生产者和消费者的角色。

获取消息的两种方式

推模式(Push)

推模式是RabbitMQ的 默认 消息获取模式,它以高效率和实时性为特点。

如果消费者处理消息的速率跟不上生产者的生产速度,可能会导致系统宕机。

优点:在推模式下,消息会被提前推送给消费者,消费者需要设置一个 缓冲区 来缓存这些消息。这样,消费者总是有一批待处理的消息存储在内存中,从而保证了处理效率和数据的实时性。

缺点:当大量消息在缓冲区堆积时,会给消费者端的服务带来较大的处理压力。如果消费者的处理能力无法满足大量消息的处理需求,可能会导致服务宕机。

推模式常用的实现方式为spring-boot-starter-amqp的@RabbitListener注解

拉模式(Pull)

拉模式允许消费者以可控的速率获取消息。

消息的实时性在此模式下依赖于消费者的拉取策略。

需要权衡拉取频率和消息的时效性,以避免资源浪费和处理延迟。

优点:与推模式相比,拉模式的一大优势在于消费者可以根据自己的处理能力来拉取消息,这有助于防止因消息堆积导致的服务宕机。

缺点:在拉模式下,消费者端需要不断地从服务器端拉取消息,处理完一定量的消息后再次进行拉取。设定拉取消息的间隔成为了一个挑战:间隔太短会导致消费者处于忙等状态,浪费资源;而间隔太长,则可能导致服务器端的消息未能及时处理。

RabbitMQ支持客户端批量拉取消息,客户端可以连续调用 basicGet 方法拉取多条消息,处理完成之后一次性ACK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Slf4j
@Component
public class PullMsgService {

@Autowired
RabbitTemplate rabbitTemplate;

/**
* 主动拉取消息
* rabbitmq 提供 channel.basicGet API用于主动拉取消息
*
* @param queueName 队列名称
* @param count 每一批次获取总数
* @param timeOut 超时时间,超时强制返回
* @param consumer 业务实现
*/
public void pull(String queueName, Integer count, int timeOut, Consumer<List<String>> consumer) {

rabbitTemplate.execute(new ChannelCallback<Object>() {

@Override
public Object doInRabbit(Channel channel) throws Exception {
List<String> msgList = new ArrayList<>();
long start = System.currentTimeMillis();
long end;
GetResponse response;
GetResponse lastResponse = null;

while (true) {
try {
end = System.currentTimeMillis();
response = channel.basicGet(queueName, false);

if (response == null) {
if (msgList.size() == 0) {
log.info("没有消息");
TimeUnit.SECONDS.sleep(5);
start = System.currentTimeMillis();
continue;
}
} else {
String msg = new String(response.getBody(), "utf-8");
lastResponse = response;
msgList.add(msg);
}

long step = end - start;
if ((step > timeOut && msgList.size() > 0) || msgList.size() >= count) {
consumer.accept(msgList);
channel.basicAck(lastResponse.getEnvelope().getDeliveryTag(), true);
start = System.currentTimeMillis();
msgList.clear();
}
} catch (Exception e) {
channel.basicNack(lastResponse.getEnvelope().getDeliveryTag(), true, true);
}
}
}
});
}
}

RabbitMQ 名词解释

  • Broker:是负责接收、存储和分发消息的应用程序,它构成了消息队列系统的核心组件。RabbitMQ Server便是一个典型的Message Broke
  • Virtual host:Virtual Host(虚拟主机)是一种逻辑分组,它允许你在同一个RabbitMQ实例中隔离不同的环境。每个Virtual Host都有自己独立的Exchange(交换器)、Queue(队列)和Binding(绑定),以及独立的权限管理,是多租户的实现。
  • Channel:Channel(通道)是建立在Connection(连接)之上的一个轻量级的虚拟连接,它是AMQP协议中的一个重要概念。Channel是客户端与Broker之间进行通信的最常用的接口。主要特点有:
    • 轻量级:Channel是轻量级的,创建和销毁的代价较低
    • 多路复用:在一个TCP连接(Connection)中,可以创建多个Channel,每个Channel都有一个唯一的ID,这样可以实现多路复用
    • 隔离性:每个Channel都是独立的,Channel之间互不影响
  • Exchange:用于处理消息路由的核心组件。当生产者发送消息到RabbitMQ时,它会首先被发送到一个Exchange。Exchange根据预定的规则和绑定(Bindings)信息,决定如何路由这个消息
  • Queue:用于存储消息的数据结构。消费者从队列中获取消息进行处理
  • Binding:用于连接Exchange(交换器)和Queue(队列)的路由规则。通过Binding,RabbitMQ能够知道如何根据路由键(Routing Key)和交换器类型将消息正确地路由到相应的队列

使用RabbitMQ

原生客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class BasicTest {
protected ConnectionFactory connectionFactory;
protected Connection connection;
protected Channel channel;

@BeforeEach
public void init() throws IOException, TimeoutException {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("rabbitmq");
connectionFactory.setPassword("wgf123");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ProducerTest extends BasicTest{
@Test
@SneakyThrows
public void send() {
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
String queueName = "test";
String message = "Hello";
channel.queueDeclare(queueName, false, false, false, null);
channel.basicPublish("", queueName, null, message.getBytes());
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
public class ConsumerTest extends BasicTest {

@SneakyThrows
@Test
public void listener() {
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
log.info(message);
};

//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};

String queueName = "test";

/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}

Spring Boot

添加依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置

1
2
3
4
5
6
spring:
rabbitmq:
port: 5672
host: localhost
username:
password:

创建队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class QueueConfig {
public static final String QUEUE_NAME = "test_queue";

/**
* 简单定义消息队列
* @return
*/
@Bean
public Queue simpleQueue() {
// 不指定交换机则使用默认交换机将消息转发到队列
return QueueBuilder.durable("test_queue").build();
}
}

创建监听器

1
2
3
4
5
6
7
8
9
@Slf4j
@Component
public class SimpleListener {

@RabbitListener(queues = QueueConfig.QUEUE_NAME)
public void receive(String msg) {
log.info(msg);
}
}

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@SpringBootTest
public class QeueuTest {
@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void simpleSend() throws InterruptedException {
rabbitTemplate.convertAndSend(QueueConfig.QUEUE_NAME, "test");
// 暂停等待消息接收
TimeUnit.SECONDS.sleep(5);
}
}

消息分发

多消费者分发

多消费者的消息分发是一种常见的用于提高消息处理能力和实现负载均衡的策略。当多个消费者(Consumers)订阅同一个队列(Queue)时,RabbitMQ默认采用轮询分发策略将队列中的消息分发给各个消费者。多消费者间不保证消息有序,单消费者内消息有序。

多线程分发

为确保每条消息仅被消费一次,在使用直连交换机的场景下,当存在多个消费者或一个消费者启动多个线程进行消息处理时,通常为每个线程分配一个独立的Channel(非线程安全)。这导致消息队列与消费者Channel之间形成一对多的关系,从而使得各个消费者Channel之间产生竞争。

消费者多线程消费

一个消息消费者开启多线程消费,那么每个线程会单独对应一个Channel,多线程消费场景下,不能够保证跨线程间的消息有序性,只能保证本线程消息的有序性

使用@RabbitListener注解的concurrency属性指定线程数

1
@RabbitListener(queues = AckQueueConfig.ACK_QUEUE, concurrency = "5")

轮询分发(默认)

轮询分发是 RabbitMQ 的默认消息分发策略。在这种策略下,RabbitMQ 会按照消息到达的顺序,依次将消息分发给各个消费者。这种方式能够确保所有的消费者能够平均地处理消息,实现负载均衡。

缺点:轮询分发不考虑消费者的实际处理能力和性能差异,可能会导致性能浪费。例如,如果一个消费者处理消息的速度比另一个慢,快速的消费者会在等待新消息时出现空闲,从而导致资源未被充分利用。

验证

每秒发送一条消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
@SpringBootTest
public class QeueuTest {
@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void simpleSend() throws InterruptedException {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(QueueConfig.QUEUE_NAME, i + "");
TimeUnit.SECONDS.sleep(1);
}
}
}

定义多个监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
@Component
public class SimpleListener {

@RabbitListener(queues = QueueConfig.QUEUE_NAME)
public void receive(String msg) {
log.info("{}:{}", "receive", msg);
}

@RabbitListener(queues = QueueConfig.QUEUE_NAME)
public void receive2(String msg) {
log.info("{}:{}", "receive2", msg);
}

@RabbitListener(queues = QueueConfig.QUEUE_NAME)
public void receive3(String msg) {
log.info("{}:{}", "receive3", msg);
}
}

执行结果

1
2
3
4
5
6
7
8
9
10
INFO 14984 --- [ntContainer#0-1] com.wgf.demo.listener.SimpleListener     : receive:0
INFO 14984 --- [ntContainer#1-1] com.wgf.demo.listener.SimpleListener : receive2:1
INFO 14984 --- [ntContainer#2-1] com.wgf.demo.listener.SimpleListener : receive3:2
INFO 14984 --- [ntContainer#0-1] com.wgf.demo.listener.SimpleListener : receive:3
INFO 14984 --- [ntContainer#1-1] com.wgf.demo.listener.SimpleListener : receive2:4
INFO 14984 --- [ntContainer#2-1] com.wgf.demo.listener.SimpleListener : receive3:5
INFO 14984 --- [ntContainer#0-1] com.wgf.demo.listener.SimpleListener : receive:6
INFO 14984 --- [ntContainer#1-1] com.wgf.demo.listener.SimpleListener : receive2:7
INFO 14984 --- [ntContainer#2-1] com.wgf.demo.listener.SimpleListener : receive3:8
INFO 14984 --- [ntContainer#0-1] com.wgf.demo.listener.SimpleListener : receive:9

不公平分发

在某些场景下,RabbitMQ的默认轮询分发方式可能不是最优选择。例如,当存在处理速度不一的两个消费者时,轮询方式可能导致快速消费者有大量空闲时间,而慢速消费者则持续繁忙。尽管RabbitMQ默认采用这种公平的分发策略,但在这种情况下,它可能不是最有效的。

为了解决这个问题,我们可以让消费者在完成当前任务并发送确认回执后,才接收下一个消息。这样,RabbitMQ会将新消息分发给当前不忙的消费者。但是,如果所有消费者都在处理任务,且队列中不断有新消息加入,就有可能导致队列积压。此时,我们可以考虑增加消费者数量或调整消息存储策略来解决这个问题。

prefetch 预取值

prefetch值用于限制Consumer消息缓冲区的大小,这对吞吐量有直接影响。

消息的发送本质上是异步的,因此,channel上通常会有多个消息。同时,消费者的手动确认也是异步的,这就产生了一个未确认消息的缓冲区。为避免缓冲区内未确认消息的无限制增长,建议开发人员限制此缓冲区的大小,这可以通过设置预取计数值来实现。

此值定义了channel上允许的未确认消息的最大数量。当未确认消息数量达到此值时,RabbitMQ将停止向该通道发送新消息,直到有至少一个未处理的消息被确认。例如,若通道上有未确认的消息5、6、7、8,且预取计数设置为4,则RabbitMQ不会再向该通道发送新消息,直到有消息被确认。一旦消息被确认,RabbitMQ会立即发送新消息。

消息确认和QoS预取值对吞吐量有显著影响。通常,增加预取值会提高向消费者传递消息的速度。虽然提高预取值会增加传输速率,但同时也会增加已传递但未处理消息的数量,导致消费者的RAM消耗增加. 因此,使用具有高预取值的自动确认模式或手动确认模式时需谨慎,若消费者消费了大量消息但未进行确认,会导致连接节点的内存消耗增大

为了避免这种情况,我们可以设置参数

1
2
3
4
5
6
7
8
9
10
  port: 8081
spring:
rabbitmq:
port: 5672
host: localhost
username: rabbitmq
password: wgf123
listener:
simple:
prefetch: 5 # 动态调整

springBoot默认值是250,当一个队列有多个消费者时,可以根据消费者不同的消费能力灵活配置

demo 启用两个实例,实例1 prefetch = 1,实例2 prefetch = 5,并发送20条消息

结果

实例1

1
2
3
INFO 28572 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener     : receive:2
INFO 28572 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:8
INFO 28572 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:15

实例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener     : receive:1
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:3
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:4
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:5
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:6
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:7
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:9
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:10
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:11
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:12
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:13
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:14
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:16
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:17
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:18
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:19
INFO 28901 --- [ntContainer#5-1] com.wgf.demo.listener.SimpleListener : receive:20

消息队列参数配置

参数名称说明
x-message-ttl消息在队列中过期的时间,单位ms
x-expires队列在多长时间没有被使用后删除
x-max-length限制队列中消息的最大条数。超过设定值后,
新消息会替换旧消息,遵循先进先出原则
x-max-length-bytes设定队列占用内存的最大字节数。达到限制时,
将采用LRU算法删除旧消息
x-dead-letter-routing-key死信队列路由key
x-dead-letter-exchange死信队列交换机,创建queue时参数arguments设置了
x-dead-letter-routing-key和x-dead-letter-exchange,
会在x-message-ttl时间到期后把消息放到x-dead-letter-routing-key
和x-dead-letter-exchange指定的队列中达到延迟队列的目的
x-max-priority具有更高优先级的队列具有较高的优先权,优先级高的消
息具备优先被消费的特权[1,255]
x-queue-modelazt。指定惰性队列 惰性队列会尽可能的将消息存入磁盘中,
而在消费者消费到相应的消息时才会被加载到内存中,它的
一个重要设计目标是能够支持更长的队列,即支持更多的消
息存储。当消费者由于各种各样的原因(比如消费者下线、
跌机、或者由于维护而关闭等)致使长时间不能消费消息而
造成堆积时,惰性队列就很必要了
x-queue-master-locator决定队列master节点的选择策略(集群)
min-masters:选择master queue 数最少的那个服务节点host
client-local:选择与client相连接的那个服务节点host
random:随机分配

创建消息队列

1
2
3
4
5
6
7
8
9
10
/**
* @param name 队列名称
* @param durable 是否持久化,如果设置为true,服务器重启了队列仍然存在(默认为true)
* @param exclusive 是否为独享队列(排他性队列),只有自己可见的队列,即不允许其它用户访问(默认false)
* @param autoDelete 当没有任何消费者使用时,自动删除该队列(默认为false,设置为true即为临时队列)
* @param arguments 其他参数
*/
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,
@Nullable Map<String, Object> arguments) {
}

消息异步发布确认

生产者发送消息,是先发送消息到Exchange,然后Exchange再路由到Queue。这中间就需要确认两个事情,第一,消息是否成功发送到Exchange;第二,消息是否正确的通过Exchange路由到Queue

spring提供了两个回调函数来处理这两种消息发送确认

ConfirmCallback

发布确认

ConfirmCallback 是一个用于处理消息确认的回调接口。它用于确保消息是否成功到达交换机。

  • 消息成功到达交换机

    ​ 如果消息成功发送到交换机,confirm 方法将被调用,ack 参数为 true,表示消息已经被确认。

  • 消息未能到达交换机

    ​ 如果消息未能到达交换机,confirm 方法同样会被调用,但此时 ack 参数为 false

回调参数

  1. CorrelationData correlationData
    • 唯一标识发送的消息,与确认或拒绝的消息关联。
  2. boolean ack
    • true 表示消息成功到达交换机,false 表示发送失败。
  3. String cause
    • 拒绝时包含原因,成功时为 null

源码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
@Component
public class MsgConfirmCallback implements RabbitTemplate.ConfirmCallback {

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false
// 如果发送到交换器成功,返回值为还是 true

// 获取消息id
//String messageId = correlationData.getId();
log.info("ConfirmCallback , correlationData = {} , ack = {} , cause = {} ", correlationData, ack, cause);
}
}

ReturnCallback

消息回退

ReturnCallback 是 RabbitMQ 中的一个回调机制,用于处理未能路由到合适队列的消息。只有消息没有被正确路由,才会回调此接口。

回调参数

  • message:未能路由到队列的消息。
  • replyCode:RabbitMQ 返回的错误代码。
  • replyText:RabbitMQ 返回的错误描述。
  • exchange:发送消息时使用的交换机。
  • routingKey:发送消息时使用的路由

实现源码

1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
public class MsgReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 获取消息id
// String messageId = message.getMessageProperties().getMessageId();
log.info("ReturnCallback unroutable messages, message = {} , replyCode = {} , replyText = {} , exchange = {} , routingKey = {} ", message, replyCode, replyText, exchange, routingKey);
}
}

整合

添加两个Callback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
public class RabbitmqConfig {
@Autowired
MsgConfirmCallback msgConfirmCallback;

@Autowired
MsgReturnCallback msgReturnCallback;

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
// connectionFactory包含yml配置文件配置信息
RabbitTemplate template = new RabbitTemplate(connectionFactory);

//mandatory参数说明: 交换器无法根据自身类型和路由键找到一个符合条件的队列时的处理方式; true表示会调用Basic.Return命令返回给生产者; false表示丢弃消息
template.setMandatory(true);

// 消息从 producer 到 exchange 正常则会返回一个 confirmCallback
template.setConfirmCallback(msgConfirmCallback);

// 消息从 exchange–>queue 投递失败则会返回一个 returnCallback
template.setReturnCallback(msgReturnCallback);

return template;
}
}

开启回调配置

1
2
3
4
5
6
7
8
9
server:
port: 8081
spring:
rabbitmq:
port: 5672
host: 121.37.23.172
username: rabbitmq
password: wgf123
publisher-confirm-type: correlated # 发布消息成功到交换器后会触发回调方法

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void testCallback() {
//correlationDataId相当于消息的唯一表示
UUID correlationDataId = UUID.randomUUID();
// 这里设置的id是给ConfirmCallback使用的
CorrelationData correlationData = new CorrelationData(correlationDataId.toString());
//发送MQ
rabbitTemplate.convertAndSend(DirectConfig.DIRECT_EXCHANGE, "test-queue", "test", (message) -> {
// 这里设置的id是给 ReturnCallback 使用的
message.getMessageProperties().setMessageId(correlationData.getId());
return message;
}, correlationData);
}

ACK消息确认机制

当消费者从 RabbitMQ 接收到一条消息并成功处理后,消费者需要向 RabbitMQ 发送 ACK 反馈。只有在 RabbitMQ 收到这个 ACK 反馈后,它才会将该消息从队列中删除。

  1. 处理网络不稳定或服务器异常
    • 如果消费者在处理消息时遇到网络不稳定或服务器异常等问题,可能导致无法发送 ACK 反馈。
    • RabbitMQ 将认为这条消息没有被正常消费,于是将其重新放回队列中等待处理。
  2. 集群环境下的处理
    • 在 RabbitMQ 集群中,如果一个消费者未能发送 ACK 反馈,RabbitMQ 不会无限期等待。
    • 相反,它会立即将该消息推送给在线的其他消费者,以确保消息和任务不会丢失。
  3. 消息的删除条件
    • 消费者成功处理消息并发送 ACK 反馈。
    • RabbitMQ 确认接收到了这个 ACK 反馈。

消息的ACK确认机制默认是打开的(自动ACK)

消息自动重新入列

Channel断开,未ACK的消息重新入列

如果消费者因某些原因失去与 RabbitMQ 的连接(例如通道关闭、连接关闭或TCP连接丢失),导致消息未能发送 ACK 确认,RabbitMQ 将察觉到消息未被完全处理,并会将其重新放回队列。如果此时有其他可用的消费者可以处理这条消息,RabbitMQ 将尽快将消息重新分发给另一个消费者。这样,即使某个消费者偶尔故障,也可以确保不会丢失任何消息。这个机制保障了消息的可靠传递和处理,即使在不稳定的网络环境或消费者故障的情况下也能保持消息的完整性。

自动ack

优点

  • 简单易用:不需要显式的确认消息,减少了开发的复杂性。
  • 性能较高:由于不涉及额外的确认步骤,处理速度较快,适用于一些不需要严格消息确认的场景。

缺点

  • 消息丢失风险:最大的缺点是消息一旦被投递给消费者就被认为已处理,如果消费者在处理消息时发生故障,消息将会丢失。
  • 无法保证可靠性:无法确保每条消息都被成功处理,适用于不太关键的场景。

手动ack

优点

  • 消息可靠性:消费者可以在处理消息后显式确认,确保消息已经被成功处理,减少了消息丢失的风险。
  • 灵活性:可以自定义确认的时机,例如,只有在下游业务成功完成后才确认消息。

缺点

  • 开发复杂性:需要显式调用API来确认消息,增加了开发的复杂性。
  • 性能开销:相比自动ACK,手动ACK可能会引入一些性能开销,因为需要额外的确认步骤。

使用手动ack

添加配置

1
2
3
4
spring:
rabbitmq:
listener:
acknowledge-mode: manual # ack应答设置为手动模式

监听器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
@Component
public class AckListener {

@RabbitListener(queues = AckQueueConfig.ACK_QUEUE)
public void listener(String message, Channel channel, Message messageEntity) {
int num = Integer.valueOf(message);

try {
// 让值大于5抛异常进行Nack, 消息就会重回队列头部进行下次消费,这里的代码num>5会是一个死循环消费
if (num >= 5) {
throw new RuntimeException();
}

log.info("队列:{} 接收到消息:{}", AckQueueConfig.ACK_QUEUE, message);

/**
* 消息消费确认
* 如果客户端在线没有签收这条Message,则此消息进入Unacked状态,此时监听器阻塞等待消息确认,不推送新Message
* 如果待消息确认并且客户端下线,下次客户端上线重新推送上次Unacked状态Message
*/
channel.basicAck(messageEntity.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("消费消息异常", e);

/**
* 第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
* 第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的消息都会被处理
* 第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
*/
try {
// nack 消息回到队列头部后会被重新消费,如果多次nack的会无限循环,卡在这里
channel.basicNack(messageEntity.getMessageProperties().getDeliveryTag(), false,true);
} catch (IOException ex) {
// TODO 重试逻辑或业务补偿(发往死信队列或Redis等)
log.error("nack失败", ex);
}
}
}
}

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void send(String msgId, String exchange, String routingKey, Object msg) {
CorrelationData correlationData = new CorrelationData(msgId);
//发送MQ
rabbitTemplate.convertAndSend(exchange, routingKey, msg, (message) -> {
message.getMessageProperties().setMessageId(correlationData.getId());
return message;
}, correlationData);
}

@Test
public void ackTest() {
IntStream.range(0, 6).forEach(line -> {
//correlationDataId相当于消息的唯一表示
String msgId = UUID.randomUUID().toString();
this.send(msgId, DirectConfig.DIRECT_EXCHANGE, AckQueueConfig.ROUTING_KEY, line);
});
}

ack重试配置

retry只能在自动ack模式下使用

重试并不是RabbitMQ重新发送了消息,仅仅是spring封装消费者内部进行了重试,换句话说就是重试跟mq没有任何关系;配置重试次数,当超过重试次数消息投递到异常队列

实现MessageRecoverer接口的recover方法,当重试次数上限则调用该方法处理

添加配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
server:
port: 8081
spring:
rabbitmq:
port: 5672
host: localhost
username: rabbitmq
password: wgf123
publisher-confirm-type: correlated # 发布消息成功到交换器后会触发回调方法
listener:
simple:
prefetch: 5
#acknowledge-mode: manual # 手动ack模式(关闭手动ack)
retry: # 重试配置
enabled: true
max-attempts: 5 # 最大重试次数
max-interval: 10000 # 重试最大间隔时间
initial-interval: 2000 # 重试初始间隔时间
multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
default-requeue-rejected: true # 重试次数超过上面的设置之后是否丢弃

队列和重试策略配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Configuration
public class RetryConfig {
public static final String RETRY_EXCHANGE = "retry.queue";
public static final String RETRY_QUEUE = "retry.queue";
public static final String ROUTING_KEY = "retry";

public static final String ERROR_QUEUE = "retry.error";
public static final String ERROR_ROUTING_KEY = "error";

@Autowired
RabbitTemplate rabbitTemplate;

@Bean
public DirectExchange retryExchange() {
return new DirectExchange(RETRY_EXCHANGE);
}

@Bean
public Queue retryQueue() {
return new Queue(RETRY_QUEUE);
}

@Bean
public Binding bindRetryQueue(Queue retryQueue, DirectExchange retryExchange) {
return BindingBuilder.bind(retryQueue).to(retryExchange).with(ROUTING_KEY);
}

/**
* 异常队列
*
* @return
*/
@Bean
public Queue errorQueue() {
return new Queue(ERROR_QUEUE);
}

/**
* 交换机绑定异常队列
*
* @param errorQueue
* @param directExchange
* @return
*/
@Bean
public Binding bindErrorQueue(Queue errorQueue, DirectExchange retryExchange) {
return BindingBuilder.bind(errorQueue).to(retryExchange).with(ERROR_ROUTING_KEY);
}


/**
* 调试时,配置打开
* MessageRecoverer 的实现类。达到重试次数后将消息丢第到异常队列
*
* @return
*/
@Bean
public MessageRecoverer republishMessageRecoverer() {
// 重试次数上限将消息丢到异常队列
return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, ERROR_ROUTING_KEY);
}
}

消息监听器

1
2
3
4
5
6
7
8
9
10
11
12
@Slf4j
@Component
public class RetryListener {
private AtomicInteger count = new AtomicInteger(0);

@RabbitListener(queues = RetryConfig.RETRY_QUEUE)
public void listener(String message, Channel channel, Message messageEntity) throws IOException {
log.info("消费次数:{}", count.incrementAndGet());
// 必须抛出异常才能触发重试次数
throw new RuntimeException();
}
}

发送消息

1
2
3
4
5
@Test
public void retryTest() throws InterruptedException {
this.rabbitTemplate.convertAndSend(RetryConfig.RETRY_EXCHANGE, RetryConfig.ROUTING_KEY, "test");
TimeUnit.SECONDS.sleep(60);
}

日志

1
2
3
4
5
6
2022-02-04 16:01:32.156  INFO 13932 --- [tContainer#13-1] com.wgf.demo.listener.RetryListener      : 消费次数:1
2022-02-04 16:01:34.157 INFO 13932 --- [tContainer#13-1] com.wgf.demo.listener.RetryListener : 消费次数:2
2022-02-04 16:01:38.164 INFO 13932 --- [tContainer#13-1] com.wgf.demo.listener.RetryListener : 消费次数:3
2022-02-04 16:01:46.167 INFO 13932 --- [tContainer#13-1] com.wgf.demo.listener.RetryListener : 消费次数:4
2022-02-04 16:01:56.168 INFO 13932 --- [tContainer#13-1] com.wgf.demo.listener.RetryListener : 消费次数:5
2022-02-04 16:01:56.169 WARN 13932 --- [tContainer#13-1] o.s.a.r.retry.RepublishMessageRecoverer : Republishing failed message to exchange 'retry.queue' with routing key error

消费次数上限丢到error队列

RabbitMQ持久化

默认情况下,RabbitMQ将exchange、queue、message等数据存储在内存中,这意味着如果RabbitMQ在重启、关闭或宕机时,所有信息都会丢失。

为了解决这个问题,RabbitMQ提供了持久化功能。通过启用持久化,当RabbitMQ在发生重启、关闭或宕机后重新启动时,它可以从硬盘中恢复exchange、queue、message等数据。

持久化的配置方式如下:

  • exchange 持久化,在声明时指定 durable 为 true
  • queue 持久化,在声明时指定 durable 为 true
  • message 持久化,在投递时通过参数指定消息持久化,持久化到磁盘

需要注意的是,exchange和queue的持久化确保了它们自身的元数据不会因异常而丢失,但并不保证其中的message不会丢失。要确保message不会丢失,必须同时将message也标记为持久化。

注意

  • 如果 exchange 和 queue 两者之间有一个持久化,一个非持久化,就不允许建立绑定
  • 如果 exchange 和 queue 都是持久化的,那么它们之间的 binding 也是持久化的
  • exchange 和 queue 持久化的是它们的元数据,保证RabbitMQ重启后 exchange 和 queue 能从磁盘重新加载

exchange 持久化

代码持久化

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public DirectExchange directExchange() {
// 默认持久化
// return new DirectExchange(DIRECT_EXCHANGE);

/**
* 第一个参数:交换机名称
* 第二个参数:是否持久化
* 第三个参数:是否自动删除,条件:有队列或者交换器绑定了本交换器,然后所有队列或交换器都与本交换器解除绑定,autoDelete=true时,此交换器就会被自动删除
*/
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}

控制台持久化

queue 持久化

代码持久化

1
2
3
4
5
6
7
8
9
10
/**
* @param name 队列名称
* @param durable 是否持久化,如果设置为true,服务器重启了队列仍然存在(默认为true)
* @param exclusive 是否为独享队列(排他性队列),只有自己可见的队列,即不允许其它用户访问(默认false)
* @param autoDelete 当没有任何消费者使用时,自动删除该队列(默认为false,设置为true即为临时队列)
* @param arguments 其他参数
*/
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,
@Nullable Map<String, Object> arguments) {
}

控制台持久化

消息持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void persistenceTest() {
// 消息持久化
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
};

rabbitTemplate.convertAndSend(QueueConfig.QUEUE_NAME, "test".getBytes(), messagePostProcessor);
}

死信队列 DLX

死信:无法被消费的消息

死信队列:专门存放无法被消费的消息

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumerqueue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的来源

  • 消息 TTL 过期
  • 队列达到最大限制淘汰旧消息
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

死信队列Example

交换机和队列配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@Configuration
public class DeadConfig {

/**
* 死信配置
*/
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String DEAD_QUEUE = "dead.queue";
public static final String DEAD_ROUTING_KEY = "dead";

/**
* 正常交换机与队列配置
*/
public static final String NORMAL_EXCHANGE = "normal_exchange";

/**
* 有TTL过期的队列
*/
public static final String TTL_QUEUE = "dead.ttl.queue";
public static final String TTL_ROUTING_KEY = "ttl";

/**
* 有长度的队列
*/
public static final String LENGTH_QUEUE = "dead.length.queue";
public static final String LENGTH_ROUTING_KEY = "length";

/**
* 拒绝队列
*/
public static final String NACK_QUEUE = "dead.nack.queue";
public static final String NACK_ROUTING_KEY = "nack";

/**
* 创建死信交换机
*
* @return
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}

/**
* 创建死信队列
*
* @return
*/
@Bean
public Queue deadQueue() {
return new Queue(DEAD_QUEUE);
}

/**
* 绑定死信队列
*
* @param directQueue
* @param directExchange
* @return
*/
@Bean
public Binding bindDeadtQueue(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
}

/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}

/**
* TTL过期队列
*/
@Bean
public Queue deadTtlQueue() {
// 正常队列绑定死信队列
Map<String, Object> args = new HashMap<>(8);
// 设置TTL过期
args.put("x-message-ttl", 10000);

// 声明死信队列路由键
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
// 声明死信队列交换机,消息异常投递到此交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
return new Queue(TTL_QUEUE, true, false, false, args);
}

/**
* 有长度限制的队列
* @return
*/
@Bean
public Queue deadLengthQueue() {
// 正常队列绑定死信队列
Map<String, Object> args = new HashMap<>(8);

// 设置队列最大长度
args.put("x-max-length", 4);

// 声明死信队列路由键
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
// 声明死信队列交换机,消息异常投递到此交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
return new Queue(LENGTH_QUEUE, true, false, false, args);
}

/**
* 拒绝队列,监听时NACK
* @return
*/
@Bean
public Queue deadNackQueue() {
// 正常队列绑定死信队列
Map<String, Object> args = new HashMap<>(8);
// 声明死信队列路由键
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
// 声明死信队列交换机,消息异常投递到此交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
return new Queue(NACK_QUEUE, true, false, false, args);
}

/**
* TTL队列绑定交换机
*
* @param deadTtlQueue
* @param normalExchange
* @return
*/
@Bean
public Binding bingDeadTtlQueue(Queue deadTtlQueue, DirectExchange normalExchange) {
return BindingBuilder.bind(deadTtlQueue).to(normalExchange).with(TTL_ROUTING_KEY);
}

/**
* LENGTH队列绑定交换机
* @param deadLengthQueue
* @param normalExchange
* @return
*/
@Bean
public Binding bingDeadLengthQueue(Queue deadLengthQueue, DirectExchange normalExchange) {
return BindingBuilder.bind(deadLengthQueue).to(normalExchange).with(LENGTH_ROUTING_KEY);
}

/**
* NACK队列绑定交换机
* @param deadNackQueue
* @param normalExchange
* @return
*/
@Bean
public Binding bingDeadNackQueue(Queue deadNackQueue, DirectExchange normalExchange) {
return BindingBuilder.bind(deadNackQueue).to(normalExchange).with(NACK_ROUTING_KEY);
}
}

监听器配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
@Component
public class DeadListener {
/**
* 监听死信队列
*/
@RabbitListener(queues = DeadConfig.DEAD_QUEUE)
public void deadListener(String message, Channel channel, Message messageEntity) throws IOException {
log.info("死信队列接受到死信:{}", message);
channel.basicAck(messageEntity.getMessageProperties().getDeliveryTag(), false);
}

/**
* NACK 监听
*/
@SneakyThrows
@RabbitListener(queues = DeadConfig.NACK_QUEUE)
public void listener(String message, Channel channel, Message messageEntity) throws IOException {
// 拒收消息
log.info("队列:{} 拒收消息:{}", DeadConfig.TTL_QUEUE, message);
// 两种决绝方式都能重新投递死信队列
channel.basicNack(messageEntity.getMessageProperties().getDeliveryTag(), false, false);
// channel.basicReject(messageEntity.getMessageProperties().getDeliveryTag(), false);
}
}

TTL过期

配置正常队列与死信队列绑定

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
@Test
@SneakyThrows
public void deadTtlTest() {
IntStream.range(0, 10).forEach(line -> {
int ttl = 5000;
String msg = "delay:" + line;

rabbitTemplate.convertAndSend(DeadConfig.NORMAL_EXCHANGE, DeadConfig.TTL_ROUTING_KEY, msg);
log.info("发送一条时长: {} 毫秒信息给队列: {} 内容: {}", ttl, DeadConfig.TTL_QUEUE, msg);
});
TimeUnit.SECONDS.sleep(20);
}

图中可以看dead.ttl.queue指定了DLX DLK,当消息TTL过期,则会将消息转发到dead_exchange交换机然后根据配置的死信路由键路由到dead.queue队列

1
2
3
4
5
6
7
8
9
10
INFO 18352 --- [ntContainer#0-1] com.wgf.demo.listener.DeadListener       : 死信队列接受到死信:delay:0
INFO 18352 --- [ntContainer#0-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:delay:1
INFO 18352 --- [ntContainer#0-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:delay:2
INFO 18352 --- [ntContainer#0-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:delay:3
INFO 18352 --- [ntContainer#0-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:delay:4
INFO 18352 --- [ntContainer#0-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:delay:5
INFO 18352 --- [ntContainer#0-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:delay:6
INFO 18352 --- [ntContainer#0-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:delay:7
INFO 18352 --- [ntContainer#0-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:delay:8
INFO 18352 --- [ntContainer#0-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:delay:9

队列达到最大长度

1
2
3
4
5
6
7
8
@Test
@SneakyThrows
public void deadLengthTest() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(DeadConfig.NORMAL_EXCHANGE, DeadConfig.LENGTH_ROUTING_KEY, i + "");
}
TimeUnit.SECONDS.sleep(10);
}

先看队列dead.length.queue消息,剩余4条,和配置的容量符合,编号0-5的消息淘汰后被转发到死信队列

1
2
3
4
5
6
INFO 16096 --- [ntContainer#1-1] com.wgf.demo.listener.DeadListener       : 死信队列接受到死信:0
INFO 16096 --- [ntContainer#1-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:2
INFO 16096 --- [ntContainer#1-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:1
INFO 16096 --- [ntContainer#1-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:4
INFO 16096 --- [ntContainer#1-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:5
INFO 16096 --- [ntContainer#1-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:3

队列溢出的默认行为

当队列的消息超过设置的最大长度或大小时,RabbitMQ默认的做法是将处于队列头部的信息(队列中最老的消息)丢弃或变成死信

消息被拒

监听器Nack,拒绝消息

1
2
3
4
5
6
7
8
@Test
@SneakyThrows
public void deadNackTest() {
for (int i = 0; i < 3; i++) {
rabbitTemplate.convertAndSend(DeadConfig.NORMAL_EXCHANGE, DeadConfig.NACK_ROUTING_KEY, i + "");
}
TimeUnit.SECONDS.sleep(10);
}

最终所有拒收的消息都被转发到死信队列中

1
2
3
INFO 7464 --- [ntContainer#1-1] com.wgf.demo.listener.DeadListener       : 死信队列接受到死信:0
INFO 7464 --- [ntContainer#1-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:1
INFO 7464 --- [ntContainer#1-1] com.wgf.demo.listener.DeadListener : 死信队列接受到死信:2

basicReject / basicNack / basicRecover区别

Reject

channel.basicReject(deliveryTag, true);

basic.reject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列一次只能拒绝一条

Nack

channel.basicNack(deliveryTag, false, true);

basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue与basic.reject区别就是同时支持多个消息,nack后消息重新回到队列头部重新消费,消息还是会被自己重新消费

Recover

channel.basicRecover(true);

basic.recover是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。

延迟队列

延迟队列是一种用于实现消息延迟传递的特殊队列类型。它允许你将消息发送到队列,并在一定的延迟时间之后才将消息投递给消费者。这对于需要执行计划任务、调度任务或实现消息重试机制等情况非常有用。

延迟队列使用场景

  1. 订单超时取消:如果用户下单后十分钟内未支付,系统会自动取消订单,而不需要等待用户手动取消,提供更好的用户体验。
  2. 店铺提醒:新创建的店铺如果在十天内没有上传商品,系统会自动发送消息提醒店主,确保店铺及时添加商品。
  3. 用户登录提醒:用户注册成功后,如果三天内没有登录,系统将通过短信提醒用户,激发用户活跃度。
  4. 退款处理通知:用户发起退款申请后,如果三天内没有得到处理,系统会通知相关运营人员,保障退款流程的顺利进行。
  5. 会议提醒:预定的会议开始前十分钟,系统会自动通知所有与会人员,确保大家按时参加会议,提高会议的准时性。

在这些情况下,延迟队列的作用是在特定的时间点触发相应的任务,而不需要持续不断地查询数据库,从而提高了系统的效率和性能,尤其在处理大规模数据和有时限要求的情况下,延迟队列显得尤为重要和高效。

代码架构图

用TTL和死信队列实现延迟队列功能

原生延迟队列的核心思想是利用消息的 TTL死信队列 来实现。

代码结构可以参考 ttl过期 ,不过需要新增一个队列并将其绑定到死信队列上。重要的是,不监听正常队列,而是监听死信队列。这样,RabbitMQ会在消息的TTL过期后自动将消息转发到死信队列,从而实现延迟效果。

缺点

需要注意的是,这种方法的缺点是每增加一个新的时间需求就需要新增一个队列。例如,如果需要一个小时后处理,就需要创建一个TTL为一个小时的队列。对于一些需要不同延迟时间的场景,可能需要创建大量队列,这会导致管理复杂性增加。

原生延迟队列缺点

当队列中的消息按照它们的TTL(生存时间)排列时,如果队列头部的消息的TTL比后面的消息要长,那么头部的消息会阻塞后面的消息。

缺点

简单来说,如果你使用消息属性上设置TTL的方式,RabbitMQ只会检查队列中的第一个消息是否过期,如果过期,它会被移到死信队列中。问题在于,如果第一个消息的延迟时间很长,而第二个消息的延迟时间很短,第二个消息并不会优先执行,因为RabbitMQ只检查队列头部的消息是否过期。这可能导致消息执行的顺序不符合你的预期。

添加配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Configuration
public class TtlQueueConfig {
/**
* 死信队列
*/
public static final String DEAD_EXCHANGE = "dead_ttl_exchange";
public static final String DEAD_QUEUE = "dead.ttl.overdue.queue";
public static final String DEAD_ROUTING_KEY = "overdue";

/**
* 正常队列,发送消息时,在消息设置TTL过期
*/
public static final String TTL_EXCHANGE = "ttl_exchange";
public static final String TTL_QUEUE = "ttl.queue";
public static final String TTL_ROUTING_KEY = "ttl";

/**
* 创建死信交换机
*/
@Bean
public DirectExchange deadTtlExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}

/**
* 创建死信队列,正常队列TTL过期将死信转发到死信队列
*/
@Bean
public Queue deadTtlOverdueQueue() {
return new Queue(DEAD_QUEUE);
}

/**
* 绑定死信队列
*/
@Bean
public Binding bindDeadTtlOverdueQueue(Queue deadTtlOverdueQueue, DirectExchange deadTtlExchange) {
return BindingBuilder.bind(deadTtlOverdueQueue).to(deadTtlExchange).with(DEAD_ROUTING_KEY);
}

/**
* 创建正常交换机
*/
@Bean
public DirectExchange ttlExchange() {
return new DirectExchange(TTL_EXCHANGE);
}

/**
* 创建正常队列
*/
@Bean
public Queue ttlQueue() {
// 正常队列绑定死信队列
Map<String, Object> args = new HashMap<>(8);
// 声明死信队列交换机,消息异常投递到此交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 声明死信队列路由键
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
return new Queue(TTL_QUEUE, true, false, false, args);
}

/**
* 正常队列绑定交换机
*/
@Bean
public Binding bindTtlQueue(Queue ttlQueue, DirectExchange ttlExchange) {
return BindingBuilder.bind(ttlQueue).to(ttlExchange).with(TTL_ROUTING_KEY);
}
}

添加死信监听器

1
2
3
4
5
6
7
8
@Slf4j
@Component
public class TtlListener {
@RabbitListener(queues = TtlQueueConfig.DEAD_QUEUE)
public void receive(String msg) {
log.info("死信队列 {} 接收信息:{}", TtlQueueConfig.DEAD_QUEUE, msg);
}
}

发送动态TTL消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void ttlMsgTest() throws InterruptedException {
IntStream.range(0, 4).forEach(line -> {
int ttl = 60000 - (line * 5000);
String msg = "delay:" + line;

rabbitTemplate.convertAndSend(TtlQueueConfig.TTL_EXCHANGE, TtlQueueConfig.TTL_ROUTING_KEY, msg, correlationData -> {
// 动态设置消息ttl
correlationData.getMessageProperties().setExpiration(String.valueOf(ttl));
return correlationData;
});


log.info("发送一条时长: {} 毫秒信息给延迟队列: {} 内容: {}", ttl, TtlQueueConfig.TTL_QUEUE, msg);
});
TimeUnit.SECONDS.sleep(70);
}

生产者log

1
2
3
4
19:09:32.706  INFO 7708 --- [           main] com.wgf.demo.QeueuTest                   : 发送一条时长: 60000 毫秒信息给延迟队列: ttl.queue  内容: delay:0
19:09:32.733 INFO 7708 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 55000 毫秒信息给延迟队列: ttl.queue 内容: delay:1
19:09:32.733 INFO 7708 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 50000 毫秒信息给延迟队列: ttl.queue 内容: delay:2
19:09:32.756 INFO 7708 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 45000 毫秒信息给延迟队列: ttl.queue 内容: delay:3

消费者log

1
2
3
4
19:10:32.736  INFO 7708 --- [ntContainer#0-1] com.wgf.demo.listener.TtlListener        : 死信队列 dead.ttl.overdue.queue 接收信息:delay:0
19:10:32.736 INFO 7708 --- [ntContainer#0-1] com.wgf.demo.listener.TtlListener : 死信队列 dead.ttl.overdue.queue 接收信息:delay:1
19:10:32.736 INFO 7708 --- [ntContainer#0-1] com.wgf.demo.listener.TtlListener : 死信队列 dead.ttl.overdue.queue 接收信息:delay:2
19:10:32.737 INFO 7708 --- [ntContainer#0-1] com.wgf.demo.listener.TtlListener : 死信队列 dead.ttl.overdue.queue 接收信息:delay:3

对比生产者和消费者的日志时间不难看出,TTL时间比第一条时间短的消息全部被第一条阻塞

插件实现延迟队列

上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的TTL,并使其在设置的TTL时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题

安装延时队列插件(docker)

  • rabbitmq在容器中的插件目录:/opt/rabbitmq/plugins

  • Tags · rabbitmq/rabbitmq-delayed-message-exchange · GitHub,下载 rabbitmq_delayed_message_exchange 对应版本插件,笔者是在页面获取下载链接使用wget命令下载 3.8.9下载地址

  • 修改文件的权限: chmod 755 rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez

  • 执行命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    注意:不需要带插件的版本和文件后缀.ez

    安装成功日志

  • 查看已安装的插件:rabbitmq-plugins list

  • 重启RabbitMQ docker容器, 查看插件是否生效

代码实现

定义队列和交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Configuration
public class DelayedConfig {
public static final String DELAYED_EXCHANGE = "delayed_exchange";
public static final String DELAYED_QUEUE = "delayed.queue";
public static final String DELAYED_ROUTING_KEY = "Delayed";

@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE);
}

@Bean
public CustomExchange delayedExchange() {

// 延迟交换机类型
Map<String, Object> args = new HashMap<>(8);
args.put("x-delayed-type", "direct");

/**
* 1.交换机名称
* 2.交换机类型
* 3.是否需要持久化
* 4.是否自动删除
* 5.其他参数
*/
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message",true, false, args);
}

/**
* 延迟交换机绑定队列
* @param delayedQueue
* @param delayedExchange
* @return
*/
@Bean
public Binding bindDelayedQueue(Queue delayedQueue, CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}

监听器

1
2
3
4
5
6
7
8
9
@Slf4j
@Component
public class DelayedListener {
// 注释手动ack模式
@RabbitListener(queues = DelayedConfig.DELAYED_QUEUE)
public void listener(String msg) {
log.info("延迟队列 {} 接收信息:{}", DelayedConfig.DELAYED_QUEUE, msg);
}
}

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void delayedTest() throws InterruptedException {
IntStream.range(0, 10).forEach(line -> {
int delay = 60000 - (line * 5000);
String msg = "delay:" + line;

rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, DelayedConfig.DELAYED_ROUTING_KEY, msg, correlationData -> {
// 设置消息的延迟时间 单位ms
correlationData.getMessageProperties().setDelay(delay);
return correlationData;
});

log.info("发送一条时长: {} 毫秒信息给延迟队列: {} 内容: {}", delay, DelayedConfig.DELAYED_QUEUE, msg);
});
TimeUnit.SECONDS.sleep(70);
}

生产者log

1
2
3
4
5
6
7
8
9
10
20:05:29.763  INFO 14100 --- [           main] com.wgf.demo.QeueuTest                   : 发送一条时长: 60000 毫秒信息给延迟队列: delayed.queue  内容: delay:0
20:05:29.785 INFO 14100 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 55000 毫秒信息给延迟队列: delayed.queue 内容: delay:1
20:05:29.786 INFO 14100 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 50000 毫秒信息给延迟队列: delayed.queue 内容: delay:2
20:05:29.808 INFO 14100 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 45000 毫秒信息给延迟队列: delayed.queue 内容: delay:3
20:05:29.808 INFO 14100 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 40000 毫秒信息给延迟队列: delayed.queue 内容: delay:4
20:05:29.808 INFO 14100 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 35000 毫秒信息给延迟队列: delayed.queue 内容: delay:5
20:05:29.831 INFO 14100 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 30000 毫秒信息给延迟队列: delayed.queue 内容: delay:6
20:05:29.832 INFO 14100 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 25000 毫秒信息给延迟队列: delayed.queue 内容: delay:7
20:05:29.832 INFO 14100 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 20000 毫秒信息给延迟队列: delayed.queue 内容: delay:8
20:05:29.832 INFO 14100 --- [ main] com.wgf.demo.QeueuTest : 发送一条时长: 15000 毫秒信息给延迟队列: delayed.queue 内容: delay:9

消费者log

1
2
3
4
5
6
7
8
9
10
20:05:44.852  INFO 14100 --- [ntContainer#3-1] com.wgf.demo.listener.DelayedListener    : 延迟队列 delayed.queue 接收信息:delay:9
20:05:49.845 INFO 14100 --- [ntContainer#3-1] com.wgf.demo.listener.DelayedListener : 延迟队列 delayed.queue 接收信息:delay:8
20:05:54.844 INFO 14100 --- [ntContainer#3-1] com.wgf.demo.listener.DelayedListener : 延迟队列 delayed.queue 接收信息:delay:7
20:05:59.844 INFO 14100 --- [ntContainer#3-1] com.wgf.demo.listener.DelayedListener : 延迟队列 delayed.queue 接收信息:delay:6
20:06:04.821 INFO 14100 --- [ntContainer#3-1] com.wgf.demo.listener.DelayedListener : 延迟队列 delayed.queue 接收信息:delay:5
20:06:09.821 INFO 14100 --- [ntContainer#3-1] com.wgf.demo.listener.DelayedListener : 延迟队列 delayed.queue 接收信息:delay:4
20:06:14.821 INFO 14100 --- [ntContainer#3-1] com.wgf.demo.listener.DelayedListener : 延迟队列 delayed.queue 接收信息:delay:3
20:06:19.799 INFO 14100 --- [ntContainer#3-1] com.wgf.demo.listener.DelayedListener : 延迟队列 delayed.queue 接收信息:delay:2
20:06:24.798 INFO 14100 --- [ntContainer#3-1] com.wgf.demo.listener.DelayedListener : 延迟队列 delayed.queue 接收信息:delay:1
20:06:29.776 INFO 14100 --- [ntContainer#3-1] com.wgf.demo.listener.DelayedListener : 延迟队列 delayed.queue 接收信息:delay:0

从log上看,延迟队列实现了消息粒度的TTL

延迟交换机可以看正在被延迟的消息条数

x-delayed-type 取值

  • direct
  • fanout
  • topic
  • headers

总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。 当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

优先级队列

优先级队列是一种队列类型,它允许你为消息设置不同的优先级,以便按照这些优先级来处理消息,越高优先级的消息可能越早被处理。

消费者需要等待所有消息都进入队列后才能开始消费。这是因为要进行消息的优先级排序,队列必须拥有足够的消息来进行比较。因此,只有当队列中积累了足够的消息后,系统才能按照消息的优先级有序地处理它们。

使用场景

有消息堆积则按优先级排序,优先级高的先执行

  1. 订单处理:在电子商务平台上,处理订单可能需要不同的速度,以应对大客户和普通客户的需求。通过为订单设置不同的优先级,可以确保及时处理高价值订单。
  2. 任务调度:在任务调度系统中,某些任务可能比其他任务更重要或更紧急。使用优先级队列可以确保高优先级任务优先执行,以满足业务需求。
  3. 通知和提醒:在需要发送通知或提醒的应用中,例如发送短信或电子邮件提醒,您可以使用优先级队列来处理即时通知和紧急提醒,确保它们在时间上得到优先处理。
  4. 资源分配:在资源有限的情况下,可以使用优先级队列来分配资源,确保高优先级任务或消息得到优先满足。

代码实现

队列配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Configuration
public class PriorityConfig {
public static final String PRIORITY_QUEUE = "priority.queue";
public static final String PRIORITY_EXCHANGE = "priority.queue";
public static final String ROUTING_KEY = "priority";

/**
* 创建队列设置优先级
*/
@Bean
public Queue priorityQueue() {
Map<String, Object> args = new HashMap<>(8);
// 设置队列的优先级,取值[0~255] 建议[0~10], 如果有消息堆积则会按照优先级排序,越高的越快被消费
args.put("x-max-priority", 10);
return new Queue(PRIORITY_QUEUE, true, false, false, args);
}

/**
* 创建交换机
*/
@Bean
public DirectExchange priorityExchange() {
// 默认持久化
return new DirectExchange(PRIORITY_EXCHANGE, true, false);
}

/**
* 绑定队列
*
* @param directQueue
* @param directExchange
* @return
*/
@Bean
public Binding bindPriorityQueue(Queue priorityQueue, DirectExchange priorityExchange) {
return BindingBuilder.bind(priorityQueue).to(priorityExchange).with(ROUTING_KEY);
}
}

消息监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@Component
public class PriorityListener {
/**
* 要看到效果,必选让消息全部发送到队列后再进行监听消费,让消息有排序的时间,实时消费排序效果不明显
*
* @param msg
*/
@RabbitListener(queues = PriorityConfig.PRIORITY_QUEUE)
public void receive(String msg) {
log.info("队列 {} 接收信息:{}", PriorityConfig.PRIORITY_QUEUE, msg);
}
}

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void priorityTest() {
IntStream.range(1, 10).forEach(line -> {
int temp = line;

// 能被2整除的优先级最高
if (line % 2 == 0) {
temp = 10;
}

int priority = temp;
String msg = "优先级:" + priority;

rabbitTemplate.convertAndSend(PriorityConfig.PRIORITY_EXCHANGE, PriorityConfig.ROUTING_KEY, msg, correlationData -> {
// 设置队列优先级
correlationData.getMessageProperties().setPriority(priority);
return correlationData;
});

log.info("发送一条优先级: {} 信息给队列: {} 内容: {}", priority, PriorityConfig.PRIORITY_QUEUE, msg);
});
}

生产者log

1
2
3
4
5
6
7
8
9
22:29:31.274  INFO 14480 --- [           main] com.wgf.demo.QeueuTest                   : 发送一条优先级: 1 信息给队列: priority.queue  内容: 优先级:1
22:29:31.297 INFO 14480 --- [ main] com.wgf.demo.QeueuTest : 发送一条优先级: 10 信息给队列: priority.queue 内容: 优先级:10
22:29:31.297 INFO 14480 --- [ main] com.wgf.demo.QeueuTest : 发送一条优先级: 3 信息给队列: priority.queue 内容: 优先级:3
22:29:31.321 INFO 14480 --- [ main] com.wgf.demo.QeueuTest : 发送一条优先级: 10 信息给队列: priority.queue 内容: 优先级:10
22:29:31.321 INFO 14480 --- [ main] com.wgf.demo.QeueuTest : 发送一条优先级: 5 信息给队列: priority.queue 内容: 优先级:5
22:29:31.321 INFO 14480 --- [ main] com.wgf.demo.QeueuTest : 发送一条优先级: 10 信息给队列: priority.queue 内容: 优先级:10
22:29:31.345 INFO 14480 --- [ main] com.wgf.demo.QeueuTest : 发送一条优先级: 7 信息给队列: priority.queue 内容: 优先级:7
22:29:31.345 INFO 14480 --- [ main] com.wgf.demo.QeueuTest : 发送一条优先级: 10 信息给队列: priority.queue 内容: 优先级:10
22:29:31.345 INFO 14480 --- [ main] com.wgf.demo.QeueuTest : 发送一条优先级: 9 信息给队列: priority.queue 内容: 优先级:9

消费者log

1
2
3
4
5
6
7
8
9
22:30:29.740  INFO 21004 --- [ntContainer#0-1] com.wgf.demo.listener.PriorityListener   : 队列 priority.queue 接收信息:优先级:10
22:30:29.742 INFO 21004 --- [ntContainer#0-1] com.wgf.demo.listener.PriorityListener : 队列 priority.queue 接收信息:优先级:10
22:30:29.742 INFO 21004 --- [ntContainer#0-1] com.wgf.demo.listener.PriorityListener : 队列 priority.queue 接收信息:优先级:10
22:30:29.742 INFO 21004 --- [ntContainer#0-1] com.wgf.demo.listener.PriorityListener : 队列 priority.queue 接收信息:优先级:10
22:30:29.743 INFO 21004 --- [ntContainer#0-1] com.wgf.demo.listener.PriorityListener : 队列 priority.queue 接收信息:优先级:9
22:30:29.754 INFO 21004 --- [ntContainer#0-1] com.wgf.demo.listener.PriorityListener : 队列 priority.queue 接收信息:优先级:7
22:30:29.754 INFO 21004 --- [ntContainer#0-1] com.wgf.demo.listener.PriorityListener : 队列 priority.queue 接收信息:优先级:5
22:30:29.755 INFO 21004 --- [ntContainer#0-1] com.wgf.demo.listener.PriorityListener : 队列 priority.queue 接收信息:优先级:3
22:30:29.755 INFO 21004 --- [ntContainer#0-1] com.wgf.demo.listener.PriorityListener : 队列 priority.queue 接收信息:优先级:1

优先级队列标志

惰性队列

消息存入磁盘,减少内存占用以便存储更多消息

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列(Lazy Queues)是一种特殊类型的队列,旨在提供对大量消息的高效存储和检索。这些队列的设计目标是最小化内存占用,尤其在处理大型消息或大量消息时。以下是有关RabbitMQ惰性队列的详细说明:

  1. 消息的持久性存储:惰性队列将消息持久地存储在磁盘上而不是内存中,这意味着即使RabbitMQ服务器重新启动,消息也不会丢失。这对于关键数据的可靠性非常重要。
  2. 内存优化:通常,RabbitMQ队列会将消息保留在内存中以加快消息传递速度。然而,在某些情况下,这可能导致内存耗尽,特别是当处理大型消息或大量消息时。惰性队列通过将消息写入磁盘,减少了内存的使用,因此更适合处理大容量数据。
  3. 延迟加载:与普通队列不同,惰性队列在消息入队时不会立即将消息写入磁盘。相反,它们采用延迟加载的策略,只有在消息确实需要时才将其加载到内存中。这有助于降低消息入队的延迟,并减少了磁盘I/O的负担。
  4. 适用场景:惰性队列特别适用于需要处理大型文件、大型数据集或需要长时间存储消息的情况。它们可以减少RabbitMQ服务器的内存占用,从而提高系统的稳定性和性能。

注意

惰性队列的消息存储在磁盘上,当消息被消费时,需要从磁盘加载到内存,然后才会被推送给消费者。因此,惰性队列的消费速度相对较慢。它适合用于消息堆积较多且消息的实时性要求不高的场景。

两种模式

队列具备两种模式:defaultlazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的

在队列声明的时候可以通过x-queue-mode参数来设置队列的模式,取值为defaultlazy

代码实现

队列配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Configuration
public class LazyConfig {
public static final String LAZY_QUEUE = "lazy.queue";
public static final String LAZY_EXCHANGE = "lazy_exchange";
public static final String ROUTING_KEY = "lazy";

/**
* 创建惰性队列
*
* @return
*/
@Bean
public Queue lazyQueue() {

// 延迟交换机类型
Map<String, Object> args = new HashMap<>(8);
args.put("x-queue-mode", "lazy");
return new Queue(LAZY_QUEUE, false, false, false, args);
}

/**
* 创建交换机
*
* @return
*/
@Bean
public DirectExchange lazyExchange() {
return new DirectExchange(LAZY_EXCHANGE);
}


/**
* 队列绑定
* @param directQueue
* @param directExchange
* @return
*/
@Bean
public Binding bindLazyQueue(Queue lazyQueue, DirectExchange lazyExchange) {
return BindingBuilder.bind(lazyQueue).to(lazyExchange).with(ROUTING_KEY);
}
}

发送消息

1
2
3
4
5
6
7
@Test
public void lazyTest() {
String msg = UUID.randomUUID().toString();
for (int i = 0; i < 1000000; i++) {
rabbitTemplate.convertAndSend(LazyConfig.LAZY_EXCHANGE, LazyConfig.ROUTING_KEY, msg);
}
}

惰性队列与普通队列比较

内存占用情况

由此可见惰性队列将数据存储在硬盘中,内存占用较小,适合大量消息堆积场景使用

命令:rabbitmqctl list_queues name messages memory

备份交换机

处理无法被交换机路由的消息

消息不丢失又不增加生产者复杂性

在RabbitMQ中,备份交换机可以理解为交换机的“备胎”。当一个交换机接收到一条无法路由的消息时,它会把这条消息转发给备份交换机,由备份交换机来进行转发和处理。备份交换机的类型通常为“fanout”型,这样就能把所有消息都投递到与其绑定的队列中。

备份交换机的优点在于,它提供了一种有效的处理不可路由消息的机制,避免了这些消息被丢失或未被处理的情况。

代码实现

备份交换机配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
@Component
public class BackUpConfig {
/**
* 备份交换机配置
*/
public static final String BACKUP_EXCHANGE = "backup";
public static final String BACKUP_QUEUE = "backup.queue";
public static final String WARING_QUEUE = "waring.queue";

/**
* 备份队列
*
* @return
*/
@Bean
public Queue backupQueue() {
return new Queue(BACKUP_QUEUE);
}

/**
* 警告队列
*
* @return
*/
@Bean
public Queue waringQueue() {
return new Queue(WARING_QUEUE);
}

/**
* 备份交换机
*
* @return
*/
@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE);
}

/**
* 备份交换机绑定报警队列
*
* @param fanoutQueue1
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindWaringQueue(Queue waringQueue, FanoutExchange backupExchange) {
// 不需要 routing_key
return BindingBuilder.bind(waringQueue).to(backupExchange);
}

/**
* 备份交换机绑定备份队列
*
* @param fanoutQueue1
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindBackupQueue(Queue backupQueue, FanoutExchange backupExchange) {
// 不需要 routing_key
return BindingBuilder.bind(backupQueue).to(backupExchange);
}

/**
* 正常交换机队列配置
*/
public static final String EXCHANGE = "product_exchange";
public static final String QUEUE = "product.queue";
public static final String ROUTING_KEY = "product";

/**
* 创建队列
*
* @return
*/
@Bean
public Queue productQueue() {
return new Queue(QUEUE);
}

/**
* 创建交换机
*
* @return
*/
@Bean
public DirectExchange productExchange() {
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(EXCHANGE)
.durable(true)
// 指定交换机的备份交换机
.withArgument("alternate-exchange", BACKUP_EXCHANGE);
return exchangeBuilder.build();
}

/**
* 正常交换机绑定商品队列
* @param productQueue
* @param productExchange
* @return
*/
@Bean
public Binding bindProductQueue(Queue productQueue, DirectExchange productExchange) {
return BindingBuilder.bind(productQueue).to(productExchange).with(ROUTING_KEY);
}
}

消息监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@Component
public class BackUpListener {
@RabbitListener(queues = BackUpConfig.QUEUE)
public void listenerProduct(String msg) {
log.info("商品队列接收消息:{}", msg);
}

@RabbitListener(queues = BackUpConfig.BACKUP_QUEUE)
public void listenerBackup(String msg) {
log.info("备份队列接收消息:{}", msg);
}

@RabbitListener(queues = BackUpConfig.WARING_QUEUE)
public void listenerWaring(String msg) {
log.info("报警队列接收消息:{}", msg);
}
}

发送消息

1
2
3
4
5
@Test
public void backUpTest() {
rabbitTemplate.convertAndSend(BackUpConfig.EXCHANGE, BackUpConfig.ROUTING_KEY, "正常商品消息");
rabbitTemplate.convertAndSend(BackUpConfig.EXCHANGE, "test", "没有路由键的商品消息");
}

消费者log

1
2
3
INFO 860 --- [ntContainer#1-1] com.wgf.demo.listener.BackUpListener     : 商品队列接收消息:正常商品消息
INFO 860 --- [ntContainer#3-1] com.wgf.demo.listener.BackUpListener : 备份队列接收消息:没有路由键的商品消息
INFO 860 --- [ntContainer#2-1] com.wgf.demo.listener.BackUpListener : 报警队列接收消息:没有路由键的商品消息

由log体现,路由键为test的消息被转发到备份交换机处理

备份交换机标识

控制台指定备份交换机

备份交换机和ReturnCallback

当交换机绑定了备份交换机并且生产者代码中开启了 mandatory 参数和声明了ReturnCallback时,备份交换机的优先级更高,会将消息投递到备份交换机,而 ReturnCallback 不执行

RPC模式

TODO

https://zq99299.github.io/mq-tutorial/rabbitmq-ac/04/06.html

RabbitMQ集群

集群搭建

仲裁队列创建

最开始我们介绍了如何安装及运行 RabbitMQ 服务,不过这些是单机版的,无法满足目前真实应用的 要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ 服务器可以满足每秒 1000 条消息的吞吐量,那么如果应用需要 RabbitMQ 服务满足每秒 10 万条消息的吞吐量呢?购买昂贵的服务器来增强单机 RabbitMQ 务的性能显得捉襟见肘,搭建一个 RabbitMQ 集群才是解决实际问题的关键

集群节点类型

  • 内存节点(ram):就是将元数据都放在内存里,内存节点的话,只要服务重启,该节点的所有数据将会丢失

  • 硬盘节点(disk):就是将元数据都放在硬盘里,所以服务重启的话,数据也还是会存在

注意:硬盘节点如果宕机了,交换机,队列,绑定关系将不可创建,但是原有队列还可以正常使用

仲裁队列

在 RabbitMQ 中,仲裁队列(Quorum Queue)是一种高可用性的队列类型,基于 raft 共识算法的变种实现,它提供了一种可靠的消息存储和传递机制,适用于分布式系统中对消息丢失非常敏感的应用场景。仲裁队列是 RabbitMQ 3.8 版本引入的新功能,它与传统的镜像队列(Mirrored Queue)相比具有数据一致性优势。

描述:

  • 基于 Raft:仲裁队列基于 Raft 协议实现,确保在分布式环境中的一致性和容错能力。
  • 同步复制:消息在被认为已提交之前,必须被复制到集群中的大多数节点。

优点:

  • 强一致性:由于使用了 Raft 协议,仲裁队列提供了强一致性保证。
  • 读取性能:读取操作可以在任何节点上进行,提供较好的读取性能。

缺点:

  • 写入性能:由于同步复制的要求,写入性能可能不如镜像队列。

使用场景:

  • 当数据一致性是首要考虑的因素时。

仲裁队列的消息会优先保存在内存中,使用仲裁队列时,建议定义队列最大长度和最大内存占用,在消息堆积超过阈值时从内存转移到磁盘,以免造成内存高水位

镜像队列

描述:

  • 异步复制:消息被首先写入主节点,然后异步复制到镜像节点。
  • 主-备架构:主节点负责处理写入,镜像节点用于故障恢复。

优点:

  • 写入性能:由于使用异步复制,写入性能通常优于仲裁队列。
  • 灵活配置:允许灵活配置哪些节点应该存储队列的副本。

缺点:

  • 一致性问题:在网络分区或节点故障的情况下,可能会面临一致性问题。
  • 配置复杂性:配置比仲裁队列复杂。

使用场景:

  • 当写入性能是首要考虑的因素时

Haproxy

  • 负载均衡:HAProxy 可以将来自客户端的连接请求分发到 RabbitMQ 集群中的多个节点。
  • 高可用性:在 RabbitMQ 节点发生故障时,HAProxy 可以自动将流量重定向到健康的节点。

Keepalived

故障转移:RabbitMQ 客户端使用虚拟 IP 地址连接到 RabbitMQ 集群。这样,即使当前的主 RabbitMQ 节点失败,Keepalived 会将虚拟 IP 地址切换到另一个健康的节点,客户端可以在不更改 IP 地址的情况下继续发送和接收消息。

RabbitMQ 可靠性知识

消息幂等

在使用 RabbitMQ 时,有时候我们会遇到这样一个问题:同一条消息被消费多次,也就是 重复消费。这通常发生在以下几种情况:

  1. 网络中断:消费者成功处理了消息,但在发送确认(ack)给 RabbitMQ 时网络中断,导致 RabbitMQ 没有收到确认。
  2. 消费异常:消费者在处理消息时发生异常,虽然消息实际上已被处理,但消费者没有发送确认给 RabbitMQ。

如何保证消息幂等性,不被重复消费

  • 使用唯一消息标识符
    • 为每条消息分配一个全局唯一的 ID(MessageID)。
    • 在消费消息之前,检查该消息ID是否已被处理过。
    • 如果已处理过,跳过该消息;如果未处理过,正常消费并记录该消息ID。
  • 利用业务逻辑
    • 使用与业务相关的唯一标识符(例如,订单号)来检查消息是否已被处理。
    • 在处理消息之前,验证该业务标识符是否已在系统中被处理或记录过。
    • 如果已处理,忽略该消息;如果未处理,正常消费并记录该业务标识符

全局MessageID 方案(Redis)

使用 Redis BitMap判断是否幂等消费

生成全局唯一ID,然后由BitMap判断是否消费

  • 利用 redis 执行 setnx 命令,天然具有幂等性。
  • 利用 SADD queueName 消息id 命令存储待消费消息id
  • 利用 SISMEMBER queueName 消息id 命令判断消息是否已消费
  • 利用SREM queueName 消息id 删除已消费的消息id

架构图

优点:消息id不和业务绑定,通用性较强

缺点:架构引入了Redis,需要保证Redis的可用性

使用业务ID方案

使用业务编号作为消息id

每次消费时根据消息id查询数据状态,满足则消费消费,否则丢弃消息保证幂等

场景:订单异步退款

架构图

优点:架构实现简单,相对可靠,贴合业务

缺点:与业务严重偶合,带有一定的业务入侵,无法通用所有场景

消息可靠投递

消息丢失场景

  • Producer发送消息到 Broker 的过程中,如果ProducerBroker由于网络或者磁盘问题,未能成功接收消息,可能导致消息丢失
  • Exchange路由Queue过程中,没有匹配到相应的Routing Key导致消息丢失
  • Comsumer在消费异常时,自动ACK,导致消息没有重回队列头部,消息丢失

消息投递到Broker可靠保证

​ 使用 ConfirmCallback 异步回调确认机制确认消息是否发送到Exchange

消息Exchange路由到Queue可靠保证

保证消息最终被正确消费

  • 启用 手动ack 机制,消费异常进行Nack,让消息重回队列
  • 或使用 死信队列 ,消息消费异常拒绝消息签收,转发到死信队列,由其他业务进行补偿

消息重发机制(投递失败业务补偿)

  • 使用Redis持久化消息,提供失败消息重新投递数据支撑,DB层也可选择Mysql等其他产品,具体看业务场景
  • 定时任务实现消息重新投递,进行业务补偿

消息幂等

消息幂等,基于Redis的自增Id和BitMap的通用方案或使用业务数据状态

消息最终消费保证

​ 采用手动ack + 定时任务 + 人工处理方式保证

人工补偿,防止消息无限消费

当重试超过阈值消息永久存储在DB中,这时需要人工介入进行业务补偿,补偿完毕删除DB数据

具体实现

根据以上的理论对方案进行代码实现,解决点有

  • 消息幂等
  • 消息可靠投递保证
  • 消息最终消费保证

技术栈

  • Mysql,针对11.2章节方案,DB层改为Mysql。原因:底层更通用,便于人工补偿数据检索和扩展。
  • MybatisPlus
  • RabbitMQ
  • Redis,用于生成消息ID (可不用),可用雪花算法或美团的 Leaf 等方式生成;保证消息幂等
  • Scheduled,为了方便实现使用Spring定时任务,推荐使用xxl-job等第三方框架进行补偿解耦

源码地址:rabbitmq-example,模拟用户下单异步生成订单扣减库存流程

关键字段说明

图中的状态代表消息日志状态的流转

logId 消息日志主键,用于变更状态

msgId 消息全局唯一id

messageId logId_msgId 拼接,消息体实际使用的id

服务说明

producer_server 消息发送服务,提供消息发送和消息重新投递能力

consumer_server 消息消费者

reparation-server 补偿服务,提供定期补偿消息重新投递,堆积消息日志异步删除能力

核心代码讲解

producer-server

  • 集成swagger 项目启动访问 http://localhost:9000/doc.html

  • MsgController 对外提供消息投递与消息重新投递能力

  • RabbitMqUtils 消息投递与持久化具体实现

  • SequenceUtils 全局唯一id具体实现

  • MsgConfirmCallback 消息投递Broker失败异步回调,重新实现消息投递核心

  • MsgReturnCallback 消息路由失败异步回调,重新实现消息投递核心

consumer-server

  • @IdempotentMessage 消息幂等与消息重新消费自定义注解,作用于监听方法(可选择是否使用)
  • RabbitListenerAop 消息幂等于重新消费限制具体实现

reparation-server

  • ReparationTask 消息重新投递具体实现
  • CleanMsgTask 消息日志异步清理实现

消息状态

  • 投递中
  • 投递成功
  • 投递失败
  • 人工修复
  • 消费成功
  • 重复消费

额外扩展

路由失败: 也可以使用 备份交换机 方案,由reparation-server 监听备份队列进行业务补偿

拒绝消息 重试多次消费上限丢弃消息可以使用 死信队列 方案解决

项目下demo.sql为mysql DDL 脚本


RabbitMQ
https://wugengfeng.cn/2022/01/25/RabbitMQ/
作者
wugengfeng
发布于
2022年1月25日
许可协议