消费端消息丢失
场景描述:
位移提交:对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。
单词"offset"可以编译为"偏移量",也可以翻译为"位移",在很多的中文资料中都会交叉使用"偏移量"和"位移"这两个词,对于消息在分区中的位置,我们将offset称之为"偏移量";对于消费者消费到的位置,将offset称为"位移",有时候也会更明确地称之为"消费位移"("偏移量"是在讲分区存储层面的内容,"位移"是在讲消费层面的内容)当然对于一条消息而言,它的偏移量和消费者消费它时的消费位移是相等的。
当每一次调用poll()方法时,它返回的是还没有消费过的消息集(当然这个前提是消息以及存储在Kafka中了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。
消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来的动作称之为"提交",消费者在消费完消息之后需要执行消费位移的提交。默认情况下Kafka的消费位移提交是自动提交,这个由消费者客户端参数enable.auto.commit配置,默认值为true。当然这个默认的自动提交并不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒。自动位移提交的动作是在poll()方法的逻辑里面完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
自动位移提交带来为问题:
1、重复消费
2、消息丢失
重复消费原理如下图所示:
消息丢失如下图所示:
解决方案:将自动位移提交更改为手动位移提交
注意:
1、在单独使用Kafka的java客户端将位移提交的模式更改为手动位移提交,那么我们就需要显示的调用consumer的方法完成位移提交。
// 通过Kafka的方式消费消息,如果将位移提交更改为手动位移提交。那么我们就需要主动调用consumer的方法完成位移提交
private static void consumerFromKafka() {
// 创建属性集合,设置初始化参数
Properties properties = new Properties();
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers", "192.168.23.131:9092");
properties.put("group.id", "dd.demo");
properties.put("enable.auto.commit" , "false") ;
// 创建KafkaConsumer对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("dd"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
// consumer.commitSync(); // 进行同步位移提交,会阻塞当前线程
consumer.commitAsync(new OffsetCommitCallback(){
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null) {
System.out.println(offsets);
} else {
LOGGER.error("fail to commit offsets {}", offsets, exception);
}
}
});
}
}
2、 在使用的是spring boot和Kafka进行整合,当我们将spring.kafka.consumer.enable-auto-commit的值设置为false以后,只有在特定的提交模式下我们可以手动进行提交。
在一些提交模式下由spring根据约定的条件控制提交。
常见的提交模式是在ContainerProperties.AckMode这个枚举类中定义。AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:
spring配置如下:
# kafka配置
spring:
kafka:
consumer:
bootstrap-servers: 192.168.23.131:9092
enable-auto-commit: false # 设置手动位移提交
listener:
ack-mode: manual_immediate
代码实现:
@KafkaListener(topics = "dd" , groupId = "dd.demo")
public void consumerHandler(String msg , KafkaConsumer consumer) {
LOGGER.info("consumer topic is : {} , msg is ----> {} " , "itheima"
, msg);
// consumer.commitSync(); // 同步位移提交
consumer.commitAsync((offsets , e) -> {
if(e == null){
LOGGER.info("位移提交成功....");
}else {
Set<Map.Entry<TopicPartition, OffsetAndMetadata>> entries =offsets.entrySet();
for(Map.Entry<TopicPartition , OffsetAndMetadata> entry :entries) {
TopicPartition topicPartition = entry.getKey();
OffsetAndMetadata offsetAndMetadata = entry.getValue();
LOGGER.info("位移提交失败....topicPartition is {} ,offsets is {} " , topicPartition.partition() ,offsetAndMetadata.offset());
}
}
});
}
生产者将消息发送到Broker中以后,消息还没有被及时消费,此时Broker宕机了,这样就会导致消息丢失。
Kafka消息的发送流程:
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程会根据指定的条件,不断从RecordAccumulator中拉取消息发送到Kafka broker。如下图所示:
Sender线程拉取消息的条件:
发送消息的三种方式:1、发后即忘 2、同步消息发送 3、异步消息发送
// 演示消息发送: 发送即忘
public static void sendMessageMethod01() {
for(int x = 0 ; x < 5 ; x++) {
String msg = "Kakfa环境测试...." + x ;
kafkaTemplate.send("itheima" , msg) ; // 发后即
忘
LOGGER.info("send msg success ---> {} " , msg);
}
}
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V
data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic,
data);
return doSend(producerRecord);
} 12345
要实现同步发送的,可以利用返回的ListenableFuture实现,如下:
// 演示消息发送: 同步发送
public static void sendMessageMethod02() throws ExecutionException,
InterruptedException {
for(int x = 0 ; x < 5 ; x++) {
String msg = "Kakfa环境测试...." + x ;
kafkaTemplate.send("itheima" , msg).get() ; // 同步消息发
送
LOGGER.info("send msg success ---> {} " , msg);
}
}
实际上send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。在执行send()方法之后直接链式调用了get()方法可以阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。这种方式性能最差,可靠性较好。
// 演示消息发送: 异步发送
public static void sendMessageMethod03() throws ExecutionException,
InterruptedException {
for(int x = 0 ; x < 5 ; x++) {
// Kafka消息的异步发送
String msg = "Kakfa环境测试...." + x ;
kafkaTemplate.send("itheima" , msg).addCallback((obj) -> {
LOGGER.info("send msg to kafka broker success ---> {} " ,
((SendResult)obj).getProducerRecord().value());
} , (t) -> {
t.printStackTrace();
});
LOGGER.info("send msg to local cache success ---> {} " , msg);
}
}
这种方式的特点:性能较好,可靠性也有保障
针对上述情况所产生的消息丢失,可以的解决方案有如下几种:
当生产者发送消息完毕以后,没有收到Broker返回的ack,此时就会触发重试机制或者抛出异常。我们可以通过retries参数设置重试次数(spring boot和Kafka整合默认的重试次数为0),发送客户端会进行重试直到broker返回ack。
阅读量:2015
点赞量:0
收藏量:0