mysql慢查询和错误日志分析
发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,mysql慢查询和错误日志分析和告警查看比较麻烦,目前的慢查询告警都是仅仅反应慢查询数量的。我们做了一个慢查询日志告警和分析的程序后台使用filebeat日志文件托运工具,将日志传输到redis数据库
千家信息网最后更新 2025年01月22日mysql慢查询和错误日志分析mysql慢查询和错误日志分析和告警查看比较麻烦,目前的慢查询告警都是仅仅反应慢查询数量的。
我们做了一个慢查询日志告警和分析的程序
后台使用filebeat日志文件托运工具,将日志传输到redis数据库。filebeat默认使用es。定时器1分钟执行一次。
vi /etc/filebeat/filebeat.yml
filebeat.prospectors:
- paths:
    - /data/mysql/xxx/tmp/slow.log
  document_type: syslog
  fields:
    app: mysql_slowlog
    port: xxx
    ip: xxxx
  scan_frequency: 30s
  tail_files: true
  multiline.pattern: '^\#\ Time'
  multiline.negate: true
  multiline.match: after
output.redis:
  enabled: true
  hosts: ["IP:port"]
  port: 2402
  key: filebeat
  keys:
    - key: "%{[fields.app]}"
      mapping:
        "mysql_slowlog": "mysql_slowlog"
        "mysql_errorlog": "mysql_errorlog"
  db: 0
  datatype: list
logging.to_files: true
在监控端读取redis数据,并通过正则处理到mysql数据库。
vi /data/mysql_slowLog.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import json
import pymysql
import re
import time
import threading # redis connect info
# --- redis connection settings (filebeat pushes slow-log JSON entries here) ---
redisHost = 'xxx'
redisPort = 2402
redisDB = '0'
redisKey = 'mysql_slowlog' # mysql connect info
# --- mysql connection settings (parsed slow-log rows are stored here) ---
mysqlHost = 'xxx'
mysqlPort = 2001
# mysqlPort = 23306
mysqlUser = ''
mysqlPasswd = ''
# mysqlPasswd = 'open'
mysqlDB = ''
# Monthly destination tables are named <prefix>YYYYMM, e.g. mysql_slowlog_202501.
mysqlTablePrefix = 'mysql_slowlog_'
# Seconds between collection runs (used by the threading.Timer in main()).
collectStep = 60
def time_log():
    """Return the current local time as a '[YYYY-MM-DD HH:MM:SS]' log tag."""
    stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    return '[{}]'.format(stamp)
def gather_log(redisConn):
    """Drain pending filebeat slow-log entries from redis and parse them.

    The list key is renamed to '<key>-bak' first so that concurrent
    filebeat pushes land on a fresh list while we read, then each JSON
    entry's 'message' field is parsed with regexes matching the
    Percona-style slow-log header (Schema/Last_errno/Bytes_sent fields).

    :param redisConn: redis.Redis connection.
    :return: list of dicts, one per parsed slow-log entry (may be empty).
    """
    data_list = []
    logList = []
    keyState = redisConn.exists(redisKey)
    if keyState:
        logLen = redisConn.llen(redisKey)
        if logLen > 0:
            redisKeyNew = redisKey + '-bak'
            # NOTE(review): renamenx silently does nothing when the -bak
            # key still exists (e.g. a previous run crashed mid-read), so
            # stale entries would be re-read -- confirm whether a plain
            # rename() is the intended semantics here.
            redisConn.renamenx(redisKey, redisKeyNew)
            logList = redisConn.lrange(redisKeyNew, 0, logLen)
            redisConn.delete(redisKeyNew)
    if len(logList) > 0:
        # Regexes over the slow-log header block. Hoisted out of the
        # per-entry loop: they are loop-invariant.
        timeRe = r'# Time: (.*)\n# User@Host:'
        userRe = r'# User@Host:.*\[(.*?)\]\s+@ '
        hostRe = r'# User@Host: .*\[(.*?)\] Id:'
        schemaRe = r'# Schema:\s+(.*?)\s+Last_errno:'
        queryRe = r'# Query_time:\s+(.*?)\s+Lock_time:'
        locklRe = r'# Query_time:.*?Lock_time:\s+(.*?)\s+Rows_sent:'
        rowsRe = r'# Query_time:.*?Lock_time:.*?Rows_sent:\s+(\d+)\s+Rows_examined:'
        bytesRe = r'# Bytes_sent:\s+(\d+)'
        timestampRe = r'SET\s+timestamp=(.*?);'
        commandRe = r'SET\s+timestamp=.*?;\n(.*?)(?=$)'
        for item in logList:
            slowLogJson = json.loads(item)
            logContent = slowLogJson['message']
            # Skip entries without a '# Time:' header (e.g. the slow-log
            # file preamble that filebeat also ships).
            if not re.findall(timeRe, logContent):
                continue
            data_dict = {}
            data_dict['hostname'] = slowLogJson['beat']['hostname']
            data_dict['ip'] = slowLogJson['fields']['ip']
            data_dict['port'] = slowLogJson['fields']['port']
            # Header time looks like 'yy0123 12:34:56'; prepend the
            # century and re-insert the date dashes: '20yy-01-23 ...'.
            sys_time = u'20' + re.findall(timeRe, logContent)[0]
            data_dict['sys_time'] = sys_time[:4] + '-' + sys_time[4:6] + '-' + sys_time[6:]
            data_dict['cli_user'] = re.findall(userRe, logContent)[0]
            data_dict['cli_ip'] = re.findall(hostRe, logContent)[0]
            data_dict['schema'] = re.findall(schemaRe, logContent)[0]
            data_dict['query_time'] = re.findall(queryRe, logContent)[0]
            data_dict['lock_time'] = re.findall(locklRe, logContent)[0]
            data_dict['rows_sent'] = re.findall(rowsRe, logContent)[0]
            data_dict['bytes_sent'] = re.findall(bytesRe, logContent)[0]
            data_dict['timestamp'] = re.findall(timestampRe, logContent)[0]
            data_dict['command'] = re.findall(commandRe, logContent, re.M)[0]
            data_list.append(data_dict)
    return data_list


def send_data(data, mysql_pool):
    """Insert parsed slow-log rows into the monthly mysql table.

    Creates the table mysql_slowlog_YYYYMM on first use, then bulk
    inserts via executemany. The connection is closed after use either
    way; the caller reconnects on every collection run.

    :param data: list of dicts as returned by gather_log().
    :param mysql_pool: an open pymysql connection.
    """
    mysqlTableDate = time.strftime('%Y%m', time.localtime(time.time()))
    mysqlTable = mysqlTablePrefix + mysqlTableDate
    cursor = mysql_pool.cursor()
    data_list = []
    createTableSql = "create table mysql_slowlog_000000 (`id` int(11) NOT NULL AUTO_INCREMENT," \
                     "hostname varchar(64) NOT NULL," \
                     "ip varchar(20) NOT NULL," \
                     "port int(11) NOT NULL," \
                     "sys_time datetime NOT NULL," \
                     "cli_user varchar(32) NOT NULL," \
                     "cli_ip varchar(32) NOT NULL," \
                     "`schema` varchar(32) NOT NULL," \
                     "query_time float(6,3) NOT NULL," \
                     "lock_time float(6,3) NOT NULL," \
                     "rows_sent int(11) NOT NULL," \
                     "bytes_sent int(11) NOT NULL," \
                     "`timestamp` varchar(40) NOT NULL," \
                     "command varchar(2048) DEFAULT NULL," \
                     "PRIMARY KEY (`id`)," \
                     "KEY `idx_slowlog_000000_user` (`cli_user`)," \
                     "KEY `idx_slowlog_000000_query_time` (`query_time`)," \
                     "KEY `idx_slowlog_000000_timestamp` (`timestamp`)) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8"
    createTableSql = createTableSql.replace('000000', mysqlTableDate)
    # Create the monthly table if it does not exist yet.
    try:
        cursor.execute("show tables like '%s'" % mysqlTable)
        res = cursor.fetchone()
        if not res:
            cursor.execute(createTableSql)
            mysql_pool.commit()
    except Exception as e:
        print(time_log() + 'Error:', e)
        mysql_pool.rollback()
        mysql_pool.close()
        # bugfix: the connection is closed here -- the original fell
        # through and tried to insert on a closed connection.
        return
    slowLogInsertSql = "insert into %s" % mysqlTable + "(hostname," \
                       "ip," \
                       "port," \
                       "sys_time," \
                       "cli_user," \
                       "cli_ip," \
                       "`schema`," \
                       "query_time," \
                       "lock_time," \
                       "rows_sent," \
                       "bytes_sent," \
                       "`timestamp`," \
                       "command) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    if len(data) > 0:
        for item in data:
            row = (item['hostname'].encode('utf-8'),
                   item['ip'].encode('utf-8'),
                   item['port'],
                   item['sys_time'].encode('utf-8'),
                   item['cli_user'].encode('utf-8'),
                   item['cli_ip'].encode('utf-8'),
                   item['schema'].encode('utf-8'),
                   item['query_time'].encode('utf-8'),
                   item['lock_time'].encode('utf-8'),
                   item['rows_sent'].encode('utf-8'),
                   item['bytes_sent'].encode('utf-8'),
                   item['timestamp'].encode('utf-8'),
                   pymysql.escape_string(item['command']).encode('utf-8'))
            data_list.append(row)
        print(len(data_list))
        # Bulk insert the collected slow-log rows.
        try:
            cursor.executemany(slowLogInsertSql, data_list)
            mysql_pool.commit()
            mysql_pool.close()
        except Exception as e:
            print(time_log() + 'Error:', e)
            mysql_pool.rollback()
            mysql_pool.close()
    else:
        print(time_log() + 'No data')
def main():
    """One collection cycle: read slow logs from redis, store to mysql,
    then re-arm a threading.Timer to run again after collectStep seconds."""
    try:
        redis_pool = redis.ConnectionPool(host=redisHost, port=redisPort, db=redisDB)
        redisConn = redis.Redis(connection_pool=redis_pool)
    except Exception as e:
        # bugfix: was a bare `except:` that also swallowed SystemExit /
        # KeyboardInterrupt, then fell through to a NameError on redisConn.
        print(time_log() + 'Error! Can not connect to redis!', e)
        return
    try:
        mysql_pool = pymysql.connect(host=mysqlHost, port=mysqlPort, user=mysqlUser,
                                     password=mysqlPasswd, db=mysqlDB)
    except Exception as e:
        print(time_log() + 'Error! Can not connect to mysql!', e)
        return
    print(time_log())
    data = gather_log(redisConn)
    send_data(data, mysql_pool)
    print(time_log())
    # Schedule the next collection run; the timer reference is kept in a
    # module-level global so it is not garbage collected.
    timeSchedule = collectStep
    global timer
    timer = threading.Timer(timeSchedule, main)
    timer.start()


if __name__ == '__main__':
    # First run after 1 second; main() re-arms itself every collectStep seconds.
    timer = threading.Timer(1, main)
    timer.start()
前端使用django展示慢查询数据,同时每周将相应的业务慢查询数据发送给开发人员。
mysql错误日志也是同样进行处理。
我们做了一个慢查询日志告警和分析的程序
后台使用filebeat日志文件托运工具,将日志传输到redis数据库。filebeat默认使用es。定时器1分钟执行一次。
vi /etc/filebeat/filebeat.yml
filebeat.prospectors:
- paths:
    - /data/mysql/xxx/tmp/slow.log
  document_type: syslog
  fields:
    app: mysql_slowlog
    port: xxx
    ip: xxxx
  scan_frequency: 30s
  tail_files: true
  multiline.pattern: '^\#\ Time'
  multiline.negate: true
  multiline.match: after
output.redis:
  enabled: true
  hosts: ["IP:port"]
  port: 2402
  key: filebeat
  keys:
    - key: "%{[fields.app]}"
      mapping:
        "mysql_slowlog": "mysql_slowlog"
        "mysql_errorlog": "mysql_errorlog"
  db: 0
  datatype: list
logging.to_files: true
在监控端读取redis数据,并通过正则处理到mysql数据库。
vi /data/mysql_slowLog.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import json
import pymysql
import re
import time
import threading # redis connect info
# --- redis connection settings (filebeat pushes slow-log JSON entries here) ---
redisHost = 'xxx'
redisPort = 2402
redisDB = '0'
redisKey = 'mysql_slowlog' # mysql connect info
# --- mysql connection settings (parsed slow-log rows are stored here) ---
mysqlHost = 'xxx'
mysqlPort = 2001
# mysqlPort = 23306
mysqlUser = ''
mysqlPasswd = ''
# mysqlPasswd = 'open'
mysqlDB = ''
# Monthly destination tables are named <prefix>YYYYMM, e.g. mysql_slowlog_202501.
mysqlTablePrefix = 'mysql_slowlog_'
# Seconds between collection runs (used by the threading.Timer in main()).
collectStep = 60
def time_log():
    """Return the current local time as a '[YYYY-MM-DD HH:MM:SS]' log tag."""
    stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    return '[{}]'.format(stamp)
def gather_log(redisConn):
    """Drain pending filebeat slow-log entries from redis and parse them.

    The list key is renamed to '<key>-bak' first so that concurrent
    filebeat pushes land on a fresh list while we read, then each JSON
    entry's 'message' field is parsed with regexes matching the
    Percona-style slow-log header (Schema/Last_errno/Bytes_sent fields).

    :param redisConn: redis.Redis connection.
    :return: list of dicts, one per parsed slow-log entry (may be empty).
    """
    data_list = []
    logList = []
    keyState = redisConn.exists(redisKey)
    if keyState:
        logLen = redisConn.llen(redisKey)
        if logLen > 0:
            redisKeyNew = redisKey + '-bak'
            # NOTE(review): renamenx silently does nothing when the -bak
            # key still exists (e.g. a previous run crashed mid-read), so
            # stale entries would be re-read -- confirm whether a plain
            # rename() is the intended semantics here.
            redisConn.renamenx(redisKey, redisKeyNew)
            logList = redisConn.lrange(redisKeyNew, 0, logLen)
            redisConn.delete(redisKeyNew)
    if len(logList) > 0:
        # Regexes over the slow-log header block. Hoisted out of the
        # per-entry loop: they are loop-invariant.
        timeRe = r'# Time: (.*)\n# User@Host:'
        userRe = r'# User@Host:.*\[(.*?)\]\s+@ '
        hostRe = r'# User@Host: .*\[(.*?)\] Id:'
        schemaRe = r'# Schema:\s+(.*?)\s+Last_errno:'
        queryRe = r'# Query_time:\s+(.*?)\s+Lock_time:'
        locklRe = r'# Query_time:.*?Lock_time:\s+(.*?)\s+Rows_sent:'
        rowsRe = r'# Query_time:.*?Lock_time:.*?Rows_sent:\s+(\d+)\s+Rows_examined:'
        bytesRe = r'# Bytes_sent:\s+(\d+)'
        timestampRe = r'SET\s+timestamp=(.*?);'
        commandRe = r'SET\s+timestamp=.*?;\n(.*?)(?=$)'
        for item in logList:
            slowLogJson = json.loads(item)
            logContent = slowLogJson['message']
            # Skip entries without a '# Time:' header (e.g. the slow-log
            # file preamble that filebeat also ships).
            if not re.findall(timeRe, logContent):
                continue
            data_dict = {}
            data_dict['hostname'] = slowLogJson['beat']['hostname']
            data_dict['ip'] = slowLogJson['fields']['ip']
            data_dict['port'] = slowLogJson['fields']['port']
            # Header time looks like 'yy0123 12:34:56'; prepend the
            # century and re-insert the date dashes: '20yy-01-23 ...'.
            sys_time = u'20' + re.findall(timeRe, logContent)[0]
            data_dict['sys_time'] = sys_time[:4] + '-' + sys_time[4:6] + '-' + sys_time[6:]
            data_dict['cli_user'] = re.findall(userRe, logContent)[0]
            data_dict['cli_ip'] = re.findall(hostRe, logContent)[0]
            data_dict['schema'] = re.findall(schemaRe, logContent)[0]
            data_dict['query_time'] = re.findall(queryRe, logContent)[0]
            data_dict['lock_time'] = re.findall(locklRe, logContent)[0]
            data_dict['rows_sent'] = re.findall(rowsRe, logContent)[0]
            data_dict['bytes_sent'] = re.findall(bytesRe, logContent)[0]
            data_dict['timestamp'] = re.findall(timestampRe, logContent)[0]
            data_dict['command'] = re.findall(commandRe, logContent, re.M)[0]
            data_list.append(data_dict)
    return data_list


def send_data(data, mysql_pool):
    """Insert parsed slow-log rows into the monthly mysql table.

    Creates the table mysql_slowlog_YYYYMM on first use, then bulk
    inserts via executemany. The connection is closed after use either
    way; the caller reconnects on every collection run.

    :param data: list of dicts as returned by gather_log().
    :param mysql_pool: an open pymysql connection.
    """
    mysqlTableDate = time.strftime('%Y%m', time.localtime(time.time()))
    mysqlTable = mysqlTablePrefix + mysqlTableDate
    cursor = mysql_pool.cursor()
    data_list = []
    createTableSql = "create table mysql_slowlog_000000 (`id` int(11) NOT NULL AUTO_INCREMENT," \
                     "hostname varchar(64) NOT NULL," \
                     "ip varchar(20) NOT NULL," \
                     "port int(11) NOT NULL," \
                     "sys_time datetime NOT NULL," \
                     "cli_user varchar(32) NOT NULL," \
                     "cli_ip varchar(32) NOT NULL," \
                     "`schema` varchar(32) NOT NULL," \
                     "query_time float(6,3) NOT NULL," \
                     "lock_time float(6,3) NOT NULL," \
                     "rows_sent int(11) NOT NULL," \
                     "bytes_sent int(11) NOT NULL," \
                     "`timestamp` varchar(40) NOT NULL," \
                     "command varchar(2048) DEFAULT NULL," \
                     "PRIMARY KEY (`id`)," \
                     "KEY `idx_slowlog_000000_user` (`cli_user`)," \
                     "KEY `idx_slowlog_000000_query_time` (`query_time`)," \
                     "KEY `idx_slowlog_000000_timestamp` (`timestamp`)) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8"
    createTableSql = createTableSql.replace('000000', mysqlTableDate)
    # Create the monthly table if it does not exist yet.
    try:
        cursor.execute("show tables like '%s'" % mysqlTable)
        res = cursor.fetchone()
        if not res:
            cursor.execute(createTableSql)
            mysql_pool.commit()
    except Exception as e:
        print(time_log() + 'Error:', e)
        mysql_pool.rollback()
        mysql_pool.close()
        # bugfix: the connection is closed here -- the original fell
        # through and tried to insert on a closed connection.
        return
    slowLogInsertSql = "insert into %s" % mysqlTable + "(hostname," \
                       "ip," \
                       "port," \
                       "sys_time," \
                       "cli_user," \
                       "cli_ip," \
                       "`schema`," \
                       "query_time," \
                       "lock_time," \
                       "rows_sent," \
                       "bytes_sent," \
                       "`timestamp`," \
                       "command) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    if len(data) > 0:
        for item in data:
            row = (item['hostname'].encode('utf-8'),
                   item['ip'].encode('utf-8'),
                   item['port'],
                   item['sys_time'].encode('utf-8'),
                   item['cli_user'].encode('utf-8'),
                   item['cli_ip'].encode('utf-8'),
                   item['schema'].encode('utf-8'),
                   item['query_time'].encode('utf-8'),
                   item['lock_time'].encode('utf-8'),
                   item['rows_sent'].encode('utf-8'),
                   item['bytes_sent'].encode('utf-8'),
                   item['timestamp'].encode('utf-8'),
                   pymysql.escape_string(item['command']).encode('utf-8'))
            data_list.append(row)
        print(len(data_list))
        # Bulk insert the collected slow-log rows.
        try:
            cursor.executemany(slowLogInsertSql, data_list)
            mysql_pool.commit()
            mysql_pool.close()
        except Exception as e:
            print(time_log() + 'Error:', e)
            mysql_pool.rollback()
            mysql_pool.close()
    else:
        print(time_log() + 'No data')
def main():
    """One collection cycle: read slow logs from redis, store to mysql,
    then re-arm a threading.Timer to run again after collectStep seconds."""
    try:
        redis_pool = redis.ConnectionPool(host=redisHost, port=redisPort, db=redisDB)
        redisConn = redis.Redis(connection_pool=redis_pool)
    except Exception as e:
        # bugfix: was a bare `except:` that also swallowed SystemExit /
        # KeyboardInterrupt, then fell through to a NameError on redisConn.
        print(time_log() + 'Error! Can not connect to redis!', e)
        return
    try:
        mysql_pool = pymysql.connect(host=mysqlHost, port=mysqlPort, user=mysqlUser,
                                     password=mysqlPasswd, db=mysqlDB)
    except Exception as e:
        print(time_log() + 'Error! Can not connect to mysql!', e)
        return
    print(time_log())
    data = gather_log(redisConn)
    send_data(data, mysql_pool)
    print(time_log())
    # Schedule the next collection run; the timer reference is kept in a
    # module-level global so it is not garbage collected.
    timeSchedule = collectStep
    global timer
    timer = threading.Timer(timeSchedule, main)
    timer.start()


if __name__ == '__main__':
    # First run after 1 second; main() re-arms itself every collectStep seconds.
    timer = threading.Timer(1, main)
    timer.start()
前端使用django展示慢查询数据,同时每周将相应的业务慢查询数据发送给开发人员。
mysql错误日志也是同样进行处理。
查询
日志
数据
错误
分析
数据库
处理
业务
人员
前端
同时
后台
定时器
工具
数量
文件
正则
程序
麻烦
utf-8
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
深圳市时速达网络技术有限公司
黄山市文旅局网络安全培训
d2p2数据库
数据库的安全性是基于
数据库 连接表
网络安全工程师北上广工资
网络安全等级保护顶级指南
十五派网络安全教程下载
网络安全前辈给的建议心得
跳板服务器哪里租
北京智能广播系统服务器
服务器安全管理比赛
网络安全手抄报二年级作文
网易版有什么著名服务器
数据库解码集
cmds数据库2019
传统关系型数据库已被淘汰
网络安全法自那天起施行
服务器是服务的提供方吗
加强网络安全防护教育
sql数据库安装后找不到右键
提高员工的网络安全意识
软件开发工程师工资调查
金山区智能网络技术使用方法
十五派网络安全教程下载
民族药物资源数据库
软件开发收费国家规范
网络安全可以围绕几点讲
网络安全法自那天起施行
福州网络安全就业