Message Queues: Kafka (API)

1. Simulating a Kafka producer and consumer (native client API)

Add the required dependency:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.1.0</version>
    </dependency>

Producer:

package com.zy.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTest {
    public static void main(String[] args) {
        // 1. Build the configuration object
        Properties props = new Properties();
        // Broker address
        props.put("bootstrap.servers", "hadoop02:9092");
        // Ack level: 0, 1, or -1 (all)
        props.put("acks", "all");
        // Number of retries
        props.put("retries", 3);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // How long to wait before sending a (possibly incomplete) batch
        props.put("linger.ms", 1);
        // Total memory available for buffering
        props.put("buffer.memory", 33554432);
        // Serializers for the message key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer (typed by the key and value classes)
        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send the messages
        for (int i = 0; i < 100; i++) {
            /*
             * ProducerRecord(topic, key, value)
             *   topic: topic name
             *   key:   message key
             *   value: message value
             */
            ProducerRecord<String, String> pr = new ProducerRecord<>("test_topic", "key" + i, "value" + i);
            producer.send(pr);
        }
        producer.close();
    }
}
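The send() call above is fire-and-forget: broker-side errors are silently dropped. A minimal sketch (not in the original article) that reuses the producer and topic above and attaches a Callback so delivery results surface:

    // Sketch only: assumes the producer and "test_topic" from the example above.
    ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key", "value");
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // The send failed after all retries
            exception.printStackTrace();
        } else {
            // The broker acknowledged the write
            System.out.printf("sent: partition=%d, offset=%d%n", metadata.partition(), metadata.offset());
        }
    });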

Consumer:

package com.zy.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        // 1. Build the configuration object
        Properties props = new Properties();
        // Broker address
        props.put("bootstrap.servers", "hadoop02:9092");
        // Consumer group ID
        props.put("group.id", "test");
        // Enable automatic offset commits (offsets are committed back to Kafka)
        props.put("enable.auto.commit", "true");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers for the message key and value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic written by the producer above
        consumer.subscribe(Arrays.asList("test_topic"));
        // Poll in a loop; each call returns a batch of records
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
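One note on consumer.poll(10): with the kafka-clients 2.1.0 dependency above, the poll(long) overload is deprecated in favor of poll(java.time.Duration). A one-line sketch of the replacement call:

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10)); // import java.time.Duration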

2. Driving the shell commands through the API

import kafka.admin.TopicCommand;

public class KafkaAPI {
    public static void main(String[] args) {
        /*
            Equivalent shell command:
            kafka-topics.sh \
            --create \
            --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 \
            --replication-factor 3 \
            --partitions 10 \
            --topic kafka_test11
         */
        // Arguments to create a topic
        String[] ops = new String[]{
            "--create",
            "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181",
            "--replication-factor", "3",
            "--topic", "zy_topic",
            "--partitions", "5"
        };
        // Arguments to list all topics
        String[] list = new String[]{
            "--list",
            "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181"
        };
        // Run an argument set as if it were a shell invocation
        // (pass ops instead of list to create the topic)
        TopicCommand.main(list);
    }
}
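TopicCommand is an internal class of the broker jar, so its argument format can change between versions. The kafka-clients jar also ships a public AdminClient; below is a minimal sketch (not from the original article; class name KafkaAdminSketch is illustrative) of the same create-and-list operations with it, assuming the broker addresses of the cluster above:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.Collections;
    import java.util.Properties;

    public class KafkaAdminSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Create zy_topic with 5 partitions and replication factor 3, as in ops above
                NewTopic topic = new NewTopic("zy_topic", 5, (short) 3);
                admin.createTopics(Collections.singleton(topic)).all().get(); // block until the broker confirms
                // List all topic names, the equivalent of the --list arguments
                System.out.println(admin.listTopics().names().get());
            }
        }
    }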

3. Advanced API operations

Common shell operations:

#!/usr/bin/env bash
# List Kafka topics
kafka-topics.sh --list --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
# Show the offsets of a topic
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic kafka_api_r1p1
# Create a topic
kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --partitions 3 --replication-factor 1 --topic kafka_api_r1p3
# Delete a topic
kafka-topics.sh --delete --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic act_inventory_r1p1_test1
# Inspect the offsets of a specific group
kafka-consumer-groups.sh

① A simple consumer whose offsets are managed automatically by Kafka (single-partition consumption)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/9
 * Time: 19:44
 * Description: a simple consumer whose offsets are managed automatically by Kafka (single-partition consumption)
 */
public class MyConsumer01 {
    private static Properties props = new Properties();

    static {
        props.put("group.id", "kafka_api_group_2");
        // Kafka cluster address
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Enable automatic offset commits
        props.put("enable.auto.commit", "true");
        // For manual commits, disable it instead:
        // props.put("enable.auto.commit", "false");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "100");
        // Where to start consuming when there is no committed offset
        props.put("auto.offset.reset", "earliest");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) throws InterruptedException {
        String topic = "kafka_api_r1p1";
        // Instantiate a consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic; multiple topics may be passed
        // consumer.subscribe(Collections.singleton(topic));
        consumer.subscribe(Arrays.asList(topic));
        // Endless loop pulling data from the broker
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }
            Thread.sleep(2000);
        }
        // consumer.commitAsync(); // commits offsets manually when auto-commit is disabled
    }
}

② Multi-partition consumption

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 8:55
 * Description: multi-partition consumption
 */
public class MyConsumer02 {
    private static Properties props = new Properties();

    static {
        // Kafka cluster address
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers with the same group name form one group
        props.put("group.id", "kafka_api_group_1");
        // Disable automatic offset commits; offsets are committed manually below
        props.put("enable.auto.commit", "false");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        String topicName = "kafka_api_r1p3";
        // Instantiate a consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic; multiple topics may be passed
        consumer.subscribe(Arrays.asList(topicName));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // Walk the records partition by partition
            for (TopicPartition partition : records.partitions()) {
                System.out.println("Consuming data from partition " + partition.partition());
                // The records belonging to this partition only
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                    System.out.println("partition:" + partition.partition() + ",key:" + partitionRecord.key()
                            + ",value:" + partitionRecord.value() + ",offset:" + partitionRecord.offset());
                }
                // Commit this partition's offset: the offset of the last record plus one,
                // i.e. the position of the next record to be consumed
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

③ A consumer that pulls data from a specified partition
Note:
  (1) Kafka's coordination between consumers within a group no longer applies.
  (2) Written this way, different consumers may be assigned the same partition; to avoid conflicting offset commits, each consumer instance should use a distinct group.id.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:10
 * Description: a consumer that pulls data from a specified partition.
 *     Once specific partitions are assigned, note that:
 *         (1) Kafka's coordination between consumers within a group no longer applies;
 *         (2) different consumers may end up assigned the same partition, so to avoid
 *             conflicting offset commits each consumer instance should use a distinct group.id.
 */
public class MyConsumer03 {
    private static Properties props = new Properties();
    // The consumer instance
    static KafkaConsumer<String, String> consumer;

    static {
        // Kafka cluster address
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers with the same group name form one group
        props.put("group.id", "kafka_api_group_1");
        // Disable automatic offset commits
        props.put("enable.auto.commit", "false");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        String topic = "kafka_api_r1p3";
        int partitionNum = 0;
        // Assign the consumer directly to the chosen partition (instead of subscribe())
        TopicPartition partition0 = new TopicPartition(topic, partitionNum);
        consumer.assign(Arrays.asList(partition0));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                    System.out.println("partition:" + partitionRecord.partition() + ",key:" + partitionRecord.key()
                            + ",value:" + partitionRecord.value() + ",offset:" + partitionRecord.offset());
                }
                // Commit the offset of the next record to consume
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}

④ Resetting a group's offsets

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 9:46
 * Description: resets the offsets of a Kafka consumer group
 */
public class ReSetOffset {
    // The group whose offsets will be reset
    final private static String group = "kafka_api_group_1";
    final private static Properties props = new Properties();
    static KafkaConsumer<String, String> consumer;

    static {
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.put("group.id", group);
        props.put("enable.auto.commit", "true");
        // props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    public static String resetOffset(String topic, long offset) {
        int partitionNums = getTopicPartitionNum(topic);
        // Seek every partition of the topic to the target offset; because
        // auto-commit is enabled, close() commits the sought position.
        for (int i = 0; i < partitionNums; i++) {
            TopicPartition tp = new TopicPartition(topic, i);
            KafkaConsumer<String, String> consumer_temp = new KafkaConsumer<>(props);
            consumer_temp.assign(Arrays.asList(tp));
            consumer_temp.seek(tp, offset);
            consumer_temp.close();
        }
        consumer.close();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss ");
        return dateFormat.format(new Date()) + group + " ResetOffset Succeed!!";
    }

    private static int getTopicPartitionNum(String topic) {
        int partitionNums = consumer.partitionsFor(topic).size();
        return partitionNums;
    }

    public static void main(String[] args) {
        String topic = "kafka_api_r1p1";
        System.out.println(ReSetOffset.resetOffset(topic, 0));
    }
}
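The seek-then-close trick above relies on auto-commit firing during close(). A minimal alternative sketch (not from the original article; class name ResetOffsetExplicit is illustrative) that commits the target offsets explicitly with commitSync, so it does not depend on auto-commit timing:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    import java.util.HashMap;
    import java.util.Map;

    public class ResetOffsetExplicit {
        /** Commits `offset` for every partition of `topic` on behalf of the consumer's group. */
        public static void resetOffset(KafkaConsumer<String, String> consumer, String topic, long offset) {
            Map<TopicPartition, OffsetAndMetadata> target = new HashMap<>();
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                target.put(new TopicPartition(topic, info.partition()), new OffsetAndMetadata(offset));
            }
            consumer.assign(target.keySet()); // direct assignment, no group coordination
            consumer.commitSync(target);      // explicit, synchronous commit of the new positions
        }
    }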

⑤ A multi-threaded consumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:45
 * Description: a single consumer thread
 */
public class ConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;
    private final CountDownLatch latch;

    public ConsumerRunner(KafkaConsumer<String, String> consumer, CountDownLatch latch) {
        this.consumer = consumer;
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("threadName....." + Thread.currentThread().getName());
        try {
            consumer.subscribe(Arrays.asList("kafka_api_r1p1"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(150);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("threadName= %s, offset = %d, key = %s, value = %s%n",
                            Thread.currentThread().getName(), record.offset(), record.key(), record.value());
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (WakeupException e) {
            // wakeup() aborts a blocking poll(); this exception is expected during shutdown
            if (!closed.get()) {
                throw e;
            }
        } finally {
            consumer.close();
            latch.countDown();
        }
    }

    public void shutdown() {
        System.out.println("close ConsumerRunner");
        closed.set(true);
        // wakeup() is the only consumer method that is safe to call from another
        // thread; it interrupts the poll() blocking in run()
        consumer.wakeup();
    }
}
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:52
 * Description: drives the multi-threaded consumers
 */
public class RunConsumer {
    private static Properties props = new Properties();

    static {
        // Kafka cluster address
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers with the same group name form one group
        props.put("group.id", "kafka_api_group_1");
        // Enable automatic offset commits
        props.put("enable.auto.commit", "true");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        // One KafkaConsumer instance per thread: KafkaConsumer itself is not thread-safe
        final List<ConsumerRunner> consumers = new ArrayList<>();
        final List<KafkaConsumer<String, String>> kafkaConsumers = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            kafkaConsumers.add(new KafkaConsumer<>(props));
        }
        // The latch blocks the main thread via await(); each runner counts it down on
        // exit, and at 0 the main thread resumes (similar in effect to join)
        final CountDownLatch latch = new CountDownLatch(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            ConsumerRunner c = new ConsumerRunner(kafkaConsumers.get(i), latch);
            consumers.add(c);
            executor.submit(c);
        }
        /*
         * Register a shutdown hook with the JVM: on shutdown, the JVM runs every hook
         * added via addShutdownHook and only then exits, so hooks are the place to
         * release memory, destroy objects, and close connections.
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("....................");
                for (ConsumerRunner consumer : consumers) {
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
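To exercise the shutdown path, start RunConsumer and then terminate it normally (Ctrl+C in a terminal, or a plain kill of the process): the hook calls shutdown() on each runner, wakeup() aborts the blocked poll(), each thread closes its consumer and counts the latch down, and main() returns.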