千家信息网

如何从零开始搭建Kafka+SpringBoot分布式消息系统

发表于:2024-11-13 作者:千家信息网编辑
千家信息网最后更新 2024年11月13日,这期内容当中小编将会给大家带来有关如何从零开始搭建Kafka+SpringBoot分布式消息系统,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。前言由于kafka强依
千家信息网最后更新 2024年11月13日如何从零开始搭建Kafka+SpringBoot分布式消息系统

这期内容当中小编将会给大家带来有关如何从零开始搭建Kafka+SpringBoot分布式消息系统,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

前言

由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境。 (ps:默认您的centos系统可联网,本教程就不教配置ip什么的了) (ps2:没有wget的先装一下:yum install wget) (ps3:人啊,就是要条理。东边放一点,西边放一点,过段时间就不知道自己装在哪里了。本教程所有下载均放在/usr/local目录下) (ps4:kafka可能有内置zookeeper,感觉可以越过zookeeper教程,但是这里也配置出来了。我没试过)

一、配置jdk

因为oracle 公司不允许直接通过wget 下载官网上的jdk包。所以你直接wget以下地址下载下来的是一个只有5k的网页文件而已,并不是需要的jdk包。(垄断地位就是任性)。 (请通过java -version判断是否自带jdk,我的没带)

1、官网下载

下面是jdk8的官方下载地址:

https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html

2、上传解压

这里通过xftp上传到服务器指定位置:/usr/local

运行命令使环境生效

source /etc/profile

等待下载完成之后解压:

tar -zxvf zookeeper-3.4.6.tar.gz

从零开始搭建Kafka+SpringBoot分布式消息系统

重命名为zookeeper1

mv zookeeper-3.4.6 zookeeper1cp -r zookeeper1 zookeeper2cp -r zookeeper1 zookeeper3

2、创建data、logs文件夹

在zookeeper1目录下创建

在data目录下新建myid文件。内容为1

3、修改zoo.cfg文件

cd /usr/local/zookeeper/zookeeper1/conf/cp zoo_sample.cfg zoo.cfg

进行过上面两步之后,有zoo.cfg文件了,现在修改内容为:

从零开始搭建Kafka+SpringBoot分布式消息系统

dataDir=/usr/local/zookeeper/zookeeper1/datadataLogDir=/usr/local/zookeeper/zookeeper1/logsserver.1=192.168.233.11:2888:3888server.2=192.168.233.11:2889:3889server.3=192.168.233.11:2890:3890

4、搭建zookeeper2

首先,复制改名。

cd /usr/local/zookeeper/cp -r zookeeper1 zookeeper2

然后修改具体的某些配置:

vim zookeeper2/conf/zoo.cfg

将下图三个地方1改成2

vim zookeeper2/data/myid

同时将myid中的值改成2

vim zookeeper3/conf/zoo.cfg

修改为3

6、测试zookeeper集群

cd /usr/local/zookeeper/zookeeper1/bin/

由于启动所需代码比较多,这里简单写了一个启动脚本:

vim start

start的内容如下

cd /usr/local/zookeeper/zookeeper1/bin/./zkServer.sh start ../conf/zoo.cfgcd /usr/local/zookeeper/zookeeper2/bin/./zkServer.sh start ../conf/zoo.cfgcd /usr/local/zookeeper/zookeeper3/bin/./zkServer.sh start ../conf/zoo.cfg

下面是连接脚本:

vim login

login内容如下:

./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183

脚本编写完成,接下来启动:

sh startsh login

启动集群成功,如下图:

三、搭建kafka集群

1、下载kafka

首先创建kafka目录:

mkdir /usr/local/kafka

然后在该目录下载

cd /usr/local/kafka/wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz

下载成功之后解压:

tar -zxvf kafka_2.11-1.1.0.tgz

2、修改集群配置

首先进入conf目录下:

cd /usr/local/kafka/kafka_2.11-1.1.0/config

修改server.properties 修改内容:

broker.id=0log.dirs=/tmp/kafka-logslisteners=PLAINTEXT://192.168.233.11:9092

复制两份server.properties

cp server.properties server2.propertiescp server.properties server3.properties

修改server2.properties

vim server2.properties

修改主要内容为:

broker.id=1log.dirs=/tmp/kafka-logs1listeners=PLAINTEXT://192.168.233.11:9093

如上,修改server3.properties 修改内容为:

broker.id=2log.dirs=/tmp/kafka-logs2listeners=PLAINTEXT://192.168.233.11:9094

3、启动kafka

这里还是在bin目录编写一个脚本:

cd ../bin/vim start

脚本内容为:

./kafka-server-start.sh ../config/server.properties &./kafka-server-start.sh ../config/server2.properties &./kafka-server-start.sh ../config/server3.properties &

通过jps命令可以查看到,共启动了3个kafka。

从零开始搭建Kafka+SpringBoot分布式消息系统

4、创建Topic

cd /usr/local/kafka/kafka_2.11-1.1.0bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

kafka打印了几条日志

查看kafka状态

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

6、启动消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

可以看出,启动消费者之后就会自动消费。

消费者自动捕获成功!

不满足的话启动springboot的时候会抛异常的!!!ps:该走的岔路我都走了o(╥﹏╥)o (我的kafka-clients是1.1.0,spring-kafka是2.2.2,中间那列暂时不用管)

从零开始搭建Kafka+SpringBoot分布式消息系统

回归正题,搞了两个小时,终于搞好了,想哭… 遇到的问题基本就是jar版本不匹配。 上面的步骤我也都会相应的去修改,争取大家按照本教程一遍过!!!

1、pom文件

    4.0.0            org.springframework.boot        spring-boot-starter-parent        2.1.1.RELEASE                 com.gzky    study    0.0.1-SNAPSHOT    study    Demo project for Spring Boot            1.8                            org.springframework.boot            spring-boot-starter-web                            org.springframework.boot            spring-boot-starter-test            test                                                org.junit.vintage                    junit-vintage-engine                                                        org.springframework.boot            spring-boot-starter-redis            1.3.8.RELEASE                            redis.clients            jedis                                    org.springframework.kafka            spring-kafka            2.2.0.RELEASE                                    org.apache.kafka            kafka-clients                                                    org.springframework.boot                spring-boot-maven-plugin                        

pom文件中,重点是下面这两个版本。

       org.springframework.boot       spring-boot-starter-parent       2.1.1.RELEASE                 org.springframework.kafka      spring-kafka      2.2.0.RELEASE

2、application.yml

spring:  redis:    cluster:      #设置key的生存时间,当key过期时,它会被自动删除;      expire-seconds: 120      #设置命令的执行时间,如果超过这个时间,则报错;      command-timeout: 5000      #设置redis集群的节点信息,其中namenode为域名解析,通过解析域名来获取相应的地址;      nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006  kafka:    # 指定kafka 代理地址,可以多个    bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094    producer:      retries: 0      # 每次批量发送消息的数量      batch-size: 16384      buffer-memory: 33554432      # 指定消息key和消息体的编解码方式      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      # 指定默认消费者group id      group-id: test-group      auto-offset-reset: earliest      enable-auto-commit: true      auto-commit-interval: 100      # 指定消息key和消息体的编解码方式      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializerserver:  port: 8085  servlet:    #context-path: /redis    context-path: /kafka

没有配置Redis的可以把Redis部分删掉,也就是下图: 想学习配置Redis集群的可以参考:《Redis集群redis-cluster的搭建及集成springboot》

3、生产者

package com.gzky.study.utils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;/** * kafka生产者工具类 * * @author biws * @date 2019/12/17 **/@Componentpublic class KfkaProducer {    private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);    @Autowired    private KafkaTemplate kafkaTemplate;    /**     * 生产数据     * @param str 具体数据     */    public void send(String str) {        logger.info("生产数据:">  4、消费者package com.gzky.study.utils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;/** * kafka消费者监听消息 * * @author biws * @date 2019/12/17 **/@Componentpublic class KafkaConsumerListener {    private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);    @KafkaListener(topics = "testTopic")    public void onMessage(String str){        //insert(str);//这里为插入数据库代码        logger.info("监听到:" + str);        System.out.println("监听到:" + str);    }}  5、对外接口package com.gzky.study.controller;import com.gzky.study.utils.KfkaProducer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;/** * kafka对外接口 * * @author biws * @date 2019/12/17 **/@RestControllerpublic class KafkaController {    @Autowired    KfkaProducer kfkaProducer;    /**     * 生产消息     * @param str     * @return     */    @RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET)    @ResponseBody    public boolean sendTopic(@RequestParam String str){        kfkaProducer.send(str);        return true;    }}  6、postman测试这里首先应该在服务器启动监听器(kafka根目录),下面命令必须是具体的服务器ip,不能是localhost,是我踩过的坑:推荐此处重启一下集群 关闭kafka命令:cd /usr/local/kafka/kafka_2.11-1.1.0/bin./kafka-server-stop.sh ../config/server.properties &./kafka-server-stop.sh ../config/server2.properties &./kafka-server-stop.sh ../config/server3.properties & 此处应该jps看一下,等待所有的kafka都关闭(关不掉的kill掉),再重新启动kafka:./kafka-server-start.sh ../config/server.properties &./kafka-server-start.sh ../config/server2.properties &./kafka-server-start.sh ../config/server3.properties & 等待kafka启动成功后,启动消费者监听端口:cd /usr/local/kafka/kafka_2.11-1.1.0bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic  曾经我乱输的测试信息全部被监听过来了!启动springboot服务 然后用postman生产消息: 然后享受成果,服务器端监听成功。 项目中也监听成功!

上述就是小编为大家分享的如何从零开始搭建Kafka+SpringBoot分布式消息系统了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

消息 内容 集群 消费 文件 消费者 目录 配置 成功 生产 监听 命令 脚本 服务 地址 就是 教程 数据 时间 服务器 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 宁夏企业党建软件开发专业制作 网络安全应急响应机制研究 超微服务器开机卡在b9 广州数智软件开发有限公司 panabit 云服务器 车站售票软件开发 ios开发搭建服务器搭建 芜湖通信软件开发要多少钱 静安区个人软件开发管理制度 腾讯穿越火线怎么连接不到服务器 网络安全记心中画画 网络安全和信息化杂志发表论文 中国储存服务器市场格局 关系型数据库知乎 饥荒联机版加服务器mod变卡 网络安全公司的股东 绝地求生哪个区服务器好 表格中怎么选中一列数据库 虞城软件开发有限公司 配电网 网络安全态势感知 信息管理系统软件开发合同 jsp 数据库添加数据 软件开发行业做账科目定义 沧州商城软件开发 兴化网络技术诚信合作 前端界面如何调用数据库 达梦数据库还原时读取文件失败 大学国家网络安全宣传周加学分吗 mysql数据库建立视图 软件开发中地图设计
0