千家信息网

Kafka Notes (Part 2): Using the Kafka Java API

Published: 2025-01-23    Author: 千家信息网 editor
千家信息网 last updated: 2025-01-23



All of the test code below uses the following topic:

```
$ kafka-topics.sh --describe --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
Topic:hadoop    PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: hadoop   Partition: 0    Leader: 103     Replicas: 103,101,102   Isr: 103,101,102
        Topic: hadoop   Partition: 1    Leader: 101     Replicas: 101,102,103   Isr: 101,102,103
        Topic: hadoop   Partition: 2    Leader: 102     Replicas: 102,103,101   Isr: 102,103,101
```
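Reading that output: the topic has 3 partitions, each replicated on all three brokers (101, 102, 103), each broker leads one partition, and every replica is in the ISR (in-sync replica list). As an illustration of how to read those columns, here is a stdlib-only sketch. The class `TopicDescribeParser` and its methods are hypothetical (not part of Kafka), and the regex assumes the column layout shown above, which may differ between Kafka versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts per-partition details from `kafka-topics.sh --describe` output. */
public class TopicDescribeParser {

    /** One per-partition entry: partition id, leader broker, replica list and in-sync replica list. */
    public static final class PartitionLine {
        public final int partition;
        public final int leader;
        public final List<Integer> replicas;
        public final List<Integer> isr;

        PartitionLine(int partition, int leader, List<Integer> replicas, List<Integer> isr) {
            this.partition = partition;
            this.leader = leader;
            this.replicas = replicas;
            this.isr = isr;
        }

        /** True when every assigned replica currently appears in the ISR. */
        public boolean fullyInSync() {
            return isr.containsAll(replicas);
        }

        @Override
        public String toString() {
            return "partition " + partition + ": leader=" + leader
                    + " replicas=" + replicas + " isr=" + isr;
        }
    }

    // The summary line's "PartitionCount:3" does not match, because "Partition" is not followed by ':' there.
    private static final Pattern LINE = Pattern.compile(
            "Partition:\\s*(\\d+)\\s+Leader:\\s*(\\d+)\\s+Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]+)");

    private static List<Integer> brokerIds(String csv) {
        List<Integer> ids = new ArrayList<>();
        for (String id : csv.split(",")) {
            ids.add(Integer.parseInt(id.trim()));
        }
        return ids;
    }

    /** Finds every per-partition entry in the output, whether or not line breaks survived. */
    public static List<PartitionLine> parse(String output) {
        List<PartitionLine> result = new ArrayList<>();
        Matcher m = LINE.matcher(output);
        while (m.find()) {
            result.add(new PartitionLine(
                    Integer.parseInt(m.group(1)),
                    Integer.parseInt(m.group(2)),
                    brokerIds(m.group(3)),
                    brokerIds(m.group(4))));
        }
        return result;
    }

    public static void main(String[] args) {
        String sample = "Topic:hadoop\tPartitionCount:3\tReplicationFactor:3\tConfigs:\n"
                + "\tTopic: hadoop\tPartition: 0\tLeader: 103\tReplicas: 103,101,102\tIsr: 103,101,102\n"
                + "\tTopic: hadoop\tPartition: 1\tLeader: 101\tReplicas: 101,102,103\tIsr: 101,102,103\n"
                + "\tTopic: hadoop\tPartition: 2\tLeader: 102\tReplicas: 102,103,101\tIsr: 102,103,101\n";
        for (PartitionLine p : parse(sample)) {
            System.out.println(p + (p.fullyInSync() ? " (all replicas in sync)" : " (some replicas lagging)"));
        }
    }
}
```

A partition whose `Isr` list is shorter than its `Replicas` list would be reported as lagging, which is the first thing to check when a broker has been down.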

Kafka Java API: Producer

For how to use the producer API, see the Javadoc comments on the org.apache.kafka.clients.producer.KafkaProducer class, which explain it in great detail. The program code and a test follow directly.

Code

KafkaProducerOps.java
```java
package com.uplooking.bigdata.kafka.producer;

import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Uses KafkaProducerOps to produce data into a Kafka topic.
 * <p>
 * Producer
 */
public class KafkaProducerOps {
    public static void main(String[] args) throws IOException {
        /*
         * Load the dedicated configuration file.
         * Its format is:
         *     key=value
         *
         * Keep hard-coding to a minimum:
         * don't hard-wire values into the code, make them configurable.
         */
        Properties properties = new Properties();
        try (InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties")) {
            properties.load(in);
        }
        /*
         * Two generic type parameters:
         * the first is the type of a Kafka record's key,
         * the second is the type of a Kafka record's value.
         */
        Producer<String, String> producer = new KafkaProducer<>(properties);
        String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
        String key = "1";
        String value = "今天的姑娘们很美"; // "The girls look lovely today"
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, value);
        // send() is asynchronous; close() waits for pending sends to complete
        producer.send(producerRecord);
        producer.close();
    }
}
```

Constants.java
```java
package com.uplooking.bigdata.kafka.constants;

public interface Constants {
    /**
     * Key in producer.properties whose value is the topic the producer writes to
     */
    String KAFKA_PRODUCER_TOPIC = "producer.topic";
}
```
producer.properties
```properties
############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

##### custom topic setting
producer.topic=hadoop
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

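This configuration file is essentially the producer configuration file from Kafka's config directory (config/producer.properties in the standard distribution), with the relevant values changed and the custom producer.topic entry added. For the meaning of each field, see the Javadoc comments on the org.apache.kafka.clients.producer.KafkaProducer class.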
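In this setup one Properties object carries both the Kafka client settings and the program's own `producer.topic` key. KafkaProducer does not use `producer.topic`; recent clients typically just log a warning that the supplied configuration isn't a known config. If you prefer to keep the two apart, a minimal stdlib-only sketch could look like this. `ProducerConfigLoader` and `Loaded` are hypothetical names, and the key string simply mirrors `Constants.KAFKA_PRODUCER_TOPIC`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/** Hypothetical helper: splits the custom topic key from the settings meant for KafkaProducer. */
public class ProducerConfigLoader {

    // Same string as Constants.KAFKA_PRODUCER_TOPIC in the article.
    static final String TOPIC_KEY = "producer.topic";

    /** Result holder: the target topic plus the properties to hand to KafkaProducer. */
    public static final class Loaded {
        public final String topic;
        public final Properties kafkaProps;

        Loaded(String topic, Properties kafkaProps) {
            this.topic = topic;
            this.kafkaProps = kafkaProps;
        }
    }

    public static Loaded load(InputStream in) throws IOException {
        // getResourceAsStream returns null when the file is missing from the classpath
        if (in == null) {
            throw new IOException("producer.properties not found on the classpath");
        }
        Properties all = new Properties();
        // Note: load(InputStream) decodes ISO 8859-1; use load(Reader) for non-ASCII values.
        all.load(in);
        String topic = all.getProperty(TOPIC_KEY);
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("missing " + TOPIC_KEY);
        }
        // Copy everything except the custom key.
        Properties kafkaProps = new Properties();
        for (String name : all.stringPropertyNames()) {
            if (!name.equals(TOPIC_KEY)) {
                kafkaProps.setProperty(name, all.getProperty(name));
            }
        }
        return new Loaded(topic.trim(), kafkaProps);
    }

    public static void main(String[] args) throws IOException {
        String sample = "bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092\n"
                + "producer.topic=hadoop\n"
                + "key.serializer=org.apache.kafka.common.serialization.StringSerializer\n"
                + "value.serializer=org.apache.kafka.common.serialization.StringSerializer\n";
        Loaded loaded = load(new ByteArrayInputStream(sample.getBytes(StandardCharsets.UTF_8)));
        System.out.println("topic = " + loaded.topic);
        System.out.println("kafka settings = " + loaded.kafkaProps.stringPropertyNames());
    }
}
```

The producer would then be created with `new KafkaProducer<>(loaded.kafkaProps)` and records sent to `loaded.topic`.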

Test

In a terminal, start a console consumer listening for messages on the topic:

[uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181

Then run the producer program and check the terminal output again:

```
[uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181
今天的姑娘们很美
```

Kafka Java API: Consumer

Code

KafkaConsumerOps.java
```java
package com.uplooking.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class KafkaConsumerOps {
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        try (InputStream in = KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties")) {
            properties.load(in);
        }
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        Collection<String> topics = Arrays.asList("hadoop");
        // Subscribe the consumer to the topic
        consumer.subscribe(topics);
        while (true) {
            // Pull data from the topic, waiting up to 1000 ms
            // (poll(long) is deprecated since Kafka 2.0 in favor of poll(Duration))
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            // Iterate over every record
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                long offset = consumerRecord.offset();
                int partition = consumerRecord.partition();
                String key = consumerRecord.key();
                String value = consumerRecord.value();
                System.out.format("%d\t%d\t%s\t%s\n", offset, partition, key, value);
            }
        }
    }
}
```
consumer.properties
```properties
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
# Note: zookeeper.* and consumer.timeout.ms belong to the old ZooKeeper-based consumer;
# the Java KafkaConsumer used here ignores them and connects through bootstrap.servers
zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
```

Test

Start the consumer first, then run the producer; the consumer terminal shows the following output:

```
2   0   1   今天的姑娘们很美
```

(The columns are offset, partition, key and value.)

Kafka Java API: Partitioning

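A custom partitioner lets you decide which partition each message is stored in: just implement the Partitioner interface (org.apache.kafka.clients.producer.Partitioner) in your code.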

Code

MyKafkaPartitioner.java
```java
package com.uplooking.bigdata.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

/**
 * A custom partitioner that divides data by the record key.
 * <p>
 * Data can be spread across partitions by the hashCode of the key or value,
 * or by any business-specific rule.
 * Requirement:
 * take the hashCode of the user-supplied key modulo the number of partitions.
 */
public class MyKafkaPartitioner implements Partitioner {

    private final Random random = new Random();

    @Override
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Chooses the partition for the given record.
     *
     * @param topic      topic name
     * @param key        key
     * @param keyBytes   serialized key
     * @param value      value
     * @param valueBytes serialized value
     * @param cluster    metadata of the current cluster
     * @return the target partition, in [0, partition count)
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int partitionNums = cluster.partitionCountForTopic(topic);
        int targetPartition;
        if (key == null || keyBytes == null) {
            // No key: pick a random partition
            targetPartition = random.nextInt(partitionNums);
        } else {
            int hashCode = key.hashCode();
            // hashCode can be negative; clear the sign bit so the result is a valid partition
            targetPartition = (hashCode & Integer.MAX_VALUE) % partitionNums;
            System.out.println("key: " + key + ", value: " + value
                    + ", hashCode: " + hashCode + ", partition: " + targetPartition);
        }
        return targetPartition;
    }

    @Override
    public void close() {
    }
}
```
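One caveat with the plain `key.hashCode() % partitionNums` rule: `String.hashCode()` can be negative, and in Java the result of `%` takes the sign of the dividend, so the plain expression can return a negative, invalid partition number. Clearing the sign bit avoids that; Kafka's built-in default partitioner applies the same trick (`Utils.toPositive`) to a murmur2 hash of the serialized key. The stdlib-only sketch below uses a hypothetical class name and reproduces the key-to-partition mapping that the producer prints for keys "1" to "10" in the test later in this section.

```java
/** Hypothetical sketch of "key hashCode modulo partition count". */
public class KeyHashPartitioning {

    /** Plain modulo: Java's % keeps the dividend's sign, so a negative hash gives a negative result. */
    public static int naivePartition(int hash, int numPartitions) {
        return hash % numPartitions;
    }

    /** Clears the sign bit first, so the result is always in [0, numPartitions). */
    public static int safePartition(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // the hadoop topic has 3 partitions
        for (int i = 1; i <= 10; i++) {
            String key = String.valueOf(i);
            int hash = key.hashCode();
            System.out.println("key " + key + " -> hashCode " + hash
                    + " -> partition " + safePartition(hash, numPartitions));
        }
        System.out.println("naive(-7, 3) = " + naivePartition(-7, 3)); // prints -1: not a valid partition
        System.out.println("safe(-7, 3)  = " + safePartition(-7, 3));  // prints 1
    }
}
```

For non-negative hash codes (as with keys "1" to "10") both versions agree. Note that `Math.abs(hash) % n` is not a complete fix on its own, because `Math.abs(Integer.MIN_VALUE)` is still negative.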

KafkaProducerOps.java
```java
package com.uplooking.bigdata.kafka.producer;

import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

/**
 * Uses KafkaProducerOps to produce data into a Kafka topic.
 * <p>
 * Producer
 */
public class KafkaProducerOps {
    public static void main(String[] args) throws IOException {
        /*
         * Load the dedicated configuration file.
         * Its format is:
         *     key=value
         *
         * Keep hard-coding to a minimum:
         * don't hard-wire values into the code, make them configurable.
         */
        Properties properties = new Properties();
        try (InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties")) {
            properties.load(in);
        }
        String[] girls = new String[]{"姚慧莹", "刘向前", "周 新", "杨柳"};
        /*
         * Two generic type parameters:
         * the first is the type of a Kafka record's key,
         * the second is the type of a Kafka record's value.
         */
        Producer<String, String> producer = new KafkaProducer<>(properties);
        String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
        Random random = new Random();
        int start = 1;
        // Send 10 records with keys "1".."10"; each value embeds a randomly chosen name
        for (int i = start; i <= start + 9; i++) {
            String key = String.valueOf(i);
            String value = "今天的<--" + girls[random.nextInt(girls.length)] + "-->很美很美哦~";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, value);
            producer.send(producerRecord);
        }
        producer.close();
    }
}
```

The consumer code from before is reused unchanged; in addition, register the custom partitioner in producer.properties as follows:

partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner

Test

Start the consumer first, then run the producer, and check the output in both terminals.

Producer terminal output (mostly the println output from the custom partitioner):

```
key: 1, value: 今天的<--刘向前-->很美很美哦~, hashCode: 49, partition: 1
key: 2, value: 今天的<--杨柳-->很美很美哦~, hashCode: 50, partition: 2
key: 3, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 51, partition: 0
key: 4, value: 今天的<--周  新-->很美很美哦~, hashCode: 52, partition: 1
key: 5, value: 今天的<--刘向前-->很美很美哦~, hashCode: 53, partition: 2
key: 6, value: 今天的<--周  新-->很美很美哦~, hashCode: 54, partition: 0
key: 7, value: 今天的<--周  新-->很美很美哦~, hashCode: 55, partition: 1
key: 8, value: 今天的<--刘向前-->很美很美哦~, hashCode: 56, partition: 2
key: 9, value: 今天的<--杨柳-->很美很美哦~, hashCode: 57, partition: 0
key: 10, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1567, partition: 1
```

Consumer terminal output:

```
3   0   3   今天的<--姚慧莹-->很美很美哦~
4   0   6   今天的<--周  新-->很美很美哦~
5   0   9   今天的<--杨柳-->很美很美哦~
0   2   2   今天的<--杨柳-->很美很美哦~
1   2   5   今天的<--刘向前-->很美很美哦~
2   2   8   今天的<--刘向前-->很美很美哦~
1   1   1   今天的<--刘向前-->很美很美哦~
2   1   4   今天的<--周  新-->很美很美哦~
3   1   7   今天的<--周  新-->很美很美哦~
4   1   10  今天的<--姚慧莹-->很美很美哦~
```

(The columns are offset, partition, key and value.)
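The consumer output is interleaved across partitions, yet within each partition the offsets increase: keys 3, 6 and 9 sit in partition 0, keys 2, 5 and 8 in partition 2, and keys 1, 4, 7 and 10 in partition 1, matching what the custom partitioner printed. Kafka guarantees ordering only within a single partition, not across a whole topic. Here is a stdlib-only sketch (hypothetical class name, assuming the tab-separated "offset partition key value" lines that KafkaConsumerOps prints) that regroups the lines by partition and checks that property.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: groups consumer output lines by partition and checks per-partition ordering. */
public class PartitionOrderCheck {

    /** Returns partition -> offsets, in the order the lines were printed. */
    public static Map<Integer, List<Long>> offsetsByPartition(List<String> lines) {
        Map<Integer, List<Long>> byPartition = new TreeMap<>();
        for (String line : lines) {
            // At most 4 columns, so a value containing spaces stays in one piece
            String[] cols = line.trim().split("\\s+", 4);
            long offset = Long.parseLong(cols[0]);
            int partition = Integer.parseInt(cols[1]);
            byPartition.computeIfAbsent(partition, p -> new ArrayList<>()).add(offset);
        }
        return byPartition;
    }

    /** True if offsets strictly increase within every partition. */
    public static boolean orderedWithinPartitions(Map<Integer, List<Long>> byPartition) {
        for (List<Long> offsets : byPartition.values()) {
            for (int i = 1; i < offsets.size(); i++) {
                if (offsets.get(i) <= offsets.get(i - 1)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // offset, partition and key as printed by the consumer; the value column is abbreviated to "v"
        List<String> lines = Arrays.asList(
                "3\t0\t3\tv", "4\t0\t6\tv", "5\t0\t9\tv",
                "0\t2\t2\tv", "1\t2\t5\tv", "2\t2\t8\tv",
                "1\t1\t1\tv", "2\t1\t4\tv", "3\t1\t7\tv", "4\t1\t10\tv");
        Map<Integer, List<Long>> byPartition = offsetsByPartition(lines);
        System.out.println(byPartition);                          // {0=[3, 4, 5], 1=[1, 2, 3, 4], 2=[0, 1, 2]}
        System.out.println(orderedWithinPartitions(byPartition)); // true
    }
}
```

If an application needs all records for a given entity in order, the usual approach is to give them the same key so they land in the same partition.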