如何进行Kafka 1.0.0 d代码示例分析
发表于:2025-02-08 作者:千家信息网编辑
千家信息网最后更新 2025年02月08日,这篇文章将为大家详细讲解有关如何进行Kafka 1.0.0 d代码示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。package kafka.d
千家信息网最后更新 2025年02月08日如何进行Kafka 1.0.0 d代码示例分析
这篇文章将为大家详细讲解有关如何进行Kafka 1.0.0 d代码示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
package kafka.demo;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;/** * *Description: kafka 1.0.0
* @author guangshihao * @date 2018年9月19日 * */public class KafkaProduderDemo { public static void main(String[] args) { Mapprops = new HashMap<>(); /* * acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1 * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。 * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。 * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。 * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。 * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。 * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。 * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失 */ props.put("acks", "1"); //配置默认的分区方式 props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); //配置topic的序列化类 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //配置value的序列化类 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /* * kafka broker对应的主机,格式为host1:port1,host2:port2 */ props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092"); //topic String topic = "test7"; KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props); for(int i = 1 ;i <= 100 ; i++) { String line = i+" this is a test "; ProducerRecord record = new ProducerRecord (topic,line ); producer.send(record); } producer.close(); } }//---------------------------------------------------------------------------------------------------------------------------package kafka.demo;import java.util.Arrays;import java.util.Properties;import java.util.Random;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.errors.WakeupException;public class KafkaConsumerDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");props.put("group.id", "group_test7");//配置topic的序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//配置value的序列化类props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//自动同步offset props.put("enable.auto.commit","true"); //自动同步offset的时间间隔 props.put("auto.commit.intervals.ms", "2000"); //当在zookeeper中发现要消费的topic没有或者topic的offset不合法时自动设置为最小值,可以设的值为 latest, earliest, none,默认为largest props.put("auto.offset.reset", "earliest "); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test7"));//consumer.beginningOffsets("");try {while(true) {ConsumerRecords records = consumer.poll(1000);for(ConsumerRecord record: records) {System.out.println("partition:"+record.partition()+" "+record.value());}//consumer.commitSync();if((new Random(10)).nextInt()>5) {consumer.wakeup();}}}catch(WakeupException e) {e.printStackTrace();}finally {consumer.close();}}}
关于如何进行Kafka 1.0.0 d代码示例分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
数据
配置
序列
意味
代码
示例
分析
内容
持久性
文章
更多
知识
篇文章
同步
不错
最低
最小
成功
三个
主机
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
河北通用软件开发近期价格
sql数据库表的使用
河南企业软件开发价格
服务器芯片主要用啥材料
万方数据库高级检索在哪
网络安全厂家业绩排名
德钦天气预报软件开发
梦想精灵谷服务器神兽都刷在哪
传统商用开源到云原生数据库
win2008域控服务器
长沙物会网络技术有限公司
网络安全我知道教案幼儿
剑侠世界3一天开几个服务器
元原神服务器爆满
神武4新服务器多少级开始限制
山西移动公司网络技术中心
扩展服务器无法连接
如何创建数据库的用户名
软件开发转项目管理好吗
服务器构建家庭网络管理系统
东科优信网络安全吗
魔百盒服务器未连接怎么办
黑色玫瑰是哪里的服务器
中国网络安全法导致外资撤离
利用互联网科技发展
天子星餐饮提示数据库失败
信息网络技术职称考试
郫都网络安全周
学校监控服务器本地安装
袜子信息软件开发