千家信息网

如何使用消息队列

发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,本篇内容介绍了"如何使用消息队列"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1 概述1.1 基本概
千家信息网最后更新 2025年01月19日如何使用消息队列

本篇内容介绍了"如何使用消息队列"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

1 概述

1.1 基本概念
1.1.1 Broker 代理

已发布的消息保存在一组服务器中,称为Kafka集群。集群中的每个服务器都是一个Broker。

1.1.2 Topic 主题

通过Topic机制对消息进行分类,可以认为每个Topic就是一个队列。

1.1.3 Partition 分区

每个Topic可以有多个分区,主要为了提高并发而设计。相同Topic下不同Partition可以并发接收消息,同时也能供消费者并发拉取消息。有多少Partition就有多少并发量。

在Kafka服务器上,分区是以文件目录的形式存在的。每个分区目录中,Kafka会按配置大小及配置周期将分区拆分成多个段文件(LogSegment),每个段由三部分组成:

- 日志文件:*.log- 位移索引文件:*.index- 时间索引文件:*.timeindex

其中*.log用于存储消息本身的数据内容,*.index存储消息在文件中的位置(包括消息的逻辑offset和物理存储offset),*.timeindex存储消息创建时间和对应逻辑地址的映射关系。

将分区拆分成多个段是为了控制存储文件大小。可以很方便的通过操作系统mmap机制映射到内存中,提高写入和读取效率。同时还有一个好处就是,当系统要清除过期数据时,可以直接将过期的段文件删除。

如果每个消息都要在index中保存位置信息,index文件自身大小也很容易变的很大。所以Kafka将index设计为稀疏索引来减小index文件的大小。

1.1.4 Replication 副本

消息冗余数量。不能超过集群中Broker的数量。

1.2 基本操作
1.2.1 Topic相关
# 创建Topic # --topic 主题名称 避免使用[_]及[.]号# --replication-factor 副本数量(不能超过broker节点数)# --partitions 分区数量(并发)./bin/kafka-topics.sh --create \--topic UserDataQueue \--replication-factor 3 \--partitions 5 \--bootstrap-server localhost:9092,localhost:9093,localhost:9094# 查看Topic./bin/kafka-topics.sh --list \--bootstrap-server localhost:9092,localhost:9093,localhost:9094# 修改Topic# 删除Topic
1.2.2 Message相关
# 发送消息# --topic 指定目标Topic./bin/kafka-console-producer.sh \--topic UserDataQueue \--bootstrap-server localhost:9092,localhost:9093,localhost:9094# 拉取消息# --from-beginning 从头开始(获取现有的全量数据)./bin/kafka-console-consumer.sh \--topic UserDataQueue \--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \--from-beginning

2 集群配置

Kafka集群依赖于Zookeeper。

2.1 Zookeeper配置及启动
# 需要修改的参数# the directory where the snapshot is stored.dataDir=/kafka/zkdata# the port at which the clients will connectclientPort=2182
# 启动./bin/zookeeper-server-start.sh -daemon /kafka/zookeeper.properties
2.2 Kafka配置及启动
# 需修改参数# The id of the broker. This must be set to a unique integer for each broker.broker.id=1  # 同一集群内ID必须唯一# The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured.#   FORMAT:#     listeners = listener_name://host_name:port#   EXAMPLE:#     listeners = PLAINTEXT://your.host.name:9092listeners=PLAINTEXT://localhost:9092  # 同一主机的话,端口号不能相同# A comma separated list of directories under which to store log fileslog.dirs=/kafka/data01  # 日志存储目录,需做隔离# Zookeeper connection string (see zookeeper docs for details).# This is a comma separated host:port pairs, each corresponding to a zk# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".# You can also append an optional chroot string to the urls to specify the# root directory for all kafka znodes.zookeeper.connect=localhost:2182  # Zookeeper连接地址,参见2.1 zk配置
# Kafka启动# broker-1./bin/kafka-server-start.sh -daemon /kafka/server01.properties# broker-2./bin/kafka-server-start.sh -daemon /kafka/server02.properties# broker-3./bin/kafka-server-start.sh -daemon /kafka/server03.properties
2.3 Zookeeper可视化

PrettyZoo 是一个基于 Apache Curator 和 JavaFX 实现的 Zookeeper 图形化管理客户端。

由下图可以看到,集群3个Broker均正常启动。

2.4 Kafka可视化及监控
2.4.1 AKHQ

管理Topic,Topic消息,消费组等的Kafka可视化系统,相关文档:https://akhq.io/

2.4.2 Kafka Eagle

一个简单且高效的监控系统。相关文档:http://www.kafka-eagle.org/index.html

Kafka Eagle 自带监控大屏。

3 与Spring Boot集成

Spring Boot版本:2.4.4。

官方示例:https://github.com/spring-projects/spring-kafka/tree/main/samples

3.1 Spring Boot
3.1.1 添加依赖
implementation 'org.springframework.kafka:spring-kafka'
3.1.2 配置文件
spring:  kafka:    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094    producer:      client-id: kfk-demo      retries: 3
3.1.3 消息发送
@RestControllerpublic class IndexController {    @Autowired    KafkaTemplate kafkaTemplate;    @GetMapping    public String index() {        int rdm = new Random().nextInt(1000);        kafkaTemplate.send("UserDataQueue", new UserData("", rdm));        return "hello world";    }    @GetMapping("index2")    public String index2() {                // 发送字符串方式        kafkaTemplate.send("UserDataTopic", new Gson().toJson(new UserData("apple", 23)));        return "ok";    }}
3.1.4 消息接收
@Component@KafkaListener(        id = "kfk-demo-userdata",        topics = {"UserDataQueue"},        groupId = "kfk-demo-group",        clientIdPrefix = "kfk-demo-client")public class KfkListener {    @KafkaHandler    public void process(@Payload UserData ud,                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {        System.out.println(String.format("topic: %s, partition: %d, userData: %s", topic, partition, ud));    }    @KafkaHandler(isDefault = true)    public void process(Object obj) {        System.out.println(obj);    }}// 接收字符串方式@Slf4j@Component@KafkaListener(id = "kfk-demo2", topics = {"UserDataTopic"})public class KfkUserDataTopicListener {    @KafkaHandler    public void process(String userDataStr) {        UserData userData = new Gson().fromJson(userDataStr, UserData.class);        log.info("username: {}, age: {}", userData.getUsername(), userData.getAge());    }}
3.1.5 Topic自动创建
@Configurationpublic class KafkaConfig {    @Bean    public NewTopic userDataTopic() {        return new NewTopic("UserDataTopic", 3, (short) 1);    }}

"如何使用消息队列"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0