千家信息网

Using Kafka and Resolving Common Errors

Published: 2025-01-27  Author: 千家信息网 editor

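I. Download and extract Kafka, then configure the environment variables

Open /etc/profile, add the two export lines, then reload the file:

vim /etc/profile
export KAFKA_HOME=/root/kafka_2.11-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile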
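
The same change can be scripted instead of made by hand in an editor. The following is a minimal sketch for a POSIX shell; the function name add_kafka_env and its two arguments are illustrative and not part of Kafka. It appends the two export lines only when no KAFKA_HOME export exists yet, so running it twice does not duplicate them.

```shell
# Append the Kafka environment variables to a profile file.
# Assumed arguments (illustrative): $1 = profile file, $2 = Kafka install directory.
add_kafka_env() {
    profile="$1"
    kafka_home="$2"
    # Do nothing if a KAFKA_HOME export is already present.
    if grep -q '^export KAFKA_HOME=' "$profile" 2>/dev/null; then
        return 0
    fi
    {
        printf 'export KAFKA_HOME=%s\n' "$kafka_home"
        # Single quotes keep $PATH and $KAFKA_HOME unexpanded in the file.
        printf 'export PATH=$PATH:$KAFKA_HOME/bin\n'
    } >> "$profile"
}
```

For the layout used in this article the call would be add_kafka_env /etc/profile /root/kafka_2.11-1.0.0, followed by source /etc/profile.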

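II. Kafka requires ZooKeeper

(A) Using the ZooKeeper bundled with Kafka

  1. Start ZooKeeper first. In a pseudo-distributed setup Kafka already bundles ZooKeeper, and its configuration is in Kafka's config directory.

       You can edit config/zookeeper.properties to change the ZooKeeper port. The commands below use port 2281.

    Start ZooKeeper in the background:
    [root@mail bin]# nohup zookeeper-server-start.sh ../config/zookeeper.properties &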
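
In the bundled config/zookeeper.properties the port is set by the clientPort line. Here is a minimal sketch of changing it from a script, assuming a POSIX shell; the function name set_zk_client_port is hypothetical, and the helper writes to a temporary file next to the original rather than relying on sed -i.

```shell
# Set clientPort in a ZooKeeper properties file.
# Assumed arguments (illustrative): $1 = properties file, $2 = new port.
set_zk_client_port() {
    file="$1"
    port="$2"
    if grep -q '^clientPort=' "$file"; then
        # Rewrite the existing line through a temporary file, then swap it in.
        sed "s/^clientPort=.*/clientPort=$port/" "$file" > "$file.tmp" &&
            mv "$file.tmp" "$file"
    else
        # No clientPort line yet: append one.
        printf 'clientPort=%s\n' "$port" >> "$file"
    fi
}
```

Calling set_zk_client_port ../config/zookeeper.properties 2281 would give the port used in the rest of this article. ZooKeeper has to be restarted for the change to take effect.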

  2. Start the broker
    [root@mail bin]# nohup kafka-server-start.sh ../config/server.properties &

3. Test: simulate producing and consuming messages

(1) Create a topic

[root@mail bin]# kafka-topics.sh --create --zookeeper localhost:2281 --topic KafkaTestTopic --partitions 1 --replication-factor 1
Created topic "KafkaTestTopic".

(2) Start a producer

[root@mail bin]# kafka-console-producer.sh --topic KafkaTestTopic --broker-list localhost:9092

To find Kafka's port, check the #listeners=PLAINTEXT://:9092 line in server.properties (9092 is the default).
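
Looking up the port can be automated. This is a minimal sketch, assuming a POSIX shell and a single listener entry; the function name kafka_listener_port is hypothetical. It reads the uncommented listeners= line and falls back to 9092 when the line is still commented out, as it is in the stock server.properties.

```shell
# Print the broker port configured in a server.properties file.
# Assumed argument (illustrative): $1 = path to server.properties.
kafka_listener_port() {
    file="$1"
    # Use the last uncommented listeners= line, if there is one.
    line=$(grep '^listeners=' "$file" | tail -n 1)
    if [ -z "$line" ]; then
        # listeners is commented out: Kafka listens on its default port.
        echo 9092
    else
        # The port is the text after the final colon,
        # e.g. PLAINTEXT://localhost:9092 -> 9092.
        echo "${line##*:}"
    fi
}
```

With several comma-separated listeners this sketch returns only the port of the last one, so treat it as a starting point rather than a general parser.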

(3) Start a consumer

[root@mail bin]# kafka-console-consumer.sh --topic KafkaTestTopic --zookeeper localhost:2281
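
This Kafka version prints a deprecation warning when the console consumer is given --zookeeper and recommends passing --bootstrap-server instead. Below is a minimal sketch of a wrapper for the new-consumer form, assuming a POSIX shell. The function name consume_topic and the KAFKA_CONSUMER_CMD override variable are illustrative only; the override exists so the wrapper can be tried without a running broker.

```shell
# Read a topic from the beginning with the new consumer.
# Assumed arguments (illustrative): $1 = broker address (host:port), $2 = topic.
# KAFKA_CONSUMER_CMD is a hypothetical override for the script to run.
consume_topic() {
    "${KAFKA_CONSUMER_CMD:-kafka-console-consumer.sh}" \
        --bootstrap-server "$1" --topic "$2" --from-beginning
}
```

For the setup above the call would be consume_topic localhost:9092 KafkaTestTopic. Note that the address is the broker's, not ZooKeeper's.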

(B) Using a standalone ZooKeeper (not the one bundled with Kafka)

Start ZooKeeper:

[root@mail zookeeper-3.4.10]# bin/zkServer.sh start conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED

(1) Create a topic

[root@mail kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic secondTopic --partitions 1 --replication-factor 1
Created topic "secondTopic".

Note: creating a topic requires at least one broker registered in this ZooKeeper. If no broker is running yet, start Kafka (step 2) first.

(2) Start Kafka

[root@mail kafka_2.11-1.0.0]# nohup bin/kafka-server-start.sh config/server.properties &

(3) Kafka producer

[root@mail kafka_2.11-1.0.0]# kafka-console-producer.sh --topic KafkaTestTopic --broker-list localhost:9092

(4) Kafka consumer

[root@mail kafka_2.11-1.0.0]#  bin/kafka-console-consumer.sh --topic KafkaTestTopic --zookeeper localhost:2181
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

(5) Inspect the data stored by Kafka

[root@mail kafka_2.11-1.0.0]# ls
bin  config  libs  LICENSE  logs  logs-kafka  nohup.out  NOTICE  site-docs
[root@mail kafka_2.11-1.0.0]# cd logs-kafka/             # Kafka's data directory
## This directory is configured in Kafka's config/server.properties:
log.dirs=/root/kafka/kafka_2.11-1.0.0/logs-kafka
[root@mail logs-kafka]# ls             # list the topic partitions stored by Kafka
cleaner-offset-checkpoint  __consumer_offsets-20  __consumer_offsets-33  __consumer_offsets-46  kafka_test-0
__consumer_offsets-0       __consumer_offsets-21  __consumer_offsets-34  __consumer_offsets-47  KafkaTestTopic-0
__consumer_offsets-1       __consumer_offsets-22  __consumer_offsets-35  __consumer_offsets-48  log-start-offset-checkpoint
__consumer_offsets-10      __consumer_offsets-23  __consumer_offsets-36  __consumer_offsets-49  meta.properties
__consumer_offsets-11      __consumer_offsets-24  __consumer_offsets-37  __consumer_offsets-5   My_LOVE_TOPIC-0
__consumer_offsets-12      __consumer_offsets-25  __consumer_offsets-38  __consumer_offsets-6   mytopic-0
__consumer_offsets-13      __consumer_offsets-26  __consumer_offsets-39  __consumer_offsets-7   recovery-point-offset-checkpoint
__consumer_offsets-14      __consumer_offsets-27  __consumer_offsets-4   __consumer_offsets-8   replication-offset-checkpoint
__consumer_offsets-15      __consumer_offsets-28  __consumer_offsets-40  __consumer_offsets-9   stock-quotation-0
__consumer_offsets-16      __consumer_offsets-29  __consumer_offsets-41  hello-0                stock-quotation-avro-0
__consumer_offsets-17      __consumer_offsets-3   __consumer_offsets-42  hello-1                stock-quotation-partition-0
__consumer_offsets-18      __consumer_offsets-30  __consumer_offsets-43  hello-2                TEST-TOPIC-0
__consumer_offsets-19      __consumer_offsets-31  __consumer_offsets-44  hello-3
__consumer_offsets-2       __consumer_offsets-32  __consumer_offsets-45  hello-4
[root@mail logs-kafka]# cd KafkaTestTopic-0/       # partition 0 of the topic KafkaTestTopic
[root@mail KafkaTestTopic-0]# ls
00000000000000000000.index  00000000000000000000.timeindex  leader-epoch-checkpoint
00000000000000000000.log    00000000000000000063.snapshot
[root@mail KafkaTestTopic-0]# tail -f 00000000000000000000.log      # the file in which Kafka stores the messages

(6) Change the number of partitions and observe what changes

## Change the number of partitions
[root@mail kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic KafkaTestTopic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@mail kafka_2.11-1.0.0]# ls
bin  config  libs  LICENSE  logs  logs-kafka  nohup.out  NOTICE  site-docs
[root@mail kafka_2.11-1.0.0]# cd logs-kafka/
# The topic KafkaTestTopic now has partitions 0, 1 and 2, three in total
[root@mail logs-kafka]# ls
cleaner-offset-checkpoint  __consumer_offsets-20  __consumer_offsets-33  __consumer_offsets-46  kafka_test-0
__consumer_offsets-0       __consumer_offsets-21  __consumer_offsets-34  __consumer_offsets-47  KafkaTestTopic-0
__consumer_offsets-1       __consumer_offsets-22  __consumer_offsets-35  __consumer_offsets-48  KafkaTestTopic-1
__consumer_offsets-10      __consumer_offsets-23  __consumer_offsets-36  __consumer_offsets-49  KafkaTestTopic-2
__consumer_offsets-11      __consumer_offsets-24  __consumer_offsets-37  __consumer_offsets-5   log-start-offset-checkpoint
__consumer_offsets-12      __consumer_offsets-25  __consumer_offsets-38  __consumer_offsets-6   meta.properties
__consumer_offsets-13      __consumer_offsets-26  __consumer_offsets-39  __consumer_offsets-7   My_LOVE_TOPIC-0
__consumer_offsets-14      __consumer_offsets-27  __consumer_offsets-4   __consumer_offsets-8   mytopic-0
__consumer_offsets-15      __consumer_offsets-28  __consumer_offsets-40  __consumer_offsets-9   recovery-point-offset-checkpoint
__consumer_offsets-16      __consumer_offsets-29  __consumer_offsets-41  hello-0                replication-offset-checkpoint
__consumer_offsets-17      __consumer_offsets-3   __consumer_offsets-42  hello-1                stock-quotation-0
__consumer_offsets-18      __consumer_offsets-30  __consumer_offsets-43  hello-2                stock-quotation-avro-0
__consumer_offsets-19      __consumer_offsets-31  __consumer_offsets-44  hello-3                stock-quotation-partition-0
__consumer_offsets-2       __consumer_offsets-32  __consumer_offsets-45  hello-4                TEST-TOPIC-0
[root@mail KafkaTestTopic-1]# ls                  # partition 1 of the topic KafkaTestTopic
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint
[root@mail KafkaTestTopic-1]# tail -f 00000000000000000000.log
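
The listings above show that each partition is stored in a directory named <topic>-<partition number> under log.dirs. Here is a minimal sketch that counts a topic's partition directories on one broker, assuming a POSIX shell; the function name count_partitions is hypothetical. The numeric check keeps a topic such as stock-quotation from also matching stock-quotation-avro-0.

```shell
# Count the partition directories of a topic inside a Kafka data directory.
# Assumed arguments (illustrative): $1 = log.dirs directory, $2 = topic name.
count_partitions() {
    log_dir="$1"
    topic="$2"
    prefix="$log_dir/$topic-"
    count=0
    for dir in "$log_dir/$topic"-*; do
        # Strip "<log_dir>/<topic>-"; the rest must be a plain partition number.
        suffix=${dir#"$prefix"}
        case "$suffix" in
            ''|*[!0-9]*) continue ;;
        esac
        if [ -d "$dir" ]; then
            count=$((count + 1))
        fi
    done
    echo "$count"
}
```

On a single-broker setup like the one here, count_partitions logs-kafka KafkaTestTopic should agree with the PartitionCount reported by kafka-topics.sh --describe. With several brokers each data directory holds only the partitions assigned to that broker.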

III. Errors you may encounter:
(1)
[root@mail bin]# kafka-topics.sh --create --zookeeper localhost:2281 --topic KafkaTestTopic --partitions 1 --replication-factor 1
Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
[2018-11-20 16:44:16,269] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.
(kafka.admin.TopicCommand$)

Fix: in server.properties, set zookeeper.connect=localhost:2281 so that the port matches the ZooKeeper port configured in zookeeper.properties, then restart Kafka.
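
A port mismatch of this kind can be detected before starting the broker. The following is a minimal sketch, assuming a POSIX shell, a single host:port entry in zookeeper.connect with no chroot path, and the clientPort key that the bundled zookeeper.properties uses; the function name check_zk_port is hypothetical.

```shell
# Compare the port in zookeeper.connect with clientPort.
# Assumed arguments (illustrative): $1 = server.properties, $2 = zookeeper.properties.
# Prints the result and returns 0 on a match, 1 otherwise.
check_zk_port() {
    server_props="$1"
    zk_props="$2"
    connect=$(grep '^zookeeper\.connect=' "$server_props" | tail -n 1)
    client=$(grep '^clientPort=' "$zk_props" | tail -n 1)
    connect_port="${connect##*:}"        # text after the last colon
    client_port="${client#clientPort=}"  # text after the key
    if [ -n "$client_port" ] && [ "$connect_port" = "$client_port" ]; then
        echo "match: $client_port"
    else
        echo "mismatch: zookeeper.connect uses '$connect_port', clientPort is '$client_port'"
        return 1
    fi
}
```

Run from Kafka's bin directory, check_zk_port ../config/server.properties ../config/zookeeper.properties reports a mismatch when one file still has 2181 and the other has 2281.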

(2)
kafka.common.KafkaException: fetching topic metadata for topics [Set(KafkaTestTopic)] from broker [ArrayBuffer(BrokerEndPoint(0,123.125.50.7,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:77)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
(3)
[2018-11-20 17:28:53,411] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 52 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,513] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 53 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,617] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 54 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-20 17:28:53,721] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 55 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

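Fix for errors (2) and (3):
In server.properties, set
(a) listeners=PLAINTEXT://localhost:9092
(b) advertised.listeners=PLAINTEXT://localhost:9092
localhost only works for clients running on the same machine as the broker. For remote clients, advertised.listeners must contain a host name or IP address that those clients can reach.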

(4) [2018-11-29 09:44:35,275] WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Fix: the likely cause is that Kafka is not running; start Kafka again.

To check from Kafka whether the broker is registered in ZooKeeper:

bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0"

(5)Failed to find leader for Set(KafkaTestTopic-0) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)

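Fix: edit the Kafka configuration file

[root@mail config]# vim server.properties

Set advertised.host.name to the correct IP address of the broker. (This property is deprecated in favor of advertised.listeners.)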

IV. Common Kafka operations

(1) List the topics with their details

[root@mail ~]# kafka-topics.sh --describe --zookeeper localhost:2281
Topic:KafkaTestTopic    PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: KafkaTestTopic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0

Every partition in Kafka has a number, starting from 0. When a partition has several replicas, one of them is the leader and the others are followers. "Leader: 0" is the ID of the broker that hosts the leader replica of that partition. Replicas lists the brokers that hold a copy of the partition, and Isr lists those that are currently in sync with the leader.

(2) List topic names only

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281
KafkaTestTopic

(3) Show the details of one topic

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic

(4) Check whether a topic exists

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281 --topic KafkaTestTopic
or
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --list --zookeeper localhost:2281 | grep KafkaTestTopic

(5) Change the number of partitions of a topic (the count can only be increased)

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic
Topic:KafkaTestTopic    PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: KafkaTestTopic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 2    Leader: 0   Replicas: 0 Isr: 0

(6) Set a topic configuration option

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --config flush.messages=1
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "KafkaTestTopic".
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic
Topic:KafkaTestTopic    PartitionCount:3    ReplicationFactor:1 Configs:flush.messages=1
    Topic: KafkaTestTopic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 2    Leader: 0   Replicas: 0 Isr: 0

(7) Remove a topic configuration option

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281 --alter --topic KafkaTestTopic --delete-config flush.messages
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "KafkaTestTopic".
[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic
Topic:KafkaTestTopic    PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: KafkaTestTopic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: KafkaTestTopic   Partition: 2    Leader: 0   Replicas: 0 Isr: 0

(8) Delete a topic

[root@mail kafka_2.11-1.0.0]# kafka-topics.sh --zookeeper localhost:2281  --delete  --topic KafkaTestTopic

Note: the command only marks the topic for deletion; the marked topic is actually removed when the server restarts. This behavior depends on the Kafka version and on the broker's delete.topic.enable setting.

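
The deprecation warning printed in steps (6) and (7) points to kafka-configs.sh. Below is a minimal sketch of the equivalent calls wrapped in shell functions, assuming a POSIX shell and a Kafka release whose kafka-configs.sh still accepts --zookeeper, as 1.0.0 does. The function names and the KAFKA_CONFIGS_CMD override variable are illustrative only; the override exists so the wrappers can be tried without a running cluster.

```shell
# Wrappers around kafka-configs.sh for topic-level settings.
# Assumed arguments (illustrative): $1 = ZooKeeper address, $2 = topic,
# $3 = key=value (add) or key (delete).
# KAFKA_CONFIGS_CMD is a hypothetical override for the script to run.
add_topic_config() {
    "${KAFKA_CONFIGS_CMD:-kafka-configs.sh}" --zookeeper "$1" \
        --entity-type topics --entity-name "$2" --alter --add-config "$3"
}

delete_topic_config() {
    "${KAFKA_CONFIGS_CMD:-kafka-configs.sh}" --zookeeper "$1" \
        --entity-type topics --entity-name "$2" --alter --delete-config "$3"
}

describe_topic_config() {
    "${KAFKA_CONFIGS_CMD:-kafka-configs.sh}" --zookeeper "$1" \
        --entity-type topics --entity-name "$2" --describe
}
```

Steps (6) and (7) would then read add_topic_config localhost:2281 KafkaTestTopic flush.messages=1 and delete_topic_config localhost:2281 KafkaTestTopic flush.messages.
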
V. Kafka configuration in Java

(1) Producer configuration in Java

    // Requires: import java.util.Properties;
    Properties props = new Properties();
    // Host name and port of the Kafka broker(s); separate multiple entries with commas
    props.put("bootstrap.servers", "ip:9092");
    // Wait for acknowledgement from all in-sync replicas
    props.put("acks", "all");
    // Number of retries when a send fails
    props.put("retries", 0);
    // Batch size in bytes
    props.put("batch.size", "16384");
    // Time in ms to wait for more records before a batch is sent
    props.put("linger.ms", 1);
    // Memory in bytes available for buffering records that wait to be sent
    props.put("buffer.memory", 33554430);
    // Key serializer
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Value serializer
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

(2) Consumer configuration in Java

    // Requires: import java.util.Properties;
    Properties props = new Properties();
    // Address of the Kafka service; not every broker has to be listed
    props.put("bootstrap.servers", "ip:9092");
    // Consumer group ID
    props.put("group.id", "test1");
    // Whether offsets are committed automatically
    props.put("enable.auto.commit", "true");
    // Interval in ms between automatic offset commits
    props.put("auto.commit.interval.ms", "1000");
    // Key deserializer class
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Value deserializer class
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");