Kafka多线程Consumer的实例代码
发表于:2025-01-27 作者:千家信息网编辑
千家信息网最后更新 2025年01月27日,这篇文章主要介绍"Kafka多线程Consumer的实例代码",在日常操作中,相信很多人在Kafka多线程Consumer的实例代码问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对
千家信息网最后更新 2025年01月27日Kafka多线程Consumer的实例代码
这篇文章主要介绍"Kafka多线程Consumer的实例代码",在日常操作中,相信很多人在Kafka多线程Consumer的实例代码问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Kafka多线程Consumer的实例代码"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
多线程示例代码:
这里要根据自身需求开发,我这里只举一个简单的例子,就是几个分区就启动几个consumer,一一对应。
三个类:
Main:
public static void main(String[] args) {
String bootstrapServers = "kafka01:9092,kafka02:9092";
String groupId = "test";
String topic = "testtopic";
int consumerNum = 3;
ConsumerGroup cg = new ConsumerGroup(consumerNum,bootstrapServers,groupId,topic);
cg.execute();
}
import java.util.ArrayList;
import java.util.List;
public class ConsumerGroup {
private List consumers;
public ConsumerGroup(int consumerNum,String bootstrapServers,String groupId,String topic){
consumers = new ArrayList<>(consumerNum);
for(int i=0;i < consumerNum;i++){
ConsumerRunnable ConsumerRunnable = new ConsumerRunnable(bootstrapServers,groupId,topic);
consumers.add(ConsumerRunnable);
}
}
public void execute(){
for(ConsumerRunnable consumerRunnable:consumers){
new Thread(consumerRunnable).start();
}
}
}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerRunnable implements Runnable{
private final KafkaConsumer consumer;
public ConsumerRunnable(String bootstrapServers,String groupId,String topic){
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
while (true) {
ConsumerRecords records = consumer.poll(10);
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
poll方法详解:
(旧版本:多分区多线程 新版本:一个线程管理多个socket连接)
但新版本KafkaConsumer是双线程的,主线程负责:消息获取,rebalance,coordinator,位移提交等等,
另一个是后台心跳线程。
根据上边的各种配置,poll方法会找到offset,当获取了足够多的可用数据,或者等待时间超过了指定的超时时间,就会返回。
java consumer不是线程安全的,同一个KafkaConsumer用在了多个线程中,将会报Kafka Consumer is not safe for multi-threaded assess异常。可以加一个同步锁进行保护。
poll的超时参数,已经说过1000的话是超时设定,如果没有很多数据,也就等一秒,就返回了,比如定时5秒的将消息写入,就可以将超时参数设置为5000,达到效率最大化。
如果没有定时任务呢,那就设置为 Long.MAX_VALUE 未获取足够多的数据就无限等待。这里要捕获一下WakeupException。
到此,关于"Kafka多线程Consumer的实例代码"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
线程
代码
实例
学习
数据
方法
参数
多个
时间
更多
消息
帮助
实用
最大
安全
一一对应
接下来
三个
上边
任务
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
手机网络安全心得体会3000字
怎样开启谷歌pop服务器
咸宁测试软件开发
会员卡消费 数据库安全
cacd数据库
莆田软件开发培训学校
数据库提示无权打开文件
瑞友天翼副服务器
数据库封锁的作用是什么
数据库原理与技术申时凯
长沙学软件开发哪里好
红杏林公司招聘网络技术人员
辽事通个人身份认证数据库异常
全天候网络技术管理
centos7服务器防护
东方财富 数据库
数据库事物的四个特效及含义
企业网络安全解决方案的思路
智能机床工业互联网络安全协议
服务器内存图片
怎么玩java我的世界服务器
阿里巴巴第三代神龙云服务器
win7搭建服务器
怎么用电脑本地连接服务器
网络安全科技产品
服务器暂不可用是怎么回事
数据库中什么地方打引号
取消服务器ie浏览器高安全
云南省委网络安全办主任
表格类软件开发平台