Message Queues: Kafka (API)
1. A simulated Kafka producer and consumer (native API)
Add the required dependency (Maven):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0</version>
</dependency>
Producer:
package com.zy.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTest {
    public static void main(String[] args) {
        // 1. Build the configuration object
        Properties props = new Properties();
        // Broker address
        props.put("bootstrap.servers", "hadoop02:9092");
        // Ack level: 0, 1, or -1 (all)
        props.put("acks", "all");
        // Number of retries
        props.put("retries", 3);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // How long to wait before sending a batch, in ms
        props.put("linger.ms", 1);
        // Total memory available for buffering, in bytes
        props.put("buffer.memory", 33554432);
        // Serializers for the message key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer (key and value types as generics)
        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send the messages
        for (int i = 0; i < 100; i++) {
            /*
             * ProducerRecord(topic, key, value)
             * topic: topic name
             * key:   message key
             * value: message value
             */
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test_topic", "key" + i, "value" + i);
            producer.send(record);
        }
        producer.close();
    }
}
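Note that producer.send() is asynchronous: it returns a Future<RecordMetadata>, and the loop above never checks whether delivery succeeded. A minimal sketch of the same loop with a completion callback (a hypothetical replacement for the send loop above; same topic and producer assumed):

// Additional import: org.apache.kafka.clients.producer.RecordMetadata
for (int i = 0; i < 100; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>("test_topic", "key" + i, "value" + i);
    producer.send(record, (RecordMetadata metadata, Exception exception) -> {
        if (exception != null) {
            // Delivery failed even after the configured retries
            exception.printStackTrace();
        } else {
            System.out.printf("sent to partition=%d, offset=%d%n",
                    metadata.partition(), metadata.offset());
        }
    });
}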
Consumer:
package com.zy.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        // 1. Build the configuration object
        Properties props = new Properties();
        // Broker address
        props.put("bootstrap.servers", "hadoop02:9092");
        // Consumer group ID
        props.put("group.id", "test");
        // Auto-commit offsets (committed to Kafka's internal __consumer_offsets topic)
        props.put("enable.auto.commit", "true");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers for the message key and value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic (must match the topic the producer writes to)
        consumer.subscribe(Arrays.asList("test_topic"));
        // Consume in a loop
        while (true) {
            // Poll the broker, blocking for up to 10 ms; returns a batch of records
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
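To verify the pair end to end, you can tail the topic from the shell while the producer runs; a hedged example, assuming the broker address used above:

kafka-console-consumer.sh --bootstrap-server hadoop02:9092 --topic test_topic --from-beginning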
2. Running topic shell commands from the API
import kafka.admin.TopicCommand;

public class KafkaAPI {
    public static void main(String[] args) {
        /*
         * Equivalent shell command:
         * kafka-topics.sh \
         *   --create \
         *   --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 \
         *   --replication-factor 3 \
         *   --partitions 10 \
         *   --topic kafka_test11
         */
        // Arguments for creating a topic
        String[] ops = new String[]{
                "--create",
                "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181",
                "--replication-factor", "3",
                "--topic", "zy_topic",
                "--partitions", "5"
        };
        // Arguments for listing all topics
        String[] list = new String[]{
                "--list",
                "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181"
        };
        // Submit the arguments as if they came from the command line
        TopicCommand.main(list);
    }
}
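TopicCommand is an internal server-side class that talks to ZooKeeper directly. Since Kafka 0.11 the client library also ships a public admin API that goes through the brokers instead; a minimal sketch, assuming the same cluster (the topic name and counts mirror the arguments above):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class AdminClientDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Talks to the brokers directly; no ZooKeeper address needed
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic: name, number of partitions, replication factor
            NewTopic topic = new NewTopic("zy_topic", 5, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
            // List all topics
            System.out.println(admin.listTopics().names().get());
        }
    }
}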
3. Advanced API operations
Common operations from the shell:
#!/usr/bin/env bash
# List Kafka topics
kafka-topics.sh --list --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
# Check a topic's offsets
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic kafka_api_r1p1
# Create a topic
kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --partitions 3 --replication-factor 1 --topic kafka_api_r1p3
# Delete a topic
kafka-topics.sh --delete --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic act_inventory_r1p1_test1
# Inspect a specific consumer group's offsets
kafka-consumer-groups.sh
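The last command is truncated in the source. A typical full invocation that describes a group's per-partition offsets and lag looks like the following (the group name is an assumption, matching the later examples):

kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group kafka_api_group_1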
① A simple Kafka consumer whose offsets are managed automatically by Kafka (single-partition consumption)
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/9
 * Time: 19:44
 * Description: A simple Kafka consumer whose offsets are managed
 * automatically by Kafka (single-partition consumption).
 */
public class MyConsumer01 {
    private static Properties props = new Properties();

    static {
        props.put("group.id", "kafka_api_group_2");
        // Kafka cluster address
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Enable automatic offset commits
        props.put("enable.auto.commit", "true");
        // For manual commits, disable auto-commit instead:
        // props.put("enable.auto.commit", "false");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "100");
        // Where to start when the group has no committed offset
        props.put("auto.offset.reset", "earliest");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) throws InterruptedException {
        String topic = "kafka_api_r1p1";
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe; more than one topic may be passed
        // consumer.subscribe(Collections.singleton(topic));
        consumer.subscribe(Arrays.asList(topic));
        // Loop forever, pulling data from the brokers
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n",
                        record.offset(), record.key(), record.value());
            }
            Thread.sleep(2000);
        }
        // With auto-commit disabled, you would call consumer.commitAsync() here
    }
}
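With enable.auto.commit set to false (the commented-out line above), the commit must be issued by hand, for example once per polled batch. A minimal sketch using commitAsync with a completion callback (a hypothetical addition at the end of the poll loop above):

// Inside the while loop, after the batch has been processed:
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // The commit failed; offsets holds the positions we tried to commit
        System.err.println("offset commit failed for " + offsets + ": " + exception);
    }
});
// On shutdown, a final blocking consumer.commitSync() is the usual safety net.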
② Multi-partition consumption
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 8:55
 * Description: Multi-partition consumption.
 */
public class MyConsumer02 {
    private static Properties props = new Properties();

    static {
        // Kafka cluster address
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers sharing a group.id belong to the same group
        props.put("group.id", "kafka_api_group_1");
        // Disable automatic offset commits; we commit per partition below
        props.put("enable.auto.commit", "false");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        String topicName = "kafka_api_r1p3";
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe; more than one topic may be passed
        consumer.subscribe(Arrays.asList(topicName));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // Walk the batch partition by partition
            for (TopicPartition partition : records.partitions()) {
                System.out.println("Consuming partition " + partition.partition());
                // The records belonging to this partition
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                    System.out.println("partition:" + partition.partition()
                            + ",key:" + partitionRecord.key()
                            + ",value:" + partitionRecord.value()
                            + ",offset:" + partitionRecord.offset());
                }
                // Commit this partition's offset: the offset of the last record
                // in the batch plus one (i.e. the next offset to read)
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition,
                        new OffsetAndMetadata(lastOffset + 1)));
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
③ A consumer that pulls data from a specified partition
Note: once a consumer is assigned specific partitions,
(1) Kafka's consumer-group coordination no longer applies;
(2) written this way, different consumer instances may end up on the same partition, so each instance should use a distinct group.id to avoid conflicting offset commits.
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:10
 * Description: A consumer that pulls data from a specified partition.
 * Once specific partitions are assigned:
 * (1) Kafka's consumer-group coordination no longer applies;
 * (2) different consumer instances may end up on the same partition, so
 *     each instance should use a distinct group.id to avoid conflicting
 *     offset commits.
 */
public class MyConsumer03 {
    private static Properties props = new Properties();
    // The consumer instance
    static KafkaConsumer<String, String> consumer;

    static {
        // Kafka cluster address
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers sharing a group.id belong to the same group
        props.put("group.id", "kafka_api_group_1");
        // Disable automatic offset commits
        props.put("enable.auto.commit", "false");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        // Assign the partition to pull from instead of subscribing
        String topic = "kafka_api_r1p3";
        int partitionNum = 0;
        TopicPartition partition0 = new TopicPartition(topic, partitionNum);
        consumer.assign(Arrays.asList(partition0));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                    System.out.println("partition:" + partitionRecord.partition()
                            + ",key:" + partitionRecord.key()
                            + ",value:" + partitionRecord.value()
                            + ",offset:" + partitionRecord.offset());
                }
                // Commit the next offset to read for this partition
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition,
                        new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}
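An assigned consumer can also be repositioned explicitly before polling. A minimal sketch, assuming partition0 and consumer from the class above:

// Start again from the earliest offset the broker still has...
consumer.seekToBeginning(Arrays.asList(partition0));
// ...or jump to an absolute offset, e.g. 42:
consumer.seek(partition0, 42);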
④ Resetting a Kafka group's offsets
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 9:46
 * Description: Resets a Kafka group's offsets.
 */
public class ReSetOffset {
    // The group whose offsets will be reset
    final private static String group = "kafka_api_group_1";
    final private static Properties props = new Properties();
    static KafkaConsumer<String, String> consumer;

    static {
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.put("group.id", group);
        // Auto-commit must stay on: closing a consumer after seek() commits
        // the sought position as the group's new offset
        props.put("enable.auto.commit", "true");
        // props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    public static String resetOffset(String topic, long offset) {
        int partitionNums = getTopicPartitionNum(topic);
        // Seek every partition of the topic to the target offset
        for (int i = 0; i < partitionNums; i++) {
            TopicPartition tp = new TopicPartition(topic, i);
            KafkaConsumer<String, String> consumer_temp = new KafkaConsumer<>(props);
            consumer_temp.assign(Arrays.asList(tp));
            consumer_temp.seek(tp, offset);
            consumer_temp.close();
        }
        consumer.close();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss ");
        return dateFormat.format(new Date()) + group + " ResetOffset Succeed!!";
    }

    private static int getTopicPartitionNum(String topic) {
        // Number of partitions, taken from the topic's metadata
        return consumer.partitionsFor(topic).size();
    }

    public static void main(String[] args) {
        String topic = "kafka_api_r1p1";
        System.out.println(ReSetOffset.resetOffset(topic, 0));
    }
}
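Newer Kafka versions (0.11+) can perform the same reset from the shell, which avoids relying on the auto-commit-on-close behavior; a hedged example matching the group and topic above (the group must have no active members):

kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --group kafka_api_group_1 --topic kafka_api_r1p1 --reset-offsets --to-offset 0 --execute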
⑤ A multi-threaded consumer
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:45
 * Description: A single consumer thread.
 */
public class ConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;
    private final CountDownLatch latch;

    public ConsumerRunner(KafkaConsumer<String, String> consumer, CountDownLatch latch) {
        this.consumer = consumer;
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("threadName....." + Thread.currentThread().getName());
        try {
            consumer.subscribe(Arrays.asList("kafka_api_r1p1"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(150);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("threadName= %s, offset = %d, key = %s, value = %s%n",
                            Thread.currentThread().getName(), record.offset(),
                            record.key(), record.value());
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (WakeupException e) {
            // wakeup() aborts a blocked poll(); only rethrow if we are not shutting down
            if (!closed.get()) {
                throw e;
            }
        } finally {
            consumer.close();
            latch.countDown();
        }
    }

    public void shutdown() {
        System.out.println("close ConsumerRunner");
        closed.set(true);
        // Interrupts a poll() that is blocked in the consumer thread
        consumer.wakeup();
    }
}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:52
 * Description: Drives the multi-threaded consumer.
 */
public class RunConsumer {
    private static Properties props = new Properties();

    static {
        // Kafka cluster address
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers sharing a group.id belong to the same group
        props.put("group.id", "kafka_api_group_1");
        // Enable automatic offset commits
        props.put("enable.auto.commit", "true");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        // One KafkaConsumer per thread: the consumer itself is not thread-safe
        final List<ConsumerRunner> consumers = new ArrayList<>();
        final List<KafkaConsumer<String, String>> kafkaConsumers = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            kafkaConsumers.add(new KafkaConsumer<String, String>(props));
        }
        // Count-down latch: the main thread blocks in await() until every
        // runner has counted it down to 0, similar in effect to Thread.join()
        final CountDownLatch latch = new CountDownLatch(2);
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            ConsumerRunner c = new ConsumerRunner(kafkaConsumers.get(i), latch);
            consumers.add(c);
            executor.submit(c);
        }
        /*
         * Registers a shutdown hook with the JVM. On shutdown the JVM runs
         * every hook added via addShutdownHook and only then exits, so hooks
         * are the place to release memory, destroy objects, and close
         * connections.
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("....................");
                for (ConsumerRunner consumer : consumers) {
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}