在我们的业务系统中,一旦使用到了消息队列,我们就必须考虑消息的丢失问题。比如在秒杀业务中,一旦消息丢失了对我们用户而言就是不公平的。
场景描述
生产者已经将消息发送给了队列,但是此时消费者还没以及时对消息进行消费,这个时候指定的队列主机宕机了,这样存储在队列的消息也会丢失。
解决方案
对消息进行持久化操作。当对消息进行持久化操作以后,这个消息一旦被发送到mq中的某一个队列,那么此时Rabbitmq会立马将消息进行持久化。
注意
spring boot和rabbitmq进行整合以后,默认消息的存储就是持久化方式。我们可以将所有的消息都设置为持久化,但是这样会影响Rabbitmq的性能。因为我们需要将消息写入到内存的同时还需要将消息写入到磁盘。对于可靠性不是那么高的消息可以不采用持久化处理,以提高整体系统的吞吐量。
消息默认是持久化模式:
设置消息为非持久化模式:
// 测试Direct类型的交换机
private static void directExchangeMessageTransport() {
rabbitTemplate.convertAndSend("direct.exchange" , "create" , "direct exchange 测试数据" , (message) -> {
// spring boot和rabbitmq整合以后,默认消息是会被持久化的,我们可以将消息 的持久化方式设置为不进行持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_ PERSISTENT);
return message ;
});
}
不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。
1、持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。
2、非持久化的消息一般只保存在内存中,在内存吃紧的时候会被写入到磁盘中,以节省内存空间。
这两种类型的消息的落盘处理都在RabbitmqMQ的"持久层"中完成。持久层的组成如下所示:
rabbit_queue_index:负责维护队列中的落盘消息的信息,包括消息的存储地点、是否已被交付给消费者、 是否已被消费者ack。每一个队列都有与之对应的一个rabbitmq_queue_index。
rabbit_msg_store: 负责消息的存储,它被所有的队列共享,在每个节点中有且只有一个。
rabbit_msg_store可以在进行细分:
在容器中默认这些信息是通过/var/lib/rabbitmq/mnesia/rabbit@977b5f791952这个路径下的3个文件夹进 行存储:
消息可以存储在rabbit_queue_index中也可以存储在rabbit_msg_store中。最佳的配置是较小的消息存储在rabbit_queue_index中而较大的消息存储在rabbit_msg_store中。这个消息的界定可以通过queue_index_embed_msgs_below来配置,默认大小为4096,单位为B。注意这里的消息大小是
指消息体、属性以及headers整体的大小。当一个消息小于设定的大小阈值时就可以存储在rabbit_queue_index中,这样可以得到性能上的优化。这种存储机制是在Rabbitmq3.5 版本以后引入
的,该优化提高了系统性能10%左右。
那么我们是不是把queue_index_embed_msgs_below参数的值调节的越大越好呢?
肯定不是的rabbit_queue_index中以顺序(文件名从0开始累加)的段文件来进行存储,后缀为".idx",每个段文件中包含固定的SEGMENT_ENTRY_COUNT条记录,SEGMENT_ENTRY_COUNT默认值为
16384。每个rabbit_queue_index从磁盘中读取消息的时候至少在内存中维护一个段文件,所以设置
queue_index_embed_msgs_below值的时候需要格外谨慎,一点点增大也可能会引起内存爆炸式增长。
队列的结构以及消息的状态
Rabbitmq中队列的是由两部分组成:rabbit_amqpqueue_process和backing_queue组成:
rabbit_amqpqueue_process: 负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。
backing_queue: 是消息存储的具体形式和引擎,并向rabbit_amqpqueue_process提供相关的接口以供调用。
如果消息发送的队列是空的且队列有消费者,该消息不会经过该队列而是直接发往消费者,如果无法直接被消费,则需要将消息暂存入队列,以便重新投递。消息在存入队列后,主要有以下几种状态:
持久化的消息,消息内容和消息索引必须先保存在磁盘中,才会处于上面状态中的一种,gamma状态只有持久化的消息才有这种状态。Rabbitmq在运行时会根据统计的消息传送速度。
定期计算一个当前内存中能够保存的最大消息数量(target_ram_count), 如果alpha状态的消息数量大于此值时,就会引起消息的状态转换,多余的消息可能会转换到beta状态、gamma状态或者delta状态。
区分这4种状态的主要作用是满足不同的内存和CPU 的需求。
对于普通队列而言,backing_queue内部的实现是通过5个子队列来体现消息的状态的:
Q4:只包含alpha状态的消息
一般情况下,消息按照Q1->Q2->Delta->Q3->Q4这样的顺序进行流动,但并不是每一条消息都会经历所有状态,这取决于当前系统的负载情况(比如非持久化的消息在内存负载不高时, 就不会经历delta)。如此设计的好处:可以在队列负载很高的情况下,能够通过将一部分消息由磁盘保存来 节省内存空间,而在负载降低的时候,这部分消息又渐渐回到内存被消费者获取, 使得整个队列具有良好的弹性。
消费者消费消息也会引起消息状态的转换,状态转换的过程如下所示:
在系统负载较高中,已经收到的消息若不能很快被消费掉,就是这些消息就是在队列中"堆积", 那么此时
Rabbitmq就需要花更多的时间和资源处理"堆积"的消息,如此用来处理新流入的消息的能力就会降低,使得流入的消息又被"堆积"继续增大处理每个消息的平均开销,继而情况变得越来越恶化,使得系统的处理能力大大降低。
减少消息堆积的常见解决方案:
1、增加prefetch_count的值,设置消费者存储未确认的消息的最大值,消息达到prefetch_count最大值,直到确认了,生产者才会可能推送新的消息
2、消费者进行multiple ack,降低ack带来的开销
默认情况下,当生产者将消息发送到Rabbitmq的时候,队列中的消息会尽可能地存储在内存中,这样可以更快地将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。这样的机制无形会占用更多系统资源,毕竟内存应该留给更多有需要的地方。如果发送端过快或消费端宕机,导致消息大量积压,此时消息还是在内存和磁盘各存储一份,在消息大爆发的时候,MQ服务器会撑不住,影响其他队列的消息收发,能不能有效的处理这种情况呢。答案 惰性队列。
RabbitMQ从3.6.0版本开始引入了惰性队列(Lazy Queue)的概念。惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的,这样可以减少了内存的消耗,但是会增加I/0的使用,如果消息是持久化的,那么这样的I/0操作不可避免,惰性队列和持久化的消息可谓是"最佳拍档"。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。
把一个队列设置成惰性队列的方式:
// 声明队列
@Bean(name = "direct.queue_03")
public Queue commonQueue03() {
QueueBuilder queueBuilder = QueueBuilder.durable("direct.queue_03");
queueBuilder.lazy(); // 把队列设置成惰性队列
return queueBuilder.build();
}
场景描述
消费者消费到这个消息但是还没有及时处理,消费者宕机了。
解决方案
默认情况下,消费者消费到这个消息以后会自动给服务端发送一个Basic.Ack指令,告知这个消息已经被消费了,此时服务端会将这个消息从内存(磁盘)删除掉。
针对上述消息丢失的场景:我们只需要将自动应答更改为手动应答即可。
具体的实现如下所示:
1、更改消费端应答模式为:手动应答
listener:
simple:
acknowledge-mode: manual # 更改消息的应答模式为手动应答
2、当消费者消费完消息以后,进行消息应答
@RabbitListener(queues = "direct.queue_01")
public void directExchangeQueue01(String messageBody , Channel channel
, Message message) {
try {
// 对消息进行消费
LOGGER.info("direct exchange queue 01 message is : {}" ,
messageBody);
// 进行手动应答,第二个参数表示是否需要将该消息之前的所有的消息都进行应答
channel.basicAck(message.getMessageProperties().getDeliveryTag() ,
false);
} catch (IOException e) {
e.printStackTrace();
LOGGER.error("direct exchange queue 01 处理失败, message is :
{}" , messageBody);
}
}
默认的分发机制:当Rabbitmq队列拥有多个消费者时,队列收到的消息将以轮询的方式发给消费者。每条消息只会发给订阅列表里的一个消费者。这种方式非常适合扩展,而且它是专门为并发程序设计的。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。
很多时候轮询的分发机制也不是那么的优雅。默认情况下,如果有n个消费者,那么Rabbitmq会将第m条消息分发给第m%n(取余的方式)个消费者,Rabbitmq不管消费者是否消费并已经确认了消息。试想一下,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他的消费者由于某种原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。
那么该如何处理呢?这里就要用到channel.basicQos(int prefetchCount)这个方法。channel.basicQos方法允许限制通道上的消费者所能够保持的最大未确认消息的数量。
spring.rabbitmq.listener.simple.prefetch: 1 # 设置队列中最大的未确认的消息数量
场景描述
生成者将消息发送给交换机以后,正当交换机将这个消息发送给指定队列的时候,该队列所在的主机宕机了,那么这一则消息就会丢失。
解决方案
要想解决这种情况下消息的丢失,我们就需要知道生产者针对该消息的投递结果。默认情况下发送消息的操作,服务端是不会返回任何信息给生产者的,也就是说默认情况下生产者是不知道消息有没有正确地到达服务器端。那么要想知道生产者针对该消息的投递结果,我们有两种解决方案:
1、通过事务机制实现
2、通过发送方确认(publisher confirm)机制实现
在配置类中配置Rabbitmq的事务管理器:
// 配置事务管理器
@Bean(name = "rabbitTransactionManager")
public RabbitTransactionManager
rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory) ;
}
定义发送消息的类:
@Component
public class RabbitmqProducer {
// 定义日志记录器
private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitmqProducer.class) ;
@Autowired
private RabbitTemplate rabbitTemplate ;
@Transactional(rollbackFor = Exception.class , transactionManager =
"rabbitTransactionManager")
public void sendMessage() {
rabbitTemplate.setChannelTransacted(true); // 将消息通道设置为事务机制
String msg = "测试生产者事务消息" ;
rabbitTemplate.convertAndSend("direct.exchange" , "create" ,
msg.getBytes());
int a = 1 / 0 ;
LOGGER.info("transactionManager message send success ----> " +
msg);
}
}
在调用方进行try…catch处理(可以重新尝试发送消息)
// 测试事务消息
private static void
sendTransactionManagerMsg(ConfigurableApplicationContext
applicationContext) {
RabbitmqProducer rabbitmqProducer =
applicationContext.getBean(RabbitmqProducer.class);
try {
rabbitmqProducer.sendMessage();
}catch (Exception e) {
e.printStackTrace();
System.out.println("事务消息回滚了");
}
}
我们也可以通过Wireshark捕获到命令的传递过程,如下所示:
事务机制会影响Rabbitmq的性能,事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitmqMQ的 回应,之后才能继续发送下一条消息。因此在真实开发过程中很少的使用。
Rabbitmq提供了一个改进方案,即发送方确认机制(publisher confirm)。
生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,Rabbitmq就会发送一个确认(Basic.ACK)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。
与事务机制相比,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息。如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条Basic.Nack命令,生产者应用程序同样可以在回调方法中处理该nack命令。
1、在配置文件中配置开启生产者确认机制
spring.rabbitmq.publisher-confirm-type: correlated # 开启生产者确认机制
2、定义发送消息的类:
@Component
public class RabbitmqSendMsgConfirm implements
RabbitTemplate.ConfirmCallback {
// 定义日志记录对象
private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitmqSendMsgConfirm.class) ;
@Autowired
private RabbitTemplate rabbitTemplate ;
// 发送消息
public void sendMsg(String msg , String exchangeName , String
routingKey) {
// 设置消息发送是否成功的回调
rabbitTemplate.setConfirmCallback(this);
// void convertAndSend(String exchange, String routingKey,
Object message, MessagePostProcessor messagePostProcessor,
CorrelationData correlationData)
CorrelationData correlationData = new CorrelationData(msg) ;
rabbitTemplate.convertAndSend(exchangeName , routingKey , msg ,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PER
SISTENT); // 进行消息持久化操作
return message ;
} , correlationData);
}
/**
* CorrelationData: 每个发送的消息都需要配备一个 CorrelationData 相关数据
对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
if(ack) {
LOGGER.info("消息发送成功");
}else {
LOGGER.info("消息发送失败");
}
}
}
上述这种确认机制,是确保消息是否已经发送到正确的交换机上。
消息一旦被发送到正确到交换机上以后,ack的值就是true。那么这个消息是否被投递到了指定的队列,如果
消息没有被投递到指定的队列。那么作为生产者如何获知到呢?
我们需要在次进行确认,具体的实现如下所示:
1、在配置文件中进行如下配置
spring.rabbitmq.template.mandatory: true (表示的意思是交换机无法根据自身的类型和路由键找到一个符号条件的队列,那么RabbitmqMQ会调用Basic.Return返回消息给生产者)
spring.rabbitmq.publisher-returns: true 开启return确认机制
2、添加return确认机制的回调函数
// 发送消息
public void sendMsg(String msg , String exchangeName , String
routingKey) {
...
// 添加return确认机制的回调函数
rabbitTemplate.setReturnCallback(this);
...
}
// 当消息没有被交换机投递到指定的队列的回调函数
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
LOGGER.info("消息没有被投递到指定的队列 ---> " + new String(message.getBody()));
}
阅读量:1041
点赞量:0
收藏量:0