深入浅出消息队列---9、Kafka消息顺序性-灵析社区

提笔写架构

Kafka消息顺序性

消息顺序性

Kafka保证消息顺序性的特点如下所示:

 topic中的数据分割为一个或多个partition。每个topic至少有一个partition。在单个partition中的数据是有序的,如果消息分散在不同的 partition,Kafka 无法保证其顺序性。但只需要确保要求顺序性的若干消息发送到同一个 partiton,即可满足其顺序性。并且在进行消息消费的时候,需要确保消费者是进行单线程消费。

要保证若干消息发送到同一个partiton中,那么我们就需要在发送消息的时候指定一个分区的id,那么这样的话消息就被发送到同一个分区中。

// 发送消息到指定的分区,保证分区的消息顺序性
public static void sendMessageToDestPartition() {
    for(int x = 0 ; x < 5 ; x++) {
        // Kafka消息的异步发送
        String msg = "Kakfa环境测试...." + x ;
        kafkaTemplate.send("dd" ,0 , "order" ,
msg).addCallback((obj) -> {
            LOGGER.info("send msg to kafka broker success ---> {} " ,
((SendResult)obj).getProducerRecord().value());
       } , (t) -> {
            t.printStackTrace();
       });
        LOGGER.info("send msg to local cache success ---> {} " , msg);
   }
}

消费者进行指定分区的消费:

@KafkaListener(topicPartitions = {@org.springframework.kafka.annotation.TopicPartition(topic = "dd" ,
partitions = "0")} , groupId = "dd.demo")
public void consumerOrderMessageHandler(String msg , KafkaConsumer
consumer) {
    LOGGER.info("consumer topic is : {} , msg is ----> {} " , "itcast" ,
msg);
    consumer.commitAsync();
}

消费顺序性

当消费端报错,消息重试了指定次数以后,还没有收到服务端的ack,此时就会抛出异常。 如果我们还需要保证消息的顺序性,那么我们就需要将max.in.flight.requests.per.connection设置为1该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

如何保证顺序性:如果把 retries 设为非零整数,同时把max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。

 一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把retries 设为 0。可以把 max.in.flight.requests.per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

配置如下所示:

spring:
 kafka:
   producer:
     bootstrap-servers: 192.168.23.131:9092
     acks: all
     retries: 2
     properties: {'max.in.flight.requests.per.connection' : 1}


阅读量:2013

点赞量:0

收藏量:0