Kafka保证消息顺序性的特点如下所示:
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。在单个partition中的数据是有序的,如果消息分散在不同的 partition,Kafka 无法保证其顺序性。但只需要确保要求顺序性的若干消息发送到同一个 partiton,即可满足其顺序性。并且在进行消息消费的时候,需要确保消费者是进行单线程消费。
要保证若干消息发送到同一个partiton中,那么我们就需要在发送消息的时候指定一个分区的id,那么这样的话消息就被发送到同一个分区中。
// 发送消息到指定的分区,保证分区的消息顺序性
public static void sendMessageToDestPartition() {
for(int x = 0 ; x < 5 ; x++) {
// Kafka消息的异步发送
String msg = "Kakfa环境测试...." + x ;
kafkaTemplate.send("dd" ,0 , "order" ,
msg).addCallback((obj) -> {
LOGGER.info("send msg to kafka broker success ---> {} " ,
((SendResult)obj).getProducerRecord().value());
} , (t) -> {
t.printStackTrace();
});
LOGGER.info("send msg to local cache success ---> {} " , msg);
}
}
消费者进行指定分区的消费:
@KafkaListener(topicPartitions = {@org.springframework.kafka.annotation.TopicPartition(topic = "dd" ,
partitions = "0")} , groupId = "dd.demo")
public void consumerOrderMessageHandler(String msg , KafkaConsumer
consumer) {
LOGGER.info("consumer topic is : {} , msg is ----> {} " , "itcast" ,
msg);
consumer.commitAsync();
}
当消费端报错,消息重试了指定次数以后,还没有收到服务端的ack,此时就会抛出异常。 如果我们还需要保证消息的顺序性,那么我们就需要将max.in.flight.requests.per.connection设置为1,该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
如何保证顺序性:如果把 retries 设为非零整数,同时把max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。
一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把retries 设为 0。可以把 max.in.flight.requests.per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。
配置如下所示:
spring:
kafka:
producer:
bootstrap-servers: 192.168.23.131:9092
acks: all
retries: 2
properties: {'max.in.flight.requests.per.connection' : 1}
阅读量:2013
点赞量:0
收藏量:0