千家信息网

Kafka笔记整理(三):消费形式验证与性能测试

发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,[TOC]Kafka消费形式验证前面的《Kafka笔记整理(一)》中有提到消费者的消费形式,说明如下:1、每个consumer属于一个consumer group,可以指定组id。group.id2、
千家信息网最后更新 2024年12月13日Kafka笔记整理(三):消费形式验证与性能测试

[TOC]


Kafka消费形式验证

前面的《Kafka笔记整理(一)》中有提到消费者的消费形式,说明如下:

1、每个consumer属于一个consumer group,可以指定组id。group.id2、消费形式:   组内:组内的消费者消费同一份数据;同时只能有一个consumer消费一个Topic中的1个partition;   一个consumer可以消费多个partitions中的消息。所以,对于一个topic,同一个group中推荐不能有多于   partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。   组间:每个消费组消费相同的数据,互不影响。3、在一个consumer多个线程的情况下,一个线程相当于一个消费者。   例如:partition为3,一个consumer起了3个线程消费,另一个后来的consumer就无法消费。

下面就来验证Kafka的消费形式,不过需要说明的是,在消费者的程序代码中,可以指定消费者的group.id(我们下面将会在配置文件中指定)。

而在使用kafka的shell命令时,其实也是可以指定配置文件来指定消费者的group.id的,如果不指定,那么kafka将会随机生成一个group.id(kafka-console-consumer.sh中的kafka.tools.ConsoleConsumer类,如果没有指定group.id,其策略是随机生成)。

在后面的程序代码中,会使用同一group.id开启4个消费的线程(因为我们创建的topic有3个partition),然后在终端中通过kafka shell来开启另外一个消费者,进而达到验证kafka消费形式的目的。

另外,在测试中使用的topic如下:

$ kafka-topics.sh --describe hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181Topic:hadoop    PartitionCount:3        ReplicationFactor:3     Configs:        Topic: hadoop   Partition: 0    Leader: 103     Replicas: 103,101,102   Isr: 103,101,102        Topic: hadoop   Partition: 1    Leader: 101     Replicas: 101,102,103   Isr: 101,102,103        Topic: hadoop   Partition: 2    Leader: 102     Replicas: 102,103,101   Isr: 102,103,101

即partition为3,副本因为也为3.

程序代码

KafkaProducerOps.java
package com.uplooking.bigdata.kafka.producer;import com.uplooking.bigdata.kafka.constants.Constants;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import java.io.IOException;import java.io.InputStream;import java.util.Properties;import java.util.Random;/** * 通过这个KafkaProducerOps向Kafka topic中生产相关的数据 * 

* Producer */public class KafkaProducerOps { public static void main(String[] args) throws IOException { /** * 专门加载配置文件 * 配置文件的格式: * key=value * * 在代码中要尽量减少硬编码 * 不要将代码写死,要可配置化 */ Properties properties = new Properties(); InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties"); properties.load(in); /** * 两个泛型参数 * 第一个泛型参数:指的就是kafka中一条记录key的类型 * 第二个泛型参数:指的就是kafka中一条记录value的类型 */ String[] girls = new String[]{"姚慧莹", "刘向前", "周 新", "杨柳"}; Producer producer = new KafkaProducer(properties); Random random = new Random(); int start = 1; for (int i = start; i <= start + 20; i++) { String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC); String key = i + ""; String value = "今天的<--" + girls[random.nextInt(girls.length)] + "-->很美很美哦~"; ProducerRecord producerRecord = new ProducerRecord(topic, key, value); producer.send(producerRecord); } producer.close(); }}

KafkaConsumerOps.java
package com.uplooking.bigdata.kafka.consumer;import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.io.IOException;import java.util.Arrays;import java.util.Collection;import java.util.Properties;import java.util.concurrent.*;/** * 从kafka topic中消费数据 */public class KafkaConsumerOps {    public static void main(String[] args) throws IOException {        //线程池        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);        System.out.println("外部开始时间:" + System.currentTimeMillis());        for (int i =0; i < 4; i++){            ScheduledFuture schedule = service.schedule(                    new ConsumerThread(),                    5L,                    TimeUnit.SECONDS);        }    }}class ConsumerThread implements Runnable {    public void run() {        System.out.println("线程ID:" + Thread.currentThread().getId() + "线程开始时间:" + System.currentTimeMillis());        /**         * 两个泛型参数         * 第一个泛型参数:指的就是kafka中一条记录key的类型         * 第二个泛型参数:指的就是kafka中一条记录value的类型         */        Properties properties = new Properties();        try {            properties.load(KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties"));        } catch (IOException e) {            e.printStackTrace();        }        Consumer consumer = new KafkaConsumer(properties);        Collection topics = Arrays.asList("hadoop");        //消费者订阅topic        consumer.subscribe(topics);        ConsumerRecords consumerRecords = null;        while (true) {            //接下来就要从topic中拉取数据            consumerRecords = consumer.poll(1000);            //遍历每一条记录            for (ConsumerRecord consumerRecord : consumerRecords) {                long offset = consumerRecord.offset();                Object key = consumerRecord.key();                Object value = consumerRecord.value();                int partition = consumerRecord.partition();                System.out.println("CurrentThreadID: " + Thread.currentThread().getId() + "\toffset: " + offset + "\tpartition: " + partition + "\tkey: " + key + "\tvalue: " + value);            }        }    }}
MyKafkaPartitioner.java
package com.uplooking.bigdata.kafka.partitioner;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;import java.util.Random;/** * 创建自定义的分区,根据数据的key来进行划分 * 

* 可以根据key或者value的hashCode * 还可以根据自己业务上的定义将数据分散在不同的分区中 * 需求: * 根据用户输入的key的hashCode值和partition个数求模 */public class MyKafkaPartitioner implements Partitioner { public void configure(Map configs) { } /** * 根据给定的数据设置相关的分区 * * @param topic 主题名称 * @param key key * @param keyBytes 序列化之后的key * @param value value * @param valueBytes 序列化之后的value * @param cluster 当前集群的元数据信息 */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { Integer partitionNums = cluster.partitionCountForTopic(topic); int targetPartition = -1; if (key == null || keyBytes == null) { targetPartition = new Random().nextInt(10000) % partitionNums; } else { int hashCode = key.hashCode(); targetPartition = hashCode % partitionNums; System.out.println("key: " + key + ", value: " + value + ", hashCode: " + hashCode + ", partition: " + targetPartition); } return targetPartition; } public void close() { }}

Constants.java
package com.uplooking.bigdata.kafka.constants;public interface Constants {    /**     * 生产的key对应的常量     */    String KAFKA_PRODUCER_TOPIC = "producer.topic";}
producer.properties
############################# Producer Basics ############################## list of brokers used for bootstrapping knowledge about the rest of the cluster# format: host1:port1,host2:port2 ...bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092# specify the compression codec for all data generated: none, gzip, snappy, lz4compression.type=none# name of the partitioner class for partitioning events; default partition spreads data randomlypartitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner# the maximum amount of time the client will wait for the response of a request#request.timeout.ms=# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for#max.block.ms=# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together#linger.ms=# the maximum size of a request in bytes#max.request.size=# the default batch size in bytes when batching multiple records sent to a partition#batch.size=# the total bytes of memory the producer can use to buffer records waiting to be sent to the server#buffer.memory=#####设置自定义的topicproducer.topic=hadoopkey.serializer=org.apache.kafka.common.serialization.StringSerializervalue.serializer=org.apache.kafka.common.serialization.StringSerializer
consumer.properties
# Zookeeper connection string# comma separated host:port pairs, each corresponding to a zk# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"zookeeper.connect= uplooking01:2181,uplooking02:2181,uplooking03:2181bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092# timeout in ms for connecting to zookeeperzookeeper.connection.timeout.ms=6000#consumer group idgroup.id=test-consumer-group#consumer timeout#consumer.timeout.ms=5000key.deserializer=org.apache.kafka.common.serialization.StringDeserializervalue.deserializer=org.apache.kafka.common.serialization.StringDeserializer
pom.xml

主要是kafka-clients的依赖:

        org.apache.kafka    kafka-clients    0.10.0.1  

测试

先在终端启动一个消费者,注意由于没有指定配置文件,所以其group.id是随机生成的:

$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

接下来分别执行消费者的代码和生产者的代码,然后观察各个终端的输出。

生产者程序的终端输出如下:

key: 1, value: 今天的<--刘向前-->很美很美哦~, hashCode: 49, partition: 1key: 2, value: 今天的<--刘向前-->很美很美哦~, hashCode: 50, partition: 2key: 3, value: 今天的<--刘向前-->很美很美哦~, hashCode: 51, partition: 0key: 4, value: 今天的<--杨柳-->很美很美哦~, hashCode: 52, partition: 1key: 5, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 53, partition: 2key: 6, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 54, partition: 0key: 7, value: 今天的<--杨柳-->很美很美哦~, hashCode: 55, partition: 1key: 8, value: 今天的<--刘向前-->很美很美哦~, hashCode: 56, partition: 2key: 9, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 57, partition: 0key: 10, value: 今天的<--杨柳-->很美很美哦~, hashCode: 1567, partition: 1key: 11, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1568, partition: 2key: 12, value: 今天的<--周  新-->很美很美哦~, hashCode: 1569, partition: 0key: 13, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1570, partition: 1key: 14, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1571, partition: 2key: 15, value: 今天的<--刘向前-->很美很美哦~, hashCode: 1572, partition: 0key: 16, value: 今天的<--刘向前-->很美很美哦~, hashCode: 1573, partition: 1key: 17, value: 今天的<--杨柳-->很美很美哦~, hashCode: 1574, partition: 2key: 18, value: 今天的<--刘向前-->很美很美哦~, hashCode: 1575, partition: 0key: 19, value: 今天的<--杨柳-->很美很美哦~, hashCode: 1576, partition: 1key: 20, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1598, partition: 2key: 21, value: 今天的<--杨柳-->很美很美哦~, hashCode: 1599, partition: 0

消费者程序的终端输出如下:

外部开始时间:1521991118178线程ID:20线程开始时间:1521991123182线程ID:21线程开始时间:1521991123182线程ID:23线程开始时间:1521991123182线程ID:22线程开始时间:1521991123182CurrentThreadID: 22 offset: 78  partition: 1    key: 1  value: 今天的<--刘向前-->很美很美哦~CurrentThreadID: 22 offset: 79  partition: 1    key: 4  value: 今天的<--杨柳-->很美很美哦~CurrentThreadID: 22 offset: 80  partition: 1    key: 7  value: 今天的<--杨柳-->很美很美哦~CurrentThreadID: 22 offset: 81  partition: 1    key: 10 value: 今天的<--杨柳-->很美很美哦~CurrentThreadID: 22 offset: 82  partition: 1    key: 13 value: 今天的<--姚慧莹-->很美很美哦~CurrentThreadID: 23 offset: 81  partition: 0    key: 3  value: 今天的<--刘向前-->很美很美哦~CurrentThreadID: 23 offset: 82  partition: 0    key: 6  value: 今天的<--姚慧莹-->很美很美哦~CurrentThreadID: 23 offset: 83  partition: 0    key: 9  value: 今天的<--姚慧莹-->很美很美哦~CurrentThreadID: 23 offset: 84  partition: 0    key: 12 value: 今天的<--周  新-->很美很美哦~CurrentThreadID: 23 offset: 85  partition: 0    key: 15 value: 今天的<--刘向前-->很美很美哦~CurrentThreadID: 23 offset: 86  partition: 0    key: 18 value: 今天的<--刘向前-->很美很美哦~CurrentThreadID: 22 offset: 83  partition: 1    key: 16 value: 今天的<--刘向前-->很美很美哦~CurrentThreadID: 23 offset: 87  partition: 0    key: 21 value: 今天的<--杨柳-->很美很美哦~CurrentThreadID: 21 offset: 78  partition: 2    key: 2  value: 今天的<--刘向前-->很美很美哦~CurrentThreadID: 22 offset: 84  partition: 1    key: 19 value: 今天的<--杨柳-->很美很美哦~CurrentThreadID: 21 offset: 79  partition: 2    key: 5  value: 今天的<--姚慧莹-->很美很美哦~CurrentThreadID: 21 offset: 80  partition: 2    key: 8  value: 今天的<--刘向前-->很美很美哦~CurrentThreadID: 21 offset: 81  partition: 2    key: 11 value: 今天的<--姚慧莹-->很美很美哦~CurrentThreadID: 21 offset: 82  partition: 2    key: 14 value: 今天的<--姚慧莹-->很美很美哦~CurrentThreadID: 21 offset: 83  partition: 2    key: 17 value: 今天的<--杨柳-->很美很美哦~CurrentThreadID: 21 offset: 84  partition: 2    key: 20 value: 今天的<--姚慧莹-->很美很美哦~

消费者shell的终端输出如下:

$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181今天的<--刘向前-->很美很美哦~今天的<--姚慧莹-->很美很美哦~今天的<--刘向前-->很美很美哦~今天的<--姚慧莹-->很美很美哦~今天的<--姚慧莹-->很美很美哦~今天的<--杨柳-->很美很美哦~今天的<--姚慧莹-->很美很美哦~今天的<--刘向前-->很美很美哦~今天的<--姚慧莹-->很美很美哦~今天的<--姚慧莹-->很美很美哦~今天的<--周  新-->很美很美哦~今天的<--刘向前-->很美很美哦~今天的<--刘向前-->很美很美哦~今天的<--杨柳-->很美很美哦~今天的<--刘向前-->很美很美哦~今天的<--杨柳-->很美很美哦~今天的<--杨柳-->很美很美哦~今天的<--杨柳-->很美很美哦~今天的<--姚慧莹-->很美很美哦~今天的<--刘向前-->很美很美哦~今天的<--杨柳-->很美很美哦~

分析

因为使用kafka shell的消费者的group.id是随机生成的,所以其肯定可以消费到topic下partition的消息,这是属于组间的消费。

而由于在消费者的程序代码中,4个线程都是使用同一个group.id的(都是使用consumer.properties这个配置文件),按照理论知识的理解,因为topic hadoop只有3个partition,所以只能有3个线程即3个consumer进行消息的消费,而观察输出,通过线程ID,发现确实只有三个线程消费了topic中的消息,这也验证了kafka组内消息的消费形式。

Kafka性能测试

参考文档:https://cwiki.apache.org/confluence/display/KAFKA/Performance+testing

生产能力测试

在kafka的安装目录的bin里有性能的评估工具bin/kafka-producer-perf-test.sh,主要输出4项指标,总共发送消息量(以MB为单位),每秒发送消息量(MB/second),发送消息总数,每秒发送消息数(records/second)。

测试如下:

[uplooking@uplooking01 ~]$ kafka-producer-perf-test.sh --topic flume-kafka --num-records 1000000 --producer-props bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092 --throughput 10000 --record-size 10049972 records sent, 9994.4 records/sec (0.95 MB/sec), 3.1 ms avg latency, 258.0 max latency.50200 records sent, 10040.0 records/sec (0.96 MB/sec), 2.4 ms avg latency, 141.0 max latency.50020 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.50010 records sent, 10000.0 records/sec (0.95 MB/sec), 2.3 ms avg latency, 127.0 max latency.50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 24.0 max latency.50020 records sent, 10004.0 records/sec (0.95 MB/sec), 2.4 ms avg latency, 186.0 max latency.50010 records sent, 10002.0 records/sec (0.95 MB/sec), 15.1 ms avg latency, 466.0 max latency.50020 records sent, 10002.0 records/sec (0.95 MB/sec), 11.1 ms avg latency, 405.0 max latency.50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.50030 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 20.0 max latency.50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 30.0 max latency.50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.49990 records sent, 9998.0 records/sec (0.95 MB/sec), 1.4 ms avg latency, 49.0 max latency.50033 records sent, 10006.6 records/sec (0.95 MB/sec), 37.9 ms avg latency, 617.0 max latency.50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.5 ms avg latency, 74.0 max latency.50007 records sent, 10001.4 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.8 ms avg latency, 132.0 max latency.50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 15.0 max latency.50020 records sent, 10000.0 records/sec (0.95 MB/sec), 1.9 ms avg latency, 121.0 max latency.1000000 records sent, 9999.200064 records/sec (0.95 MB/sec), 4.96 ms avg latency, 617.00 ms max latency, 1 ms 50th, 3 ms 95th, 105 ms 99th, 541 ms 99.9th.

参数说明如下:

--num-records 1000000   总共生产的消息数量--throughput 10000      每秒需要生产的消息数量--record-size 100       每条消息的大小,单位为字节

消费能力测试

[uplooking@uplooking01 ~]$ kafka-consumer-perf-test.sh --topic flume-kafka --messages 1000000 --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092 --threads 3 --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec2018-03-26 05:17:21:185, 2018-03-26 05:17:22:458, 97.3055, 76.4380, 1020325, 801512.1760

上面的测试为需要消费一百万条消息,输出的参数说明如下:

开始时间     结束时间     消费消息总大小   每秒消费大小    消费消息总条数    每秒消费条数start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec2018-03-26 05:17:21:185, 2018-03-26 05:17:22:458, 97.3055, 76.4380, 1020325, 801512.1760
消费 线程 消息 消费者 数据 时间 代码 参数 测试 输出 配置 形式 文件 程序 终端 生产 验证 就是 类型 生成 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 百度地图服务器架构 sql是哪一年成为关系数据库 pubg显示当前服务器繁忙 网络安全三同步重点工作包括 中金财富证券做软件开发 湖北app手机软件开发 大专生在培训班学软件开发 樟树软件开发有限公司 长宁区常规软件开发收费标准 医疗机构服务器已到期是什么意思 单位服务器架设云盘 浙江省公安厅网络技术处 华北理工大学网络安全考研 cadencn 数据库 软件开发与甲方交流ppt 如何在服务器删除原来的挡板配置 高新技术模拟考试数据库 河北app软件开发价钱是多少 宿迁工业网络技术保养 学校网络安全隐患排查报告博客 魔兽世界 人多的服务器 vpn 连接服务器失败 网络安全三同步重点工作包括 武汉市信息网络安全协会 刚毕业应聘网络安全公司怎么样 生动的网络技术课堂 软件开发权证 向数组中插数据库 代理服务器到期了怎么办 南方电网 网络安全整改
0