Java使用Kafka的方法
发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,本篇内容主要讲解"Java使用Kafka的方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Java使用Kafka的方法"吧!1、maven依赖 o
千家信息网最后更新 2025年02月05日Java使用Kafka的方法
本篇内容主要讲解"Java使用Kafka的方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Java使用Kafka的方法"吧!
1、maven依赖
org.apache.kafka kafka-clients 0.11.0.0
2、Producer
2.1、producer发送消息
import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;/** * @author Thomas * @Description:最简单的kafka producer * @date 22:18 2019-7-5 */public class ProducerDemo { public static void main(String[] args) { Properties properties =new Properties(); //zookeeper服务器集群地址,用逗号隔开 properties.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //自定义producer拦截器 properties.put("interceptor.classes", "com.lt.kafka.producer.MyProducerInterceptor"); //自定义消息路由规则(消息发送到哪一个Partition中) //properties.put("partitioner.class", "com.lt.kafka.producer.MyPartition"); Producerproducer = null; try { producer = new KafkaProducer (properties); for (int i = 20; i < 40; i++) { String msg = "This is Message:" + i; /** * kafkaproducer中会同时调用自己的callback的onCompletion方法和producerIntercepter的onAcknowledgement方法。 * 关键源码:Callback interceptCallback = this.interceptors == null * callback : new InterceptorCallback<>(callback, * this.interceptors, tp); */ producer.send(new ProducerRecord ("leixiang", msg),new MyCallback()); } } catch (Exception e) { e.printStackTrace(); } finally { if(producer!=null) producer.close(); } }}
2.2、自定义producer拦截器
import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;/** * @author Thomas * @Description:自定义producer拦截器 * @date 22:21 2019-7-5 */public class MyProducerInterceptor implements ProducerInterceptor{ /** * 打印配置相关信息 */ public void configure(Map configs) { // TODO Auto-generated method stub System.out.println(configs.toString()); } /** * producer发送信息拦截方法 */ public ProducerRecord onSend(ProducerRecord record) { System.out.println("拦截处理前============="); String topic=record.topic(); String value=record.value(); System.out.println("拦截处理前的消息====:"+value); ProducerRecord record2=new ProducerRecord (topic, value+" (intercepted)"); System.out.println("拦截处理后的消息:"+record2.value()); System.out.println("拦截处理后==============="); return record2; } /** * 消息确认回调函数,和callback的onCompletion方法相似。 * 在kafkaProducer中,如果都设置,两者都会调用。 */ public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (metadata != null) System.out.println("MyProducerInterceptor onAcknowledgement:RecordMetadata=" + metadata.toString()); if (exception != null) exception.printStackTrace(); } /** * interceptor关闭回调 */ public void close() { System.out.println("MyProducerInterceptor is closed!"); }}
2.3、自定义消息路由规则
自定义路由规则,可以根据自己的需要定义消息发送到哪个分区。自定义路由规则需要实现Partitioner。
import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;/** * @author Thomas * @Description: * @date 22:24 2019-7-5 */public class MyPartition implements Partitioner { public void configure(Maparg0) { // TODO Auto-generated method stub } public void close() { // TODO Auto-generated method stub } public int partition(String arg0, Object arg1, byte[] arg2, Object arg3, byte[] arg4, Cluster arg5) { // TODO Auto-generated method stub return 0; }}
3、Consumer
3.1、自动提交
import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;/** * @author Thomas * @Description: * @date 22:26 2019-7-5 */public class AutoCommitConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092"); props.put("group.id", "leixiang"); props.put("enable.auto.commit", "true"); //想要读取之前的数据,必须加上 //props.put("auto.offset.reset", "earliest"); /* 自动确认offset的时间间隔 */ props.put("auto.commit.interval.ms", "1000"); /* * 一旦consumer和kakfa集群建立连接, * consumer会以心跳的方式来高速集群自己还活着, * 如果session.timeout.ms 内心跳未到达服务器,服务器认为心跳丢失,会做rebalence */ props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //配置自定义的拦截器,可以在拦截器中引入第三方插件实现日志记录等功能。 //props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor"); @SuppressWarnings("resource") KafkaConsumerconsumer = new KafkaConsumer (props); try { /* 消费者订阅的topic, 可同时订阅多个 ,用逗号隔开*/ consumer.subscribe(Arrays.asList("leixiang")); while (true) { //轮询数据。如果缓冲区中没有数据,轮询等待的时间为毫秒。如果0,立即返回缓冲区中可用的任何记录,则返回空 ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } }}
3.2、手动提交
import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;/** * @author Thomas * @Description: * @date 22:28 2019-7-5 */public class ManualCommitConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092"); props.put("group.id", "leixiang"); props.put("enable.auto.commit", "false");//手动确认 /* 自动确认offset的时间间隔 */ props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest");//想要读取之前的数据,必须加上 /* * 一旦consumer和kakfa集群建立连接, * consumer会以心跳的方式来高速集群自己还活着, * 如果session.timeout.ms 内心跳未到达服务器,服务器认为心跳丢失,会做rebalence */ props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //配置自定义的拦截器,可以在拦截器中引入第三方插件实现日志记录等功能。 props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor"); KafkaConsumerconsumer = new KafkaConsumer (props); /* 消费者订阅的topic, 可同时订阅多个 ,用逗号隔开*/ consumer.subscribe(Arrays.asList("leixiang")); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { //处理消息 saveMessage(record); //手动提交,并且设置Offset提交回调方法 //consumer.commitAsync(new MyOffsetCommitCallback()); consumer.commitAsync(); } } } public static void saveMessage(ConsumerRecord record){ System.out.printf("处理消息:offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }}
自定义Consumer拦截器
import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerInterceptor;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;/** * @author Thomas * @Description: * @date 22:29 2019-7-5 */public class MyConsumerInterceptor implements ConsumerInterceptor{public void configure(Map configs) { System.out.println("MyConsumerInterceptor configs>>>"+configs.toString()); }public ConsumerRecords onConsume(ConsumerRecords records) { System.out.println("onConsume"); return records; }public void onCommit(Map offsets) { System.out.println("onCommit"); }public void close() { System.out.println("MyConsumerInterceptor is closed!"); }}
自定义Offset提交回调方法
import java.util.Map;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.clients.consumer.OffsetCommitCallback;import org.apache.kafka.common.TopicPartition;/** * @author Thomas * @Description: * @date 22:31 2019-7-5 */public class MyOffsetCommitCallback implements OffsetCommitCallback { public void onComplete(Mapoffsets, Exception exception) { if (offsets != null) System.out.println("offsets>>>" + offsets.toString()); if (exception != null) exception.printStackTrace(); }}
到此,相信大家对"Java使用Kafka的方法"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
方法
消息
拦截器
处理
服务器
集群
服务
数据
规则
路由
订阅
手动
时间
逗号
配置
信息
内容
内心
功能
多个
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络技术校园网的设计和组件
深珏网络技术有限公司
人大金仓数据库备份加密
广电宽带连接不到服务器
重装上阵服务器打不开
广州链动互联网科技有限公司官网
四川宜宾巴网络技术有限公司
运维如何管理不同区域的服务器
卫星校时服务器
知网和知网数据库
网络安全宣传学习方法
网络安全法心得250字
三级网络技术序列码
川拍法辅网络技术有限公司
域名服务器查询
晟同网络技术
服务器提升
重庆特色软件开发口碑推荐
普陀区服务器回收公司哪家好
网络安全企业宣传视频
怎样保证网络安全信息不被泄露
数据库约束的难点
no sql数据库包含
102服务器
数据库建表脚本语句
宿迁机械软件开发
软件开发写代码是程序员吗
妇联网络安全核查情况报告
网络安全企业宣传视频
我的世界神奇宝贝服务器刷新不了