千家信息网

kafka consumer怎么使用

发表于:2024-10-27 作者:千家信息网编辑
千家信息网最后更新 2024年10月27日,这篇文章主要介绍"kafka consumer怎么使用",在日常操作中,相信很多人在kafka consumer怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"k
千家信息网最后更新 2024年10月27日kafka consumer怎么使用

这篇文章主要介绍"kafka consumer怎么使用",在日常操作中,相信很多人在kafka consumer怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"kafka consumer怎么使用"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

consumer作为kafka当中一个重要元素,它的常用操作并不复杂,说白了无非就是2点,1、把数据poll出来,2、把位置标记上。我们找到kafka的java api doc,找到了官方提供的几种consumer操作的例子,逐一进行分析,看看都有几种操作类型。

Automatic Offset Committing

自动 Offset 提交

这个例子显示了一个基于offset自动提交的consumer api的简单应用。

Properties props = new Properties();     props.put("bootstrap.servers", "localhost:9092");     props.put("group.id", "test");     props.put("enable.auto.commit", "true");     props.put("auto.commit.interval.ms", "1000");     props.put("session.timeout.ms", "30000");     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");     KafkaConsumer consumer = new KafkaConsumer<>(props);     consumer.subscribe(Arrays.asList("foo", "bar"));     while (true) {         ConsumerRecords records = consumer.poll(100);         for (ConsumerRecord record : records)             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());     }

enable.auto.commit 意味着offset将会得到自动提交,而这个自动提交的时间间隔由 auto.commit.interval.ms 来进行控制。

客户端通过 bootstrap.servers 的配置来连接服务器,这个配值当中可以是一个或多个broker,需要注意的是,这个配置仅仅用来让客户端找到我们的server集群,而不需要把集群当中的所有服务器地址都列上。

在这个例子当中,客户端作为test group的一员,订阅了foo和bar2个topic。

( 这一段直接翻译很蹩脚,我会试着根据自己的理解翻译出来)首先假设,foo和bar这2个topic,都分别有3个partitions,同时我们将上面的代码在我们的机器上起3个进程,也就是说,在test group当中,目前有了3个consumer,一般来讲,这3个consumer会分别获得 foo和bar 的各一个partitions,这是前提。3个consumer会周期性的执行一个poll的动作(这个动作当中隐含的有一个heartbeat的发送,来告诉cluster我是活的),这样3个consumer会持续的保有他们对分配给自己的partition的访问的权利,如果某一个consumer失效了,也就是poll不再执行了,cluster会在一段时间( session.timeout.ms )之后把partitions分配给其他的consumer。

反序列化的设置,定义了如何转化bytes,这里我们把key和value都直接转化为string。

Manual Offset Control

手动的offset控制

除了周期性的自动提交offset之外,用户也可以在消息被消费了之后提交他们的offset。

某些情况下,消息的消费是和某些处理逻辑相关联的,我们可以用这样的方式,手动的在处理逻辑结束之后提交offset。

简要地说,在这个例子当中,我们希望每次至少消费200条消息并将它们插入数据库,之后再提交offset。如果仍然使用前面的自动提交方式,就可能出现消息已经被消费,但是插入数据库失败的情况。这里可以视作一个简单的事务封装。

但是,有没有另一种可能性,在插入数据库成功之后,提交offset之前,发生了错误,或者说是提交offset本身发生了错误,那么就可能出现某些消息被重复消费的情况。

个人认为这段话说的莫名其妙,简单地说,采用这样的方式,消息不会被丢失,但是有可能出现重复消费。

Properties props = new Properties();     props.put("bootstrap.servers", "localhost:9092");     props.put("group.id", "test");     props.put("enable.auto.commit", "false");     props.put("auto.commit.interval.ms", "1000");     props.put("session.timeout.ms", "30000");     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");     KafkaConsumer consumer = new KafkaConsumer<>(props);     consumer.subscribe(Arrays.asList("foo", "bar"));     final int minBatchSize = 200;     List> buffer = new ArrayList<>();     while (true) {         ConsumerRecords records = consumer.poll(100);         for (ConsumerRecord record : records) {             buffer.add(record);         }         if (buffer.size() >= minBatchSize) {             insertIntoDb(buffer);             consumer.commitSync();             buffer.clear();         }     }

上面的例子当中,我们用commitSync来标记所有的消息;在有些情况下,我们可能希望更加精确的控制offset,那么在下面的例子当中,我们可以在每一个partition当中分别控制offset的提交。

try {         while(running) {             ConsumerRecords records = consumer.poll(Long.MAX_VALUE);             for (TopicPartition partition : records.partitions()) {                 List> partitionRecords = records.records(partition);                 for (ConsumerRecord record : partitionRecords) {                     System.out.println(record.offset() + ": " + record.value());                 }                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));             }         }     } finally {       consumer.close();     }

注意:提交的offset应该是next message,所以,提交的时候需要在当前最后一条的基础上+1.

Manual Partition Assignment

手动的分区分配

前面的例子当中,我们订阅一个topic,然后让kafka把该topic当中的不同partitions,公平的在一个consumer group内部进行分配。那么,在某些情况下,我们希望能够具体的指定partitions的分配关系。

  • 如果某个进程在本地管理了和partition相关的状态,那么它只需要获得跟他相关partition。


  • 如果某个进程自身具备高可用性,那么就不需要kafka来检测错误并重新分配partition,因为消费者进程会在另一台设备上重新启动。

要使用这种模式,可以用assign方法来代替subscribe,具体指定一个partitions列表。

String topic = "foo";     TopicPartition partition0 = new TopicPartition(topic, 0);     TopicPartition partition1 = new TopicPartition(topic, 1);     consumer.assign(Arrays.asList(partition0, partition1));

分配之后,就可以像前面的例子一样,在循环当中调用poll来消费消息。手动的分区分配不需要组协调,所以消费进程失效之后,不会引发partition的重新分配,每一个消费者都是独立工作的,即使它和其他消费者属于同一个group。为了避免offset提交的冲突,在这种情况下,通常我们需要保证每一个consumer使用自己的group id。

需要注意的是,手动partition分配和通过subscribe实现的动态的分区分配,2种方式是不能混合使用的。

到此,关于"kafka consumer怎么使用"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

分配 消费 例子 消息 情况 手动 进程 数据 方式 学习 控制 客户 客户端 数据库 消费者 错误 面的 也就是 动作 周期 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 惠普刀片服务器换完风扇不进系统 观看网络安全教育后的新的视频 sql数据库高版本备份到低版本 网络安全监测探针设备A型 山西网络安全培训机构 服务器主机风扇会转吗 金铲铲之战不同服务器可以联机吗 云南专业网络技术服务工程 数据库机器人代码 中兴网络技术支持工程师跳槽 银行服务器怎么更改预留手机号码 网络安全与第三方共建计划 组织开展网络安全进社区活动 辽宁什么是网络技术服务推广 sql移动系统数据库的表 武汉淘客软件开发 吉林网络安全产业 苏州学习软件开发公司排名 成都四零七网络技术有限公司 C 数据库并行查询 司法厅网络安全攻防 深圳宽带首选dns服务器怎么填 小游戏服务器在哪里找 扬州商城软件开发要多少钱 12351网络安全宣传 个人电脑web服务器 成都医联科技互联网电子病历 软件开发的体会与心得 网络技术服务主要有哪些 网络技术支撑的分类是什么
0