
How to Analyze the Multiple-Consumer Problem in Kafka Batch Consumption

Published on 2025-01-22 by the 千家信息网 editor

Today we'll talk about how to analyze the multiple-consumer problem in Kafka batch consumption. Many people may not be familiar with it, so to help you understand it better, here is a summary of the key points; hopefully you will get something out of this article.

package com.llw.medical.bs.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;

@Component
public class KafakaListener {

    // Single-record listener subscribed to topic2.
    @KafkaListener(id = "1", topics = {"topic2"})
    public void listen(ConsumerRecord record) {
        Optional kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record =" + record);
            System.out.println("----------------- message =" + message);
        }
    }

    // Single-record listener with manually assigned partitions of topic1.
    @KafkaListener(id = "2", topicPartitions =
            {@TopicPartition(topic = "topic1",
                    partitions = {"1", "2", "3"}
                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")
            )
            })
    public void listen2(ConsumerRecord record) {
        Optional kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record 1=" + record);
            System.out.println("------------------ message 1=" + message);
        }
    }

    // id = "4": batch listener subscribed by topic, sharing groupId "1".
    @KafkaListener(id = "4", groupId = "1", topics = "topic1",
            /* topicPartitions =
            {@TopicPartition(topic = "topic1",
                    partitions = {"0"}
                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")
            )
            }, */
            containerFactory = "kafkaBatchListener6")
    public void listen3(List<ConsumerRecord> records) { //, Acknowledgment ack
        try {
            for (ConsumerRecord record : records) {
                Optional kafkaMessage = Optional.ofNullable(record.value());
                if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 4=" + record);
                    // System.out.println("------------------ message 4=" + message);
                }
            }
        } finally {
            // ack.acknowledge();
        }
    }

    // id = "5": second batch listener in the same group "1".
    @KafkaListener(id = "5", groupId = "1", topics = "topic1",
            /* topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"})}, */
            containerFactory = "kafkaBatchListener6")
    public void listen2(List<ConsumerRecord> records) { //, Acknowledgment ack
        try {
            for (ConsumerRecord record : records) {
                Optional kafkaMessage = Optional.ofNullable(record.value());
                if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 6=" + record);
                    // System.out.println("------------------ message 6=" + message);
                }
            }
        } finally {
            // ack.acknowledge();
        }
    }

    // https://www.cnblogs.com/linjiqin/p/13171789.html
    // id = "6": third batch listener in the same group "1".
    @KafkaListener(id = "6", groupId = "1", topics = "topic1",
            /* topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"})}, */
            containerFactory = "kafkaBatchListener6")
    public void listen4(List<ConsumerRecord> records) {
        try {
            for (ConsumerRecord record : records) {
                Optional kafkaMessage = Optional.ofNullable(record.value());
                if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 3=" + record);
                    // System.out.println("------------------ message 6=" + message);
                }
            }
        } finally {
            // ack.acknowledge();
        }
    }
}
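The listeners above refer to a containerFactory bean named kafkaBatchListener6 that the original post does not show. Below is a minimal sketch of what such a batch listener container factory could look like in Spring Kafka. It is an assumption, not the original author's configuration: the broker address, deserializers, poll size, and concurrency are illustrative values only; the only thing that must match the listeners is the bean name.

package com.llw.medical.bs.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaBatchConfig {

    // Assumed broker address and tuning values -- adjust to your environment.
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    @Bean
    public ConsumerFactory<String, String> batchConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Upper bound on how many records one poll() (one listener invocation) returns.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Bean name must match containerFactory = "kafkaBatchListener6" in the listeners.
    @Bean("kafkaBatchListener6")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaBatchListener6() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(batchConsumerFactory());
        // Deliver records to the listener as a List instead of one at a time.
        factory.setBatchListener(true);
        // Number of concurrent containers; keeping it <= the partition count is typical.
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}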

Within a consumer group, a partition can be consumed by only one consumer. If several consumers belong to different groups, the behavior is broadcast: each of them receives a copy of every message. Kafka is a publish/subscribe messaging system, not a pure queue. A short illustration follows.
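The following sketch makes that concrete; the listener ids, group names, and topic are hypothetical examples, not part of the original post. The two listeners with different groupId values each receive every message published to topic1, while the two listeners sharing a groupId split topic1's partitions between them.

package com.llw.medical.bs.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical demo class; names and groups are illustrative only.
@Component
public class GroupBehaviourDemoListener {

    // Different consumer groups: every record on topic1 is delivered to BOTH
    // of these methods (broadcast / publish-subscribe behavior across groups).
    @KafkaListener(id = "demo-groupA", groupId = "groupA", topics = "topic1")
    public void groupA(ConsumerRecord<String, String> record) {
        System.out.println("groupA received: " + record.value());
    }

    @KafkaListener(id = "demo-groupB", groupId = "groupB", topics = "topic1")
    public void groupB(ConsumerRecord<String, String> record) {
        System.out.println("groupB received: " + record.value());
    }

    // Same consumer group: topic1's partitions are divided between these two
    // listeners, so each record is processed by exactly one of them
    // (queue-like load balancing inside a group).
    @KafkaListener(id = "demo-worker-1", groupId = "sharedGroup", topics = "topic1")
    public void worker1(ConsumerRecord<String, String> record) {
        System.out.println("worker-1 received: " + record.value());
    }

    @KafkaListener(id = "demo-worker-2", groupId = "sharedGroup", topics = "topic1")
    public void worker2(ConsumerRecord<String, String> record) {
        System.out.println("worker-2 received: " + record.value());
    }
}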

When integrating with Spring Boot, specifying partitions through the topicPartitions annotation attribute can lead to duplicate consumption of messages. It is better to use the topics attribute and specify a groupId; a sketch of both patterns follows.
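Here is a minimal sketch of the difference, under these assumptions: the single-record listeners use Spring Boot's auto-configured default listener container factory, the batch listeners use the kafkaBatchListener6 bean sketched earlier, and the listener ids, group name, and topic are illustrative. With topicPartitions the partitions are assigned manually, so two listeners that both name partition 0 each read it in full; with topics plus a shared groupId, the group coordinator gives each partition to exactly one listener.

package com.llw.medical.bs.listener;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

// Hypothetical demo class; ids, group, and topic are illustrative only.
@Component
public class AssignmentVsSubscriptionListener {

    // Manual assignment: both listeners explicitly claim partition 0, so both
    // read every record from it -- this is the duplicate-consumption trap.
    @KafkaListener(id = "manual-a", topicPartitions =
            @TopicPartition(topic = "topic1", partitions = {"0"}))
    public void manualA(ConsumerRecord<String, String> record) {
        System.out.println("manual-a: " + record.value());
    }

    @KafkaListener(id = "manual-b", topicPartitions =
            @TopicPartition(topic = "topic1", partitions = {"0"}))
    public void manualB(ConsumerRecord<String, String> record) {
        System.out.println("manual-b: " + record.value());
    }

    // Group-managed subscription: subscribe by topic and share a groupId so the
    // broker's group coordinator assigns each partition to exactly one listener.
    @KafkaListener(id = "batch-a", groupId = "orderGroup", topics = "topic1",
            containerFactory = "kafkaBatchListener6")
    public void batchA(List<ConsumerRecord<String, String>> records) {
        System.out.println("batch-a got " + records.size() + " records");
    }

    @KafkaListener(id = "batch-b", groupId = "orderGroup", topics = "topic1",
            containerFactory = "kafkaBatchListener6")
    public void batchB(List<ConsumerRecord<String, String>> records) {
        System.out.println("batch-b got " + records.size() + " records");
    }
}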

After reading the above, do you have a better understanding of how to analyze the multiple-consumer problem in Kafka batch consumption? If you would like to learn more, please follow the industry news channel. Thank you for your support.
