
Lesson 97: A Case Study Combining Spark Streaming with Spark SQL


The code is as follows:

package com.dt.spark.streaming

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Duration}

/**
 * Analyze logs with Spark Streaming combined with Spark SQL.
 * Assume the (simplified) click-log format of an e-commerce site is:
 *   userid,itemId,clickTime
 * Requirement: compute the top-10 items by click count within a 10-minute window
 * and display the item names. The mapping from itemId to item name is stored in MySQL.
 * Created by dinglq on 2016/5/4.
 */
object LogAnalyzerStreamingSQL {
  val WINDOW_LENGTH = new Duration(600 * 1000)
  val SLIDE_INTERVAL = new Duration(10 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("LogAnalyzerStreamingSQL").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Load the itemInfo table from the database
    val itemInfoDF = sqlContext.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://spark-master:3306/spark",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "iteminfo",
      "user" -> "root",
      "password" -> "vincent"
    )).load()
    itemInfoDF.registerTempTable("itemInfo")

    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    val logLinesDStream = streamingContext.textFileStream("D:/logs_incoming")

    val accessLogsDStream = logLinesDStream.map(AccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")
        val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM itemInfo a JOIN " +
          " (SELECT itemId,COUNT(*) cnt FROM accessLogs GROUP BY itemId) b " +
          " ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 "
        val topTenClickItemLast10Minus = sqlContext.sql(sqlStr)
        // Print the top-10 table for this window (it could also be persisted, e.g. to HDFS as Parquet)
        topTenClickItemLast10Minus.show()
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

case class AccessLog(userId: String, itemId: String, clickTime: String)

object AccessLog {
  // Parse one log line of the form "userid,itemId,clickTime";
  // malformed lines fall back to a placeholder record.
  def parseLogLine(log: String): AccessLog = {
    val logInfo = log.split(",")
    if (logInfo.length == 3) {
      AccessLog(logInfo(0), logInfo(1), logInfo(2))
    } else {
      AccessLog("0", "0", "0")
    }
  }
}
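As a side note, the join above can also be expressed with the DataFrame API instead of a SQL string. The following is a minimal sketch, not part of the original lesson; it assumes it replaces the sqlContext.sql(...) call inside foreachRDD, with itemInfoDF and accessLogs in scope as above:

// Hypothetical DataFrame-API variant of the SQL query above (sketch, not from the lesson)
val accessLogsDF = accessLogs.toDF()
val clickCounts = accessLogsDF.groupBy("itemId").count()
  .withColumnRenamed("itemId", "id")   // rename to avoid an ambiguous column name after the join
  .withColumnRenamed("count", "cnt")
val topTen = itemInfoDF.join(clickCounts, itemInfoDF("itemid") === clickCounts("id"))
  .select("itemid", "itemname", "cnt")
  .orderBy($"cnt".desc)
  .limit(10)
topTen.show()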


The contents of the MySQL table are as follows:

mysql> select * from spark.iteminfo;
+--------+----------+
| itemid | itemname |
+--------+----------+
| 001    | phone    |
| 002    | computer |
| 003    | TV       |
+--------+----------+
3 rows in set (0.00 sec)
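Before starting the streaming job, it can help to sanity-check the JDBC connection from a Spark shell. A minimal sketch, reusing the same connection options as the program above:

// Quick JDBC sanity check (sketch; same connection options as in the main program)
val check = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://spark-master:3306/spark",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "iteminfo",
  "user" -> "root",
  "password" -> "vincent"
)).load()
check.show()   // should print the three rows shown above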


Create the directory logs_incoming on the D: drive.


Run the Spark Streaming program. (Note that a MySQL JDBC driver jar, e.g. mysql-connector-java, must be on the classpath so that com.mysql.jdbc.Driver can be loaded.)


Create a new file with the following content:

0001,001,2016-05-04 22:10:20
0002,001,2016-05-04 22:10:21
0003,001,2016-05-04 22:10:22
0004,002,2016-05-04 22:10:23
0005,002,2016-05-04 22:10:24
0006,001,2016-05-04 22:10:25
0007,002,2016-05-04 22:10:26
0008,001,2016-05-04 22:10:27
0009,003,2016-05-04 22:10:28
0010,003,2016-05-04 22:10:29
0011,001,2016-05-04 22:10:30
0012,003,2016-05-04 22:10:31
0013,003,2016-05-04 22:10:32
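Note that textFileStream only picks up files that appear in the monitored directory after the streaming context has started, and files should appear atomically (written elsewhere, then moved in). A minimal sketch of preparing the file this way; the file names are hypothetical and this helper is not part of the lesson:

import java.nio.file.{Files, Paths, StandardCopyOption}

// Write the sample records to a temp file first, then move it atomically (same drive)
// into the watched directory so textFileStream sees a complete file.
val records = Seq(
  "0001,001,2016-05-04 22:10:20",
  "0002,001,2016-05-04 22:10:21"
  // ... remaining records from the listing above
)
val tmp = Paths.get("D:/clicks.log")
Files.write(tmp, records.mkString("\n").getBytes("UTF-8"))
Files.move(tmp, Paths.get("D:/logs_incoming/clicks.log"), StandardCopyOption.ATOMIC_MOVE)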

Save the file into the logs_incoming directory and watch the output of the Spark program. In this sample, item 001 appears six times, item 003 four times, and item 002 three times, which matches the output:

+------+--------+---+
|itemid|itemname|cnt|
+------+--------+---+
|   001|   phone|  6|
|   003|      TV|  4|
|   002|computer|  3|
+------+--------+---+
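Because WINDOW_LENGTH is 10 minutes and SLIDE_INTERVAL is 10 seconds in the program above, this table is recomputed and printed every 10 seconds, and the sample clicks keep contributing to the counts until they slide out of the 10-minute window.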



Notes:

1. DT Big Data Dream Factory WeChat official account: DT_Spark
2. IMF 8 p.m. big data hands-on session, YY live channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains

