千家信息网

使用spark分析mysql慢日志

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,熟悉oracle的童鞋都知道,在oracle中,有很多视图记录着sql执行的各项指标,我们可以根据自己的需求编写相应脚本,从oracle中获取sql的性能开销。作为开源数据库,mysql不比oracl
千家信息网最后更新 2025年01月23日使用spark分析mysql慢日志

熟悉oracle的童鞋都知道,在oracle中,有很多视图记录着sql执行的各项指标,我们可以根据自己的需求编写相应脚本,从oracle中获取sql的性能开销。作为开源数据库,mysql不比oracle,分析慢sql只能通过slow.log。slow.log看起来不够直观,而且同一条慢sql执行多次的话就会在slow.log中被记录多次,可阅读性较差。
最近,部门开发的数据库审计平台上线mysql审计模块,需要为客户提供一键化提取slow.log中慢sql的功能。由于本人之前研究过spark,在分析慢日志的文本结构后,使用scala语言,利用spark core相关技术,编写了能够去重slow.log中重复sql,并将按执行时间排序的top sql输入到hive表中的小程序。
话不多说,上菜!

开发环境:
1、CentOS 6.5
2、JDK 1.7
3、Hadoop 2.4.1
4、Hive 0.13
5、Spark 1.5.1
6、scala 2.11.4
hadoop及spark集群环境的搭建方法就不多说了哈,网上资料很多,对大数据感兴趣的童鞋可以尝试搭建。

step 1 使用scala ide for eclipse编写应用程序
analyzeSlowLog.scala:

package cn.spark.study.sqlimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport scala.util.matching.Regeximport scala.collection.mutable.ArrayBufferimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.types.StructFieldimport org.apache.spark.sql.types.StringTypeimport org.apache.spark.sql.types.DoubleTypeimport org.apache.spark.sql.SQLContextimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveContextobject SlowLogAnalyze {  def main(args: Array[String]): Unit = {    //创建SparkConf,SparkContext和HiveContext    val conf=new SparkConf()      .setAppName("SlowLogAnalyze");    val sc=new SparkContext(conf)    val hiveContext=new HiveContext(sc)    //读取hdfs文件,获取logRDD    val logRDD=sc.textFile("hdfs://spark1:9000/files/slow.log", 5)    //创建正则表达式,用来过滤slow.log中的无效信息    val pattern1="# Time:".r    val pattern2="# User@Host:".r    val pattern3="SET timestamp=".r     //对logRDD进行filter,过滤无效信息    val filteredLogRDD=logRDD.filter { str =>           //正则返回的是option类型,只有Some和None两种类型          if(pattern1.findFirstIn(str)!=None){            false          }else if(pattern2.findFirstIn(str)!=None){            false          }else if(pattern3.findFirstIn(str)!=None){            false          }else{            true          }         }    /**     * 将filteredLogRDD转换为格式为(execute_time,sql_text)的tuple类RDD KV_RDD     */    //将filteredLogRDD转换为数组    val logArray=filteredLogRDD.toArray()    //定义正则表达式pattern,用于识别Query_timeval pattern="# Query_time:".r     //定义数组KV_Array,用于存放循环映射后的tuple,tuple为(query_time所在行,sql_text)    val KV_Array=ArrayBuffer[(String,String)]()          for (i<-0 until logArray.length){             if(pattern.findFirstIn(logArray(i))!=None){               val key=logArray(i)               var flag=true                var value=""               if(i             val timeSplit=tuple._1.split(" ")             //注意这里是toDouble,不是toInt!!!!因为日志中的时间是Double类型!!!!             (tuple._2,timeSplit(2).toDouble)         }     /**      * 由于慢日志中保存了较多相同sql,需进行去重处理      * 对相同的sql的execute_time取均值,最后输出unique的(sql_text,execute_time)      */     val groupBySqlRDD=sql_time_RDD.groupByKey()         .map{tuple=>             val timeArray=tuple._2.toArray             var totalTime=0.0             for(i<-0 until timeArray.length){               totalTime=totalTime + timeArray(i)             }             val avgTime=totalTime/timeArray.length             (tuple._1,avgTime)         }     val sortedRowRDD=groupBySqlRDD         .map{tuple=>(tuple._2,tuple._1)}         .sortByKey(false, 1)         .map{tuple=>Row(tuple._2,tuple._1)}     val top10Array=sortedRowRDD.take(10)     val top10RDD=sc.parallelize(top10Array, 1)     //将sortedRDD转换为dataframeval structType=new StructType(Array(           StructField("sql_text",StringType,true),           StructField("executed_time",DoubleType,true)           )         )     val top10DF=hiveContext.createDataFrame(top10RDD, structType)      hiveContext.sql("drop table if exists sql_top10")     top10DF.saveAsTable("sql_top10")  }}

将代码打成jar包并上传至linux。
step 2 编写执行脚本
analyzeSlowLog.sh:

/var/software/spark-1.5.1-bin-hadoop2.4/bin/spark-submit \--class cn.spark.study.sql.SlowLogAnalyze \--num-executors 3 \--driver-memory 100m \--executor-memory 100m \--executor-cores 3 \--files /var/software/hive/conf/hive-site.xml \--driver-class-path /var/software/hive/lib/mysql-connector-java-5.1.17.jar \/var/software/spark_study/scala/SlowLogAnalyze.jar

step 3 执行analyzeSlowLog.sh,并进入hive查看分析结果:
hive> show tables;
OK
daily_top3_keywords_uvs
good_students
sql_top10 -- 这张表就是scala程序中定义的表名,程序运行时会在hive中创建
student_infos
student_scores
Time taken: 0.042 seconds, Fetched: 5 row(s)

查看sql_top10中的内容:
这里由于长度限制,截断了sql文本,所以看起来部分sql是一样的,实际是两条不同的sql(where 条件不同)。
hive> select substr(sql_text,1,50),executed_time from sql_top10;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
...
Execution completed successfully
MapredLocal task succeeded
OK
select 'true' as QUERYID, ID_GARAG 0.0252804
select count() from pms_garage_vitri_info 0.0048902
select count(
) from information_schema.PROCESSLIS 3.626E-4
select 'true' as QUERYID, e_survey 2.39E-4
select 'true' as QUERYID, e_survey 2.34E-4
SELECT account_code AS um 2.2360000000000001E-4
select 'true' as QUERYID, e_survey 2.19E-4
select 'true' as QUERYID, e_survey 2.18E-4
select 'true' as QUERYID, e_survey 2.15E-4
SELECT account_code AS um 2.1419999999999998E-4
Time taken: 8.501 seconds, Fetched: 10 row(s)

至此,对mysql slow.log的提取完毕!

关于在mysql中创建相关视图的思考:
hadoop和spark一般用于处理大数据,这里用来处理mysql的慢日志实在是大材小用。不过,要想在mysql中提供查看数据库top sql的v$Topsql视图,对slow.log的实时分析是必须的,此时,spark streaming便可派上用场。
思路如下:
1.编写crontab定时任务以定时拷贝slow.log至hdfs
2.编写crontab定时任务以调用spark streaming程序分析hdfs上的最新slow.log ->通过jdbc将将top sql输出到对应mysql数据库中的v$Topsql视图中,并覆盖之前的数据。
ps:在分析slow.log时,可在程序中executor,timestamp等字段(本文中并未提取这两个字段),以提供更详细的信息。

数据 分析 程序 日志 数据库 视图 信息 正则 类型 处理 不同 相同 任务 字段 数组 文本 时间 环境 童鞋 脚本 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 内蒙古财务管理微服务架构数据库 芒市服务器云存储多少钱 广州市网络安全宣传周活动 哈尔滨银行软件开发工作 如何通过网络查看服务器 怎样快速找到数据库密码 网络安全法确立了谁收集 宁德时代软件开发薪资 服务器管理楼配置 互联网新科技会议 网络安全的股票山东 腾讯领航者网络安全怎么样 软件开发运营开发方向面试 互联网股票是科技股吗 武装部网络安全管理规定 华为高级网络安全工程师薪资待遇 向党说句心里话网络安全 服务器代码错误400 网络运维与网络安全课本 深圳云端互联网科技 征信系统软件开发哪家公司好 控制网络技术的内容 数据库登录提示18456 激光切割控制软件开发 浙江2米服务器机柜 软件开发培训学校学点什么好 企业网络安全漏洞管理之晋级路线 中天科技软件开发 深圳云端互联网科技 政府部门网络安全市场投入
0