千家信息网

如何接入异步任务及使用log

发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,这篇文章主要讲解了"如何接入异步任务及使用log",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何接入异步任务及使用log"吧!Delay Job日常
千家信息网最后更新 2025年01月19日如何接入异步任务及使用log

这篇文章主要讲解了"如何接入异步任务及使用log",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何接入异步任务及使用log"吧!

Delay Job

日常任务开放中,我们会有很多异步、批量、定时、延迟任务要处理,go-zero中有 go-queue,推荐使用 go-queue 去处理,go-queue 本身也是基于 go-zero 开发的,其本身是有两种模式:

  • dq : 依赖于 beanstalkd ,分布式,可存储,延迟、定时设置,关机重启可以重新执行,消息会丢失,使用非常简单,go-queue中使用了redis setnx保证了每个消息只被消费一次,使用场景主要是用来做日常任务使用

  • kq:依赖于 kafka ,这个就不多介绍啦,大名鼎鼎的 kafka ,使用场景主要是做日志用

我们主要说一下dq,kq使用也一样的,只是依赖底层不同,如果没使用过beanstalkd,没接触过beanstalkd的可以先google一下,使用起来还是挺容易的。

我在jobs下使用goctl新建了一个message-job.api服务

info(        title: //消息任务        desc: // 消息任务        author: "Mikael"        email: "13247629622@163.com")type BatchSendMessageReq {}type BatchSendMessageResp {}service message-job-api {        @handler batchSendMessageHandler // 批量发送短信        post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)}

因为不需要使用路由,所以handler下的routes.go被我删除了,在handler下新建了一个jobRun.go,内容如下:

package handlerimport (        "fishtwo/lib/xgo"        "fishtwo/app/jobs/message/internal/svc")/*** @Description 启动job* @Author Mikael* @Date 2021/1/18 12:05* @Version 1.0**/func JobRun(serverCtx *svc.ServiceContext)  {        xgo.Go(func() {                batchSendMessageHandler(serverCtx)    //...many job        })}

其实xgo.Go就是 go batchSendMessageHandler(serverCtx) ,封装了一下go携程,防止野生goroutine panic

然后修改一下启动文件message-job.go

package mainimport (   "flag"   "fmt"   "fishtwo/app/jobs/message/internal/config"   "fishtwo/app/jobs/message/internal/handler"   "fishtwo/app/jobs/message/internal/svc"   "github.com/tal-tech/go-zero/core/conf"   "github.com/tal-tech/go-zero/rest")var configFile = flag.String("f", "etc/message-job-api.yaml", "the config file")func main() {   flag.Parse()   var c config.Config   conf.MustLoad(*configFile, &c)   ctx := svc.NewServiceContext(c)   server := rest.MustNewServer(c.RestConf)   defer server.Stop()   handler.JobRun(ctx)   fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)   server.Start()}

主要是handler.RegisterHandlers(server, ctx) 修改为handler.JobRun(ctx)

接下来,我们就可以引入dq了,首先在etc/xxx.yaml下添加dqConf

.....DqConf:  Beanstalks:    - Endpoint: 127.0.0.1:7771      Tube: tube1    - Endpoint: 127.0.0.1:7772      Tube: tube2  Redis:    Host: 127.0.0.1:6379    Type: node

我这里本地用不同端口,模拟开了2个节点,7771、7772

在internal/config/config.go添加配置解析对象

type Config struct {        ....        DqConf dq.DqConf}

修改handler/batchsendmessagehandler.go

package handlerimport (        "context"        "fishtwo/app/jobs/message/internal/logic"        "fishtwo/app/jobs/message/internal/svc"        "github.com/tal-tech/go-zero/core/logx")func batchSendMessageHandler(ctx *svc.ServiceContext){        rootCxt:= context.Background()        l := logic.NewBatchSendMessageLogic(context.Background(), ctx)        err := l.BatchSendMessage()        if err != nil{                logx.WithContext(rootCxt).Error("【JOB-ERR】 : %+v ",err)        }}

修改logic下batchsendmessagelogic.go,写我们的consumer消费逻辑

package logicimport (   "context"   "fishtwo/app/jobs/message/internal/svc"   "fmt"   "github.com/tal-tech/go-zero/core/logx")type BatchSendMessageLogic struct {   logx.Logger   ctx    context.Context   svcCtx *svc.ServiceContext}func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic {   return BatchSendMessageLogic{         Logger: logx.WithContext(ctx),         ctx:    ctx,         svcCtx: svcCtx,   }}func (l *BatchSendMessageLogic) BatchSendMessage() error {   fmt.Println("job BatchSendMessage start")   l.svcCtx.Consumer.Consume(func(body []byte) {         fmt.Printf("job BatchSendMessage %s \n" + string(body))   })   fmt.Printf("job BatchSendMessage finish \n")   return nil}

这样就大功告成了,启动message-job.go就ok课

go run message-job.go

之后我们就可以在业务代码中向dq添加任务,它就可以自动消费了

producer.Delay 向dq中投递5个延迟任务:

      producer := dq.NewProducer([]dq.Beanstalk{                {                        Endpoint: "localhost:7771",                        Tube:     "tube1",                },                {                        Endpoint: "localhost:7772",                        Tube:     "tube2",                },        })        for i := 1000; i < 1005; i++ {                _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)                if err != nil {                        fmt.Println(err)                }        }

producer.At 可以指定某个时间执行,非常好用,感兴趣的朋友自己可以研究下。

错误日志

在前面说到gateway改造时候,如果眼神好的童鞋,在上面的httpresult.go中已经看到了log的身影:

我们在来看下rpc中怎么处理的

是的,我在每个rpc启动的main中加入了grpc拦截器 https://www.yuque.com/tal-tech/go-zero/ttzlo1,那让我们看看grpc拦截器里面做了什么

然后我代码里面使用github/pkg/errors这个包去处理错误的,这个包还是很好用的

所以呢:

我们在 grpc 中打印日志 logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v",err)

api 中打印日志 logx.WithContext(r.Context()).Error("【GATEWAY-SRV-ERR】 : %+v ",err)

go-zero 中打印日志,使用logx.WithContext会把trace-id带入,这样一个请求下来,比如

user-api --> user-srv --> message-srv

那如果 messsage-srv 出错,他们三个是同一个 trace-id ,是不是就可以在elk通过输入这个trace-id一次性搜索出来这条请求报错堆栈信息呢?当然你也可以接入 jaeger、zipkin、skywalking 等,这个我暂时还没接入。

感谢各位的阅读,以上就是"如何接入异步任务及使用log"的内容了,经过本文的学习后,相信大家对如何接入异步任务及使用log这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

任务 接入 日志 消息 处理 内容 学习 延迟 消费 不同 代码 场景 就是 还是 错误 好用 拦截器 研究 大功告成 大名鼎鼎 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 有哪些新的软件开发框架 济南考勤一卡通软件开发 网络安全工程师找工作看文凭吗 网络安全27条 服务器管理员账号忘记密码 加强网络安全防范通知 消防网络安全讲座 开展中小学生网络安全宣传周 2021网络安全大会在哪里举行 网易互娱软件开发怎么样 天津特色软件开发市场报价 广州市网络安全竞赛 违法犯罪数据库留指纹吗 DNF数据库技术支持 软件开发公司装修效果图 光纤一般接在服务器什么口上 国庆网络安全保卫工作情况 关于网络安全宣传周小提示 怎么攻击私人服务器 戴尔服务器如何玩 如何了解服务器 网络安全培训方案供应商 河北星系互联网科技有限公司 北理工数据库试题 服务器与路由器连接方法 场站网络安全防护预案 邯郸全国软件开发培训班 网络安全法 培训 邀请 网络安全法》第四章确立了 中级软件开发工程师认证证书
0