Java使用Kafka的方法
发表于:2024-10-24 作者:千家信息网编辑
千家信息网最后更新 2024年10月24日,本篇内容主要讲解"Java使用Kafka的方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Java使用Kafka的方法"吧!1、maven依赖 o
千家信息网最后更新 2024年10月24日Java使用Kafka的方法
本篇内容主要讲解"Java使用Kafka的方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Java使用Kafka的方法"吧!
1、maven依赖
org.apache.kafka kafka-clients 0.11.0.0
2、Producer
2.1、producer发送消息
import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;/** * @author Thomas * @Description:最简单的kafka producer * @date 22:18 2019-7-5 */public class ProducerDemo { public static void main(String[] args) { Properties properties =new Properties(); //zookeeper服务器集群地址,用逗号隔开 properties.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //自定义producer拦截器 properties.put("interceptor.classes", "com.lt.kafka.producer.MyProducerInterceptor"); //自定义消息路由规则(消息发送到哪一个Partition中) //properties.put("partitioner.class", "com.lt.kafka.producer.MyPartition"); Producerproducer = null; try { producer = new KafkaProducer (properties); for (int i = 20; i < 40; i++) { String msg = "This is Message:" + i; /** * kafkaproducer中会同时调用自己的callback的onCompletion方法和producerIntercepter的onAcknowledgement方法。 * 关键源码:Callback interceptCallback = this.interceptors == null * callback : new InterceptorCallback<>(callback, * this.interceptors, tp); */ producer.send(new ProducerRecord ("leixiang", msg),new MyCallback()); } } catch (Exception e) { e.printStackTrace(); } finally { if(producer!=null) producer.close(); } }}
2.2、自定义producer拦截器
import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;/** * @author Thomas * @Description:自定义producer拦截器 * @date 22:21 2019-7-5 */public class MyProducerInterceptor implements ProducerInterceptor{ /** * 打印配置相关信息 */ public void configure(Map configs) { // TODO Auto-generated method stub System.out.println(configs.toString()); } /** * producer发送信息拦截方法 */ public ProducerRecord onSend(ProducerRecord record) { System.out.println("拦截处理前============="); String topic=record.topic(); String value=record.value(); System.out.println("拦截处理前的消息====:"+value); ProducerRecord record2=new ProducerRecord (topic, value+" (intercepted)"); System.out.println("拦截处理后的消息:"+record2.value()); System.out.println("拦截处理后==============="); return record2; } /** * 消息确认回调函数,和callback的onCompletion方法相似。 * 在kafkaProducer中,如果都设置,两者都会调用。 */ public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (metadata != null) System.out.println("MyProducerInterceptor onAcknowledgement:RecordMetadata=" + metadata.toString()); if (exception != null) exception.printStackTrace(); } /** * interceptor关闭回调 */ public void close() { System.out.println("MyProducerInterceptor is closed!"); }}
2.3、自定义消息路由规则
自定义路由规则,可以根据自己的需要定义消息发送到哪个分区。自定义路由规则需要实现Partitioner。
import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;/** * @author Thomas * @Description: * @date 22:24 2019-7-5 */public class MyPartition implements Partitioner { public void configure(Maparg0) { // TODO Auto-generated method stub } public void close() { // TODO Auto-generated method stub } public int partition(String arg0, Object arg1, byte[] arg2, Object arg3, byte[] arg4, Cluster arg5) { // TODO Auto-generated method stub return 0; }}
3、Consumer
3.1、自动提交
import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;/** * @author Thomas * @Description: * @date 22:26 2019-7-5 */public class AutoCommitConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092"); props.put("group.id", "leixiang"); props.put("enable.auto.commit", "true"); //想要读取之前的数据,必须加上 //props.put("auto.offset.reset", "earliest"); /* 自动确认offset的时间间隔 */ props.put("auto.commit.interval.ms", "1000"); /* * 一旦consumer和kakfa集群建立连接, * consumer会以心跳的方式来高速集群自己还活着, * 如果session.timeout.ms 内心跳未到达服务器,服务器认为心跳丢失,会做rebalence */ props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //配置自定义的拦截器,可以在拦截器中引入第三方插件实现日志记录等功能。 //props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor"); @SuppressWarnings("resource") KafkaConsumerconsumer = new KafkaConsumer (props); try { /* 消费者订阅的topic, 可同时订阅多个 ,用逗号隔开*/ consumer.subscribe(Arrays.asList("leixiang")); while (true) { //轮询数据。如果缓冲区中没有数据,轮询等待的时间为毫秒。如果0,立即返回缓冲区中可用的任何记录,则返回空 ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } }}
3.2、手动提交
import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;/** * @author Thomas * @Description: * @date 22:28 2019-7-5 */public class ManualCommitConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092"); props.put("group.id", "leixiang"); props.put("enable.auto.commit", "false");//手动确认 /* 自动确认offset的时间间隔 */ props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest");//想要读取之前的数据,必须加上 /* * 一旦consumer和kakfa集群建立连接, * consumer会以心跳的方式来高速集群自己还活着, * 如果session.timeout.ms 内心跳未到达服务器,服务器认为心跳丢失,会做rebalence */ props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //配置自定义的拦截器,可以在拦截器中引入第三方插件实现日志记录等功能。 props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor"); KafkaConsumerconsumer = new KafkaConsumer (props); /* 消费者订阅的topic, 可同时订阅多个 ,用逗号隔开*/ consumer.subscribe(Arrays.asList("leixiang")); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { //处理消息 saveMessage(record); //手动提交,并且设置Offset提交回调方法 //consumer.commitAsync(new MyOffsetCommitCallback()); consumer.commitAsync(); } } } public static void saveMessage(ConsumerRecord record){ System.out.printf("处理消息:offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }}
自定义Consumer拦截器
import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerInterceptor;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;/** * @author Thomas * @Description: * @date 22:29 2019-7-5 */public class MyConsumerInterceptor implements ConsumerInterceptor{public void configure(Map configs) { System.out.println("MyConsumerInterceptor configs>>>"+configs.toString()); }public ConsumerRecords onConsume(ConsumerRecords records) { System.out.println("onConsume"); return records; }public void onCommit(Map offsets) { System.out.println("onCommit"); }public void close() { System.out.println("MyConsumerInterceptor is closed!"); }}
自定义Offset提交回调方法
import java.util.Map;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.clients.consumer.OffsetCommitCallback;import org.apache.kafka.common.TopicPartition;/** * @author Thomas * @Description: * @date 22:31 2019-7-5 */public class MyOffsetCommitCallback implements OffsetCommitCallback { public void onComplete(Mapoffsets, Exception exception) { if (offsets != null) System.out.println("offsets>>>" + offsets.toString()); if (exception != null) exception.printStackTrace(); }}
到此,相信大家对"Java使用Kafka的方法"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
方法
消息
拦截器
处理
服务器
集群
服务
数据
规则
路由
订阅
手动
时间
逗号
配置
信息
内容
内心
功能
多个
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
java数据库时间
什么是网络安全接入控制
网络安全等级征求意见稿
steam服务器繁忙
江苏哪里招聘网络技术员
流放者柯南服务器倍率调不了
软件开发全流程文档
软件开发项目监理日志
网络安全工程师需要条件
软件开发哪个方式最好
数据库定义操作查询控制
互联网时代的技术科技论文
安卓手机网络安全
数据库开发技术介绍
计算机网络技术服务公司
uml是软件开发语言吗
软件开发工程师面经
网络安全分硬件和软件
互联网金融的科技条件
怎样导出数据库
网络安全等级征求意见稿
服务器网络受限
网络安全技术系统组成
集群服务器内存管理
计算机网络技术试题和答案
网络安全法与生活息息相关的规定
服务器中的文件夹
软件开发钱给了人不交货
网络安全分硬件和软件
海外服务器租用