千家信息网

怎么让Kafka达到最佳吞吐量

发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,本篇内容介绍了"怎么让Kafka达到最佳吞吐量"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!上手使用
千家信息网最后更新 2024年12月13日怎么让Kafka达到最佳吞吐量

本篇内容介绍了"怎么让Kafka达到最佳吞吐量"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

上手使用

func main() {  // 1. 初始化        pusher := kq.NewPusher([]string{                "127.0.0.1:19092",                "127.0.0.1:19092",                "127.0.0.1:19092",        }, "kq")        ticker := time.NewTicker(time.Millisecond)        for round := 0; round < 3; round++ {                select {                case <-ticker.C:                        count := rand.Intn(100)                        m := message{                                Key:     strconv.FormatInt(time.Now().UnixNano(), 10),                                Value:   fmt.Sprintf("%d,%d", round, count),                                Payload: fmt.Sprintf("%d,%d", round, count),                        }                        body, err := json.Marshal(m)                        if err != nil {                                log.Fatal(err)                        }                        fmt.Println(string(body))      // 2. 写入                        if err := pusher.Push(string(body)); err != nil {                                log.Fatal(err)                        }                }        }}

kafka cluster 配置以及 topic 传入,你就得到一个操作 kafkapush operator

至于写入消息,简单的调用 pusher.Push(msg) 就行。是的,就这么简单!

> 当然,目前只支持单个 msg 写入。可能有人会疑惑,那就继续往下看,为什么只能一条一条写入?

初始化

一起看看 pusher 初始化哪些步骤:

NewPusher(clusterAddrs, topic, opts...)        |- kafka.NewWriter(kfConfig)                                                               // 与 kf 之前的连接        |- executor = executors.NewChunkExecutor()  // 设置内部写入的executor为字节数定量写入
  1. 建立与 kafka cluster 的连接。此处肯定就要传入 kafka config

  2. 设置内部暂存区的写入函数以及刷新规则。

使用 chunkExecutor 作用不言而喻:将随机写 -> 批量写,减少 I/O 消耗;同时保证单次写入不能超过默认的 1M 或者自己设定的最大写入字节数。

其实再往 chunkExecutor 内部看,其实每次触发插入有两个指标:

  • maxChunkSize:单次最大写入字节数

  • flushInterval:刷新暂存消息插入的间隔时间

在触发写入,只要满足任意一个指标都会执行写入。同时在 executors 都有设置插入间隔时间,以防暂存区写入阻塞而暂存区内消息一直不被刷新清空。

> 更多关于 executors 可以参看以下:https://zeromicro.github.io/go-zero/executors.html

生产者插入

根据上述初始化对 executors 介绍,插入过程中也少不了它的配合:

func (p *Pusher) Push(v string) error {  // 1. 将 msg -> kafka 内部的 Message        msg := kafka.Message{                Key:   []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),                Value: []byte(v),        }    // 使用 executor.Add() 插入内部的 container  // 当 executor 初始化失败或者是内部发生错误,也会将 Message 直接插入 kafka        if p.executor != nil {                return p.executor.Add(msg, len(v))        } else {                return p.produer.WriteMessages(context.Background(), msg)        }}

过程其实很简单。那 executors.Add(msg, len(msg)) 是怎么把 msg 插入到 kafka 呢?

插入的逻辑其实在初始化中就声明了:

pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {                chunk := make([]kafka.Message, len(tasks))      // 1                for i := range tasks {                        chunk[i] = tasks[i].(kafka.Message)                }      // 2                if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {                        logx.Error(err)                }        }, newOptions(opts)...)
  1. 触发插入时,将暂存区中存储的 []msg 依次拿出,作为最终插入消息集合;

  2. 将上一步的消息集合,作为一个批次插入 kafkatopic

这样 pusher -> chunkExecutor -> kafka 一个链路就出现了。

"怎么让Kafka达到最佳吞吐量"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

消息 存区 字节 更多 过程 吞吐量 吞吐 最大 内容 同时 指标 时间 知识 实用 不言而喻 学有所成 少不了 接下来 两个 作用 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 鼓楼区工商软件开发创新服务 阿里云数据库主备切换 什么是互联网金融科技公司 守护网络安全构建和谐家园手抄报 淘宝上的数据库怎么做营销 唯一索引数据库设置 服务器硬盘取下安装上开机报警 岑溪租房软件开发 榆林网络技术价目表 人力怎么招软件开发岗位 36岁学网络技术 软件开发程序员简历 网络安全培训班出来一般工资多少 红桥区企业网络技术创造辉煌 服务器行业属于什么产业 数据库的详细设计及查询 石家庄软件开发就找驰宇网络 韩国免备案服务器 腾讯的服务器一天多少钱 oracle没有导入数据库 鼓楼区工商软件开发创新服务 上海 cn2 服务器 软件数据库在哪里打开 数据库基础期末作品 c 数据库操作类 战地5自建服务器怎么进人 数据库字典怎么下载 计算机网络技术用人标准 深圳财务软件开发公司电话 朝阳网络安全大队电话号码
0