千家信息网

Java使用Kafka的方法

发表于:2024-10-24 作者:千家信息网编辑
千家信息网最后更新 2024年10月24日,本篇内容主要讲解"Java使用Kafka的方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Java使用Kafka的方法"吧!1、maven依赖 o
千家信息网最后更新 2024年10月24日Java使用Kafka的方法

本篇内容主要讲解"Java使用Kafka的方法",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Java使用Kafka的方法"吧!

1、maven依赖

    org.apache.kafka    kafka-clients    0.11.0.0

2、Producer

2.1、producer发送消息

import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;/** * @author Thomas * @Description:最简单的kafka producer * @date 22:18 2019-7-5 */public class ProducerDemo {    public static void main(String[] args) {        Properties properties =new Properties();        //zookeeper服务器集群地址,用逗号隔开        properties.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");        properties.put("acks", "all");        properties.put("retries", 0);        properties.put("batch.size", 16384);        properties.put("linger.ms", 1);        properties.put("buffer.memory", 33554432);        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        //自定义producer拦截器        properties.put("interceptor.classes", "com.lt.kafka.producer.MyProducerInterceptor");        //自定义消息路由规则(消息发送到哪一个Partition中)        //properties.put("partitioner.class", "com.lt.kafka.producer.MyPartition");        Producer producer = null;        try {            producer = new KafkaProducer(properties);            for (int i = 20; i < 40; i++) {                String msg = "This is Message:" + i;                /**                 * kafkaproducer中会同时调用自己的callback的onCompletion方法和producerIntercepter的onAcknowledgement方法。                 * 关键源码:Callback interceptCallback = this.interceptors == null                 * callback : new InterceptorCallback<>(callback,                 * this.interceptors, tp);                 */                producer.send(new ProducerRecord("leixiang", msg),new MyCallback());            }        } catch (Exception e) {            e.printStackTrace();        } finally {            if(producer!=null)                producer.close();        }    }}

2.2、自定义producer拦截器

import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;/** * @author Thomas * @Description:自定义producer拦截器 * @date 22:21 2019-7-5 */public class MyProducerInterceptor implements ProducerInterceptor{    /**     * 打印配置相关信息     */    public void configure(Map configs) {        // TODO Auto-generated method stub        System.out.println(configs.toString());    }    /**     * producer发送信息拦截方法     */    public ProducerRecord onSend(ProducerRecord record) {        System.out.println("拦截处理前=============");        String topic=record.topic();        String value=record.value();        System.out.println("拦截处理前的消息====:"+value);        ProducerRecord record2=new ProducerRecord(topic, value+" (intercepted)");        System.out.println("拦截处理后的消息:"+record2.value());        System.out.println("拦截处理后===============");        return record2;    }    /**     * 消息确认回调函数,和callback的onCompletion方法相似。     * 在kafkaProducer中,如果都设置,两者都会调用。     */    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {        if (metadata != null)            System.out.println("MyProducerInterceptor onAcknowledgement:RecordMetadata=" + metadata.toString());        if (exception != null)            exception.printStackTrace();    }    /**     * interceptor关闭回调     */    public void close() {        System.out.println("MyProducerInterceptor is closed!");    }}

2.3、自定义消息路由规则

自定义路由规则,可以根据自己的需要定义消息发送到哪个分区。自定义路由规则需要实现Partitioner。

import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;/** * @author Thomas * @Description: * @date 22:24 2019-7-5 */public class MyPartition implements Partitioner {    public void configure(Map arg0) {        // TODO Auto-generated method stub    }    public void close() {        // TODO Auto-generated method stub    }    public int partition(String arg0, Object arg1, byte[] arg2, Object arg3, byte[] arg4, Cluster arg5) {        // TODO Auto-generated method stub        return 0;    }}

3、Consumer

3.1、自动提交

import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;/** * @author Thomas * @Description: * @date 22:26 2019-7-5 */public class AutoCommitConsumerDemo {    public static void main(String[] args) {        Properties props = new Properties();        props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");        props.put("group.id", "leixiang");        props.put("enable.auto.commit", "true");        //想要读取之前的数据,必须加上        //props.put("auto.offset.reset", "earliest");        /* 自动确认offset的时间间隔 */        props.put("auto.commit.interval.ms", "1000");        /*         * 一旦consumer和kakfa集群建立连接,         * consumer会以心跳的方式来高速集群自己还活着,         * 如果session.timeout.ms 内心跳未到达服务器,服务器认为心跳丢失,会做rebalence         */        props.put("session.timeout.ms", "30000");        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        //配置自定义的拦截器,可以在拦截器中引入第三方插件实现日志记录等功能。        //props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor");        @SuppressWarnings("resource")        KafkaConsumer consumer = new KafkaConsumer(props);        try {            /* 消费者订阅的topic, 可同时订阅多个 ,用逗号隔开*/            consumer.subscribe(Arrays.asList("leixiang"));            while (true) {                //轮询数据。如果缓冲区中没有数据,轮询等待的时间为毫秒。如果0,立即返回缓冲区中可用的任何记录,则返回空                ConsumerRecords records = consumer.poll(100);                for (ConsumerRecord record : records)                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),                        record.value());            }        } catch (Exception e) {            // TODO: handle exception            e.printStackTrace();        }    }}

3.2、手动提交

import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;/** * @author Thomas * @Description: * @date 22:28 2019-7-5 */public class ManualCommitConsumerDemo {    public static void main(String[] args) {        Properties props = new Properties();        props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");        props.put("group.id", "leixiang");        props.put("enable.auto.commit", "false");//手动确认        /* 自动确认offset的时间间隔 */        props.put("auto.commit.interval.ms", "1000");        props.put("auto.offset.reset", "earliest");//想要读取之前的数据,必须加上        /*         * 一旦consumer和kakfa集群建立连接,         * consumer会以心跳的方式来高速集群自己还活着,         * 如果session.timeout.ms 内心跳未到达服务器,服务器认为心跳丢失,会做rebalence         */        props.put("session.timeout.ms", "30000");        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        //配置自定义的拦截器,可以在拦截器中引入第三方插件实现日志记录等功能。        props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor");        KafkaConsumer consumer = new KafkaConsumer(props);        /* 消费者订阅的topic, 可同时订阅多个 ,用逗号隔开*/        consumer.subscribe(Arrays.asList("leixiang"));        while (true) {            ConsumerRecords records = consumer.poll(100);            for (ConsumerRecord record : records) {                //处理消息                saveMessage(record);                //手动提交,并且设置Offset提交回调方法                //consumer.commitAsync(new MyOffsetCommitCallback());                consumer.commitAsync();            }        }    }    public static void saveMessage(ConsumerRecord record){        System.out.printf("处理消息:offset = %d, key = %s, value = %s%n", record.offset(), record.key(),                record.value());    }}

自定义Consumer拦截器

import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerInterceptor;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;/** * @author Thomas * @Description: * @date 22:29 2019-7-5 */public class MyConsumerInterceptor implements ConsumerInterceptor{public void configure(Map configs) {        System.out.println("MyConsumerInterceptor configs>>>"+configs.toString());        }public ConsumerRecords onConsume(ConsumerRecords records) {        System.out.println("onConsume");        return records;        }public void onCommit(Map offsets) {        System.out.println("onCommit");        }public void close() {        System.out.println("MyConsumerInterceptor is closed!");        }}

自定义Offset提交回调方法

import java.util.Map;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.clients.consumer.OffsetCommitCallback;import org.apache.kafka.common.TopicPartition;/** * @author Thomas * @Description: * @date 22:31 2019-7-5 */public class MyOffsetCommitCallback implements OffsetCommitCallback {    public void onComplete(Map offsets, Exception exception) {        if (offsets != null)            System.out.println("offsets>>>" + offsets.toString());        if (exception != null)            exception.printStackTrace();    }}

到此,相信大家对"Java使用Kafka的方法"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0