深入浅出消息队列---3、RabbitMQ延迟队列-灵析社区

提笔写架构

延迟队列

当用户秒杀成功以后,就需要引导用户去订单页面进行支付。如果用户在规定的时间之内(30分钟),没有完成订单的支付,此时我们就需要进行库存的回退操作。

库存回退架构

回退库存的架构如下图所示:

过期时间

目前有两种方法可以设置消息的TTL:

 1、通过队列的属性设置,队列中所有的消息都有相同的过期时间。

 2、是对消息本身进行单独的设置,每条消息的TTL可以不同。

 如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准。消息在队列中的生存时间一旦

 超过设置的TTL值时,就会变成"死信"。

设置消息的TTL

在发送消息的时候可以直接通过消息属性来设置消息的过期时间,代码如下所示:

// 测试发送消息时设置发送消息的过期时间 
private static void sendPreMessageTTL() { 
	// void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) 
	rabbitTemplate.convertAndSend("direct.exchange" , "create" , "测试延迟消息发送,为每一个消息去设置过期时间" , (message) -> { 
	MessageProperties messageProperties = 
	message.getMessageProperties();       // 获取消息属性对象 
	messageProperties.setExpiration("30000");                      
	// 设置消息的过期时间为30秒 
	return message ; 
	    }); 
} 

如果消息没有及时消费,那么经过30秒以后,消息变成死信,Rabbitmq会将这个消息直接丢弃。

设置队列的TTL

我们也可以直接通过队列的属性设置消息的过期时间,队列中所有的消息都有相同的过期时间,代码如下所示:

// 声明队列 
@Bean(name = "direct.queue_02") 
public Queue commonQueue02() { 
	QueueBuilder queueBuilder = QueueBuilder.durable("direct.queue_02"); queueBuilder.ttl(30000) ;               // 设置队列消息的过期时间,为30 
	秒 
	return queueBuilder.build() ; 
}

死信队列

当一个消息变成死信了以后,默认情况下这个消息会被mq删除。如果我们给队列指定了"死信交换机"(DLX: Dead-Letter-Exchange),那么此时这个消息就会转发到死信交换机,进而被与死信交换机绑定的队列(死信队列)进行消费。从而实现了延迟消息发送的效果。

原理介绍

具体的原理如下图所示:


 一个消息成为死信常见情况:

 1、消息过期

 2、队列达到最大长度(可以通过x-max-length参数来指定队列的长度,如果不指定,可以认为是无限长)

具体实现

接下来我们就来演示一下死信队列的使用,使用死信队列来实现消息的延迟发送,具体的代码实现如下所示:

 1、声明死信交换机、死信队列、死信交换机和死信队列的绑定

// 声明死信交换机 
@Bean(name = "dlx.exchange") 
public Exchange dlxExchange() { 
	return 
	ExchangeBuilder.directExchange("dlx.exchange").durable(true).build() ; 
} 
// 声明死信队列 
@Bean(name = "dlx.queue") 
public Queue dlxQueue() { 
	return QueueBuilder.durable("dlx.queue").build() ; 
} 
// 完成死信队列和死信交换机的绑定 
@Bean 
public Binding dlxQueueBindToDlxExchange(@Qualifier(value = 
"dlx.exchange") Exchange exchange , @Qualifier(value = "dlx.queue") Queue queue) { 
return 
	BindingBuilder.bind(queue).to(exchange).with("delete").noargs() ; 
} 

2、将死信队列作为普通队列的属性设置过去

// 声明队列 
@Bean(name = "direct.queue_02") 
public Queue commonQueue02() { 
	QueueBuilder queueBuilder = 
	QueueBuilder.durable("direct.queue_02"); 
	queueBuilder.deadLetterExchange("dlx.exchange") ;   // 将死信交换机作 为普通队列的属性设置过去 
	queueBuilder.deadLetterRoutingKey("delete") ;       // 设置消息的 routingKey 
	queueBuilder.ttl(30000) ;                        // 设置队列消息的 过期时间,为30秒 
	queueBuilder.maxLength(2) ;                      // 设置队列的最大 长度 
	return queueBuilder.build() ; 
} 

3、消费端进行同样的设置,并且指定消费死信队列

@Component 
public class RabbitmqDlxQueueConsumer { 
	// 创建日志记录器 
	private static final Logger LOGGER = 
	LoggerFactory.getLogger(RabbitmqDlxQueueConsumer.class) ; @RabbitListener(queues = "dlx.queue") 
	public void dlxQueueConsumer(String msg) { 
	    LOGGER.info("dlx queue msg is : {} " , msg ); 
	} 
}

另外还可以基于RabbitMQ 插件实现的延时队列。

阅读量:1868

点赞量:0

收藏量:0