千家信息网

部署Kafka群集

发表于:2025-02-12 作者:千家信息网编辑
千家信息网最后更新 2025年02月12日,前言关于kafka的工作机制,已经在上篇博文:Kafka原理及单机部署中详细写出来,这里只是将kafka的一个群集部署写了出来。博文大纲:一、环境准备二、部署zookeeper服务三、部署kafka集
千家信息网最后更新 2025年02月12日部署Kafka群集

前言

关于kafka的工作机制,已经在上篇博文:Kafka原理及单机部署中详细写出来,这里只是将kafka的一个群集部署写了出来。

博文大纲:
一、环境准备
二、部署zookeeper服务
三、部署kafka集群

一、环境准备

部署kafka群集所需的安装包,可以从我的网盘链接中下载。

二、部署zookeeper服务

1、主机kafka01配置如下

#部署zookeeper[root@kafka01 src]# tar zxf zookeeper-3.4.9.tar.gz[root@kafka01 src]# mv zookeeper-3.4.9 /usr/local/zookeeper#修改配置文件[root@kafka01 src]# cd /usr/local/zookeeper/conf/[root@kafka01 conf]# cp -p zoo_sample.cfg zoo.cfg[root@kafka01 conf]# sed -i 's/dataDir=\/tmp\/zookeeper/dataDir=\/usr\/local\/zookeeper\/data/g' zoo.cfg#直接群集节点信息,2888和3888端口用于群集内部通信[root@kafka01 conf]# echo "server.1 192.168.20.2:2888:3888" >> zoo.cfg [root@kafka01 conf]# echo "server.2 192.168.20.3:2888:3888" >> zoo.cfg[root@kafka01 conf]# echo "server.3 192.168.20.4:2888:3888" >> zoo.cfg[root@kafka01 conf]# egrep -v '^$|^#' zoo.cfg   #更改后的配置文件如下tickTime=2000   #节点之间的心跳检测时间单位为毫秒initLimit=10      #达到5个访问进行同步数据syncLimit=5      #节点之间检查失败次数超过后断开相应的节点dataDir=/usr/local/zookeeper/data    #日志文件存放路径clientPort=2181#声明参与集群的主机server.1 192.168.20.2:2888:3888server.2 192.168.20.3:2888:3888server.3 192.168.20.4:2888:3888#创建所需目录及设置节点的ID号[root@kafka01 conf]# mkdir /usr/local/zookeeper/data[root@kafka01 conf]# echo 1 > /usr/local/zookeeper/data/myid#将配置好的zookeeper目录复制到群集内的其他节点[root@kafka01 conf]# scp -r /usr/local/zookeeper/ root@192.168.20.3:/usr/local/[root@kafka01 conf]# scp -r /usr/local/zookeeper/ root@192.168.20.4:/usr/local/#启动zookeeper服务[root@kafka01 conf]# /usr/local/zookeeper/bin/zkServer.sh start[root@kafka01 bin]# netstat -antp | egrep '2181|2888|3888'   #确认群集端口在监听

2、主机kafka02配置如下

#修改ID号为2[root@kafka02 ~]# echo 2 > /usr/local/zookeeper/data/myid #启动zookeeper[root@kafka02 ~]# /usr/local/zookeeper/bin/zkServer.sh start

3、主机kafka03配置如下

#修改ID号为3[root@kafka03 ~]# echo 3 > /usr/local/zookeeper/data/myid#启动zookeeper[root@kafka03 ~]# /usr/local/zookeeper/bin/zkServer.sh start

4、查看zookeeper群集内节点的角色

#kafka01上如下:[root@kafka01 conf]# /usr/local/zookeeper/bin/zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /usr/local/zookeeper/bin/../conf/zoo.cfgMode: follower        #角色为follower#kafka02上如下:[root@kafka02 ~]# /usr/local/zookeeper/bin/zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /usr/local/zookeeper/bin/../conf/zoo.cfgMode: leader      #角色为leader#kafka03上如下:[root@kafka03 ~]# /usr/local/zookeeper/bin/zkServer.sh statusZooKeeper JMX enabled by defaultUsing config: /usr/local/zookeeper/bin/../conf/zoo.cfgMode: follower         #角色为follower

三、部署kafka集群

1、主机kafka01上配置如下

#解压至指定目录[root@kafka01 src]# tar zxf kafka_2.11-2.2.1.tgz [root@kafka01 src]# mv kafka_2.11-2.2.1 /usr/local/kafka#修改配置文件[root@kafka01 src]# cd /usr/local/kafka/config/[root@kafka01 config]# sed -i 's/broker.id=0/broker.id=1/g' server.properties [root@kafka01 config]# sed -i 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/192.168.20.2:9092/g' server.properties [root@kafka01 config]# sed -i 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/192.168.20.2:9092/g' server.properties[root@kafka01 config]# sed -i 's/log.dirs=\/tmp\/kafka-logs/log.dirs=\/usr\/local\/zookeeper\/data/g' server.properties[root@kafka01 config]# sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect=192.168.20.2:2181,192.168.20.3:2181,192.168.20.4:2181/g' server.properties[root@kafka01 config]# sed -i 's/zookeeper.connection.timeout.ms=6000/zookeeper.connection.timeout.ms=600000/g' server.properties [root@kafka01 config]# egrep -v '^$|^#' server.properties    #修改后的配置文件如下broker.id=1     #kafka的ID号,这里为1,其他节点依次是2、3listeners=PLAINTEXT://192.168.20.2:9092   #节点监听地址,填写每个节点自己的IP地址advertised.listeners=PLAINTEXT://192.168.20.2:9092    #集群中节点内部交流使用的端口,填写每个节点自己的IP地址num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/usr/local/zookeeper/datanum.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000#声明链接zookeeper节点的地址zookeeper.connect=192.168.20.2:2181,192.168.20.3:2181,192.168.20.4:2181zookeeper.connection.timeout.ms=600000    #修改这的时间,单位是毫秒,为了防止连接zookeeper超时group.initial.rebalance.delay.ms=0#将修改后的kafka目录发送至其他节点[root@kafka01 config]# scp -r /usr/local/kafka root@192.168.20.3:/usr/local/[root@kafka01 config]# scp -r /usr/local/kafka root@192.168.20.4:/usr/local/#启动kafka[root@kafka01 config]# cd ../bin/[root@kafka01 bin]# ./kafka-server-start.sh ../config/server.properties &

2、主机kafka02配置如下:

#修改与kafka01冲突之处[root@kafka02 ~]# cd /usr/local/kafka/[root@kafka02 kafka]# sed -i 's/192.168.20.2/192.168.20.3/g' config/server.properties[root@kafka02 kafka]# sed -i 's/broker.id=1/broker.id=2/g' config/server.properties#启动kafka服务[root@kafka02 kafka]# cd bin/[root@kafka02 bin]# ./kafka-server-start.sh ../config/server.properties &[root@kafka02 bin]# netstat -anpt | grep 9092   #确定端口在监听

3、主机kafka03配置如下:

#修改kafka配置文件中冲突之处[root@kafka03 ~]#  cd /usr/local/kafka/[root@kafka03 kafka]# sed -i 's/192.168.20.2/192.168.20.4/g' config/server.properties[root@kafka03 kafka]# sed -i 's/broker.id=1/broker.id=3/g' config/server.properties#启动kafka服务[root@kafka03 kafka]# cd bin/[root@kafka03 bin]#  ./kafka-server-start.sh ../config/server.properties &[root@kafka03 bin]# netstat -anpt | grep 9092   #确定端口在监听

4、发布与订阅消息测试

#创建名为my-replicated-topic的topic[root@kafka01 bin]# ./kafka-topics.sh --create --bootstrap-server 192.168.20.2:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic#查看topic的状态和leader[root@kafka01 bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.20.2:9092 --topic my-replicated-topicTopic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:segment.bytes=1073741824    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,1,3 Isr: 2,1,3#返回的信息表示partition数量为1,副本数量为3,segment字节数为1073741824#名称为"my-replicated-topic",ID为2的节点为leader[root@kafka01 bin]# ./kafka-console-producer.sh --broker-list 192.168.20.2:9092 --topic my-replicated-topic#随便写入几行数据>aaaaa>bbbbb>ccccc>ddddd#在其他节点上订阅消息[root@kafka02 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.20.3:9092 --from-beginning --topic my-replicated-topic       ................#省略部分内容aaaaabbbbbcccccddddd

5、模拟leader宕机,查看topic的状态及新的leader

#可以看到当前leader是ID为2的节点[root@kafka01 bin]#  ./kafka-topics.sh --describe --bootstrap-server 192.168.20.2:9092 --topic my-replicated-topicTopic:my-replicated-topic   PartitionCount:1       ReplicationFactor:3  Configs:segment.bytes=1073741824    Topic: my-replicated-topic  Partition: 0   Leader: 2    Replicas: 2,1,3 Isr: 2,1,3#到kafka02主机上停止kafka服务[root@kafka02 bin]# ./kafka-server-stop.sh #再次查看leader是哪个节点?(可以发现leader换成了ID为1的节点)[root@kafka01 bin]#  ./kafka-topics.sh --describe --bootstrap-server 192.168.20.2:9092 --topic my-replicated-topicTopic:my-replicated-topic   PartitionCount:1       ReplicationFactor:3  Configs:segment.bytes=1073741824    Topic: my-replicated-topic  Partition: 0   Leader: 1    Replicas: 2,1,3 Isr: 1,3

-------- 本文至此结束,感谢阅读 --------

0