千家信息网

Golang怎么监听日志文件并发送到kafka中

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章主要讲解了"Golang怎么监听日志文件并发送到kafka中",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Golang怎么监听日志文件并发送到
千家信息网最后更新 2025年01月23日Golang怎么监听日志文件并发送到kafka中

这篇文章主要讲解了"Golang怎么监听日志文件并发送到kafka中",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Golang怎么监听日志文件并发送到kafka中"吧!

涉及的golang库和可视化工具:

go-ini,sarama,tail其中:

  • go-ini:用于读取配置文件,统一管理配置项,有利于后其的维护

  • sarama:是一个go操作kafka的客户端。目前我用于向kefka发送消息

  • tail:类似于linux的tail命令了,读取文件的后几行。如果文件有追加数据,会检测到。就是通过它来监听日志文件

可视化工具:

offsetexplorer:是kafka的可视化工具,这里用来查看消息是否投递成功

工作的流程

  • 加载配置,初始化saramakafka

  • 起一个的协程,利用tail不断去监听日志文件的变化。

  • 主协程中一直阻塞等待tail发送消息,两者通过一个管道通讯。一旦主协程接收到新日志,组装格式,然后发送到kafka中

环境准备

环境的话,确保zookeeperkafka正常运行。因为还没有使用sarama读取数据,使用offsetexplorer来查看任务是否真的投递成功了。

代码分层

serve来存放写tail服务类和sarama服务类,conf存放ini配置文件

main函数为程序入口

关键的代码

main.go

main函数做的有:构建配置结构体,映射配置文件。调用和初始化tail,srama服务。

package mainimport (        "fmt"        "sarama/serve"        "github.com/go-ini/ini")type KafkaConfig struct {        Address     string `ini:"address"`        ChannelSize int    `ini:"chan_size"`}type TailConfig struct {        Path     string `ini:"path"`        Filename string `ini:"fileName"`        // 如果是结构体,则指明分区名        Children `ini:"tailfile.children"`}type Config struct {        KafkaConfig `ini:"kafka"`        TailConfig  `ini:"tailfile"`}type Children struct {        Name string `ini:"name"`}func main() {        // 加载配置        var cfg = new(Config)        err := ini.MapTo(cfg, "./conf/go-conf.ini")        if err != nil {                fmt.Print(err)        }        // 初始化kafka        ks := &serve.KafukaServe{}        // 启动kafka消息监听。异步        ks.InitKafka([]string{cfg.KafkaConfig.Address}, int64(cfg.KafkaConfig.ChannelSize))        // 关闭主协程时,关闭channel        defer ks.Destruct()        // 初始化tail        ts := &serve.TailServe{}        ts.TailInit(cfg.TailConfig.Path + "/" + cfg.TailConfig.Filename)        // 阻塞        ts.Listener(ks.MsgChan)}

kafka.go

有3个方法 :

  • InitKafka,组装配置项以及初始化接收消息的管道,

  • Listener,监听管道消息,收到消息后,将消息组装,发送到kafka

  • Destruct, 关闭管道

package serveimport (        "fmt"        "github.com/Shopify/sarama")type KafukaServe struct {        MsgChan chan string        //err         error}func (ks *KafukaServe) InitKafka(addr []string, chanSize int64) {        // 读取配置        config := sarama.NewConfig()        // 1. 初始化生产者配置        config.Producer.RequiredAcks = sarama.WaitForAll        // 选择分区        config.Producer.Partitioner = sarama.NewRandomPartitioner        // 成功交付的信息        config.Producer.Return.Successes = true        ks.MsgChan = make(chan string, chanSize)        go ks.Listener(addr, chanSize, config)}func (ks *KafukaServe) Listener(addr []string, chanSize int64, config *sarama.Config) {        //  连接kafka        var kafkaClient, _ = sarama.NewSyncProducer(addr, config)        defer kafkaClient.Close()        for {                select {                case content := <-ks.MsgChan:                        //                        msg := &sarama.ProducerMessage{                                Topic: "weblog",                                Value: sarama.StringEncoder(content),                        }                        partition, offset, err := kafkaClient.SendMessage(msg)                        if err != nil {                                fmt.Println(err)                        }                        fmt.Println("分区,偏移量:")                        fmt.Println(partition, offset)                        fmt.Println("___")                }        }}func (ks *KafukaServe) Destruct() {        close(ks.MsgChan)}

tail.go

主要包括了两个方法:

  • TailInit初始化,组装tail配置。Listener

  • Listener,保存kafka服务类初始化之后的管道。监听日志文件,如果有新日志,就往管道里发送

package serveimport (        "fmt"        "github.com/hpcloud/tail")type TailServe struct {        tails *tail.Tail}func (ts *TailServe) TailInit(filenName string) {        config := tail.Config{                ReOpen:    true,                Follow:    true,                Location:  &tail.SeekInfo{Offset: 0, Whence: 2},                MustExist: false,                Poll:      true,        }        // 打开文件开始读取数据        ts.tails, _ = tail.TailFile(filenName, config)        // if err != nil {        //         fmt.Println("tails %s failed,err:%v\n", filenName, err)        //         return nil, err        // }        fmt.Println("启动," + filenName + "监听")}func (ts *TailServe) Listener(MsgChan chan string) {        for {                msg, ok := <-ts.tails.Lines                if !ok {                        // todo                        fmt.Println("数据接收失败")                        return                }                fmt.Println(msg.Text)                MsgChan <- msg.Text        }}// 测试案例func Demo() {        filename := `E:\xx.log`        config := tail.Config{                ReOpen:    true,                Follow:    true,                Location:  &tail.SeekInfo{Offset: 0, Whence: 2},                MustExist: false,                Poll:      true,        }        // 打开文件开始读取数据        tails, err := tail.TailFile(filename, config)        if err != nil {                fmt.Println("tails %s failed,err:%v\n", filename, err)                return        }        var (                msg *tail.Line                ok  bool        )        fmt.Println("启动")        for {                msg, ok = <-tails.Lines                if !ok {                        fmt.Println("tails file close reopen,filename:$s\n", tails.Filename)                }                fmt.Println("msg:", msg.Text)        }}

感谢各位的阅读,以上就是"Golang怎么监听日志文件并发送到kafka中"的内容了,经过本文的学习后,相信大家对Golang怎么监听日志文件并发送到kafka中这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0