python是怎么实现skywalking的trace模块过滤和报警
发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,python是怎么实现skywalking的trace模块过滤和报警,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。skywalkin
千家信息网最后更新 2025年01月19日python是怎么实现skywalking的trace模块过滤和报警
')def send_mail(receiver): """ 发送报错接口邮件 :return: """ server = "mail.test.com" sender = "sa@test.com" sender_pwd = "1qaz@WSX" send_addr = "sa@test.com" receiver = receiver with open("mail.html","r",encoding="utf-8") as f: content = f.read() if re.search("",content) == None: print("无报错接口!",content) return 0 print("邮件前",content) msg_mail = MIMEText(content,"html","utf-8") msg_mail["Subject"] = "Skywalking报错接口统计" msg_mail["From"] = sender msg_mail["To"] = receiver server_obj = smtplib.SMTP_SSL(server) server_obj.connect(server,465) server_obj.login(sender,sender_pwd) server_obj.sendmail(send_addr,receiver,msg_mail.as_string())if __name__ == "__main__": # 设定查询时间间隔,默认900s(15min) end_time = time.time() start_time = end_time - 900 start_time=time.strftime("%Y-%m-%d %H%M",time.localtime(start_time)) end_time = time.strftime("%Y-%m-%d %H%M", time.localtime(end_time)) print(start_time) print(end_time) sw_url = "http://172.16.53.232:9412/graphql" # skywalking的前端服务的地址和端口 per_page_size = 5000 #指定一次获取endpoint接口的数目 trace_detail_addr = "127.0.0.1:5000" #指定查询指定trace_id详细日志 receiver = "shy@test.com" #报警邮件接收人地址 trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr) send_mail(receiver) # interface_filter() # interface_content_filter("3c4212dd2dd548d394ba312c4619405d.104.16390380592724487")
python是怎么实现skywalking的trace模块过滤和报警,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
skywalking本身的报警功能,用起来视乎不是特别好用,目前想实现对skywalking的trace中的错误接口进行过滤并报警通知管理员和开发。所以自己就用python对skywalking做了二次数据清洗实现。项目方在了自己了github(https://github.com/shygit-dev/skywalking-cli-python)上了,有兴趣的同学可以做二次改造,共同学习。下面简单列出了代码内容:
sw-trace.py
#!/usr/bin/env python# _*_ coding: utf-8 _*_# Tile:# Author:shyimport requestsimport timeimport smtplibfrom email.mime.text import MIMETextimport redef interface_content_filter(trace_id): ''' 对详细日志内容(业务逻辑报错)进行过滤 :param trace_id: :return: 【1|0】 ''' url = "http://172.16.53.232:50001/query" params = { "trace_id": trace_id } detail_trace_id_log = requests.request(method="GET",url=url,params=params) detail_trace_id_log = detail_trace_id_log.text print(detail_trace_id_log) print(type(detail_trace_id_log)) with open("blackname_keyword_list","r",encoding="utf-8") as f: for line in f: print(line) result = re.search(line.strip(),detail_trace_id_log) print(result) if result != None: print("哥们匹配到日志黑名单关键字了:%s" % line) return 0 print("提示:%s不在关键字黑名单中" % trace_id) return 1def interface_filter(endpointName): """ 设置接口黑名单 :param endpointName: :return: 【1|0】 """ endpointName = re.sub("\(|\)",".",endpointName) with open("blackname_list","r",encoding="utf-8") as f: bn_list = f.read() match_result = re.search(endpointName.strip(),bn_list) if match_result == None: print("提示:接口不存在黑名单中") return 1 print("提示:接口在黑名单中") return 0def trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr): """ skywalking trace功能对错误接口进行过滤,默认最大一次获取2000条数据,每分钟执行一次 :param start_time: :param end_time: :return: """ url = sw_url data = { "query": "query queryTraces($condition: TraceQueryCondition) {\n data: queryBasicTraces(condition: $condition) {\n traces {\n key: segmentId\n endpointNames\n duration\n start\n isError\n traceIds\n }\n total\n }}", "variables": { "condition": { "queryDuration": { "start": start_time, #"2021-12-07 1734" "end": end_time, "step": "MINUTE" }, "traceState": "ERROR", "paging": { "pageNum": 1, "pageSize": per_page_size, "needTotal": "true" }, "queryOrder": "BY_START_TIME" # "traceId": "b669d0069be84fce82261901de412e7c.430.16388637511348105" } } } result = requests.request(method="post",url=url,json=data) i = 0 # print(result.content) # print(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:]))))) with open("mail.html","w",encoding="utf-8") as f: f.write('Title 最近15分钟统计:
持续时长 | 接口名称 | 追踪ID | |
---|---|---|---|
%s | %s | %s | %s |
sw-trace-id.py
#!/usr/bin/env python# _*_ coding: utf-8 _*_# Tile:# Author:shyimport requestsimport timefrom flask import Flask,requestapp = Flask(__name__)@app.route("/query",methods=["get"])def trace_id_query(): """ 查询指定trace_id详细日志信息 :return: f.read() """ trace_id = request.args.get("trace_id") url="http://172.16.53.232:9412/graphql" # url="http://skywalking.roulw.com/graphql" data = { "query": "query queryTrace($traceId: ID!) {\n trace: queryTrace(traceId: $traceId) {\n spans {\n traceId\n segmentId\n spanId\n parentSpanId\n refs {\n traceId\n parentSegmentId\n parentSpanId\n type\n }\n serviceCode\n serviceInstanceName\n startTime\n endTime\n endpointName\n type\n peer\n component\n isError\n layer\n tags {\n key\n value\n }\n logs {\n time\n data {\n key\n value\n }\n }\n }\n }\n }", "variables": { "traceId": trace_id } } result = requests.request(method="post",url=url,json=data) with open("detail_log", "w", encoding="utf-8") as f: f.write("生产Skywalking报错接口跟踪日志日志:") for trace_id in result.json()["data"]["trace"]["spans"]: if trace_id["isError"]: # print(trace_id) print("服务名称:%s\n" % trace_id["serviceCode"], "开始时间:%s\n" % trace_id["startTime"], "接口名称:%s\n" % trace_id["endpointName"], "peer名称:%s\n" % trace_id["peer"], "tags名称:%s\n" % trace_id["tags"], "详细日志:%s" % trace_id["logs"]) content = "服务名称:%s
开始时间:%s
接口名称:%s
peer名称:%s
tags名称:%s" % (trace_id["serviceCode"],trace_id["startTime"],trace_id["endpointName"],trace_id["peer"],trace_id["tags"]) with open("detail_log","a",encoding="utf-8") as f: f.write(content) f.write("
********详细日志**********
") for logs in trace_id["logs"]: for log in logs["data"]: if log["key"] == "message": print(log["value"]) with open("detail_log","a",encoding="utf-8") as f: f.write(log["value"]) # return log["value"] elif log["key"] == "stack": print(log["value"]) with open("detail_log","a",encoding="utf-8") as f: f.write(log["value"]) with open("detail_log", "a", encoding="utf-8") as f: f.write("") with open("detail_log","r",encoding="utf-8") as f: return f.read()if __name__ == "__main__": # trace_id = "14447ae7199c40a2b9862411daba180b.2142.16388920322367785" # trace_id_query(trace_id) app.run()
========下一个接口信息=========
关于python是怎么实现skywalking的trace模块过滤和报警问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
接口
utf-8
名称
黑名单
黑名
日志
报警
时间
关键
关键字
功能
内容
哥们
邮件
问题
提示
服务
查询
模块
信息
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
云服务器能装win10镜像吗
数据库压缩失败
sql数据库写代码找不到u盘
论文数据库都有啥
wow数据库科多兽
网络安全攻防论坛
网络安全防护需要两大能力
怎么确认数据库有没有这张表
电信网络安全工程师好找工作吗
伟通讯联网络技术有限公司
如何找回qq群未审核数据库
前端服务器s6和c6区别
海南游爱网络技术有限公司官网
数据库如何区分几个范式
软件开发赴日
公司与服务器管理
浙江优火网络技术有限公司
数据库暂时没有数据哦
为什么主题商店显示服务器正在忙
海南上天下网络技术有限公司
数据库工具正式完整版
联想服务器的管理页面
腾讯云自动删除数据库
c 数据库语句是
数据库怎么删除单行
信息安全及网络技术报告
数据库系统和数据库是一个东西吗
wind万得数据库学生版
立体推进网络安全工作
武汉众乐商通网络技术靠谱吗