Skip to content
目录概览

死信队列和延迟队列的使用?

  • 死信消息: 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false 消息过期了 队列达到最大的长度

  • 过期消息: 在 rabbitmq 中存在2种方可设置消息的过期时间,第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间,第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。如果同时使用这2种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息。

  • 队列设置: 在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒

  • 单个消息设置: 是设置消息属性的 expiration 参数的值,单位为 毫秒

  • 延时队列: 在rabbitmq中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。


场景演示: 需求:用户在系统中创建一个订单,如果超过时间用户没有进行支付,那么自动取消订单。

分析:

  1. 上面这个情况,我们就适合使用延时队列来实现,那么延时队列如何创建
  2. 延时队列可以由 过期消息+死信队列 来时间
  3. 过期消息通过队列中设置 x-message-ttl 参数实现
  4. 死信队列通过在队列申明时,给队列设置 x-dead-letter-exchange 参数,然后另外申明一个队列绑定x-dead-letter-exchange对应的交换器。
ConnectionFactory factory = new ConnectionFactory(); 
factory.setHost("127.0.0.1"); 
factory.setPort(AMQP.PROTOCOL.PORT); 
factory.setUsername("guest"); 
factory.setPassword("guest"); 
Connection connection = factory.newConnection(); 
Channel channel = connection.createChannel();
// 
// 声明一个接收被删除的消息的交换机和队列 
String EXCHANGE_DEAD_NAME = "exchange.dead"; 
String QUEUE_DEAD_NAME = "queue_dead"; 
channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT); 
channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null); 
channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead"); 
// 
String EXCHANGE_NAME = "exchange.fanout"; 
String QUEUE_NAME = "queue_name"; 
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); 
Map<String, Object> arguments = new HashMap<String, Object>(); 
// 统一设置队列中的所有消息的过期时间 
arguments.put("x-message-ttl", 30000); 
// 设置超过多少毫秒没有消费者来访问队列,就删除队列的时间 
arguments.put("x-expires", 20000); 
// 设置队列的最新的N条消息,如果超过N条,前面的消息将从队列中移除掉 
arguments.put("x-max-length", 4); 
// 设置队列的内容的最大空间,超过该阈值就删除之前的消息
arguments.put("x-max-length-bytes", 1024); 
// 将删除的消息推送到指定的交换机,一般x-dead-letter-exchange和x-dead-letter-routing-key需要同时设置
arguments.put("x-dead-letter-exchange", "exchange.dead"); 
// 将删除的消息推送到指定的交换机对应的路由键 
arguments.put("x-dead-letter-routing-key", "routingkey.dead"); 
// 设置消息的优先级,优先级大的优先被消费 
arguments.put("x-max-priority", 10); 
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); 
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 
String message = "Hello RabbitMQ: "; 
// 
for(int i = 1; i <= 5; i++) { 
	// expiration: 设置单条消息的过期时间 
	AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().priority(i).expiration( i * 1000 + ""); 
	channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8")); 
} 
channel.close(); 
connection.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44