第97课:Spark Streaming 结合Spark SQL 案例
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,代码如下:package com.dt.spark.streamingimport org.apache.spark.sql.SQLContextimport org.apache.spark.{Sp
千家信息网最后更新 2025年01月24日第97课:Spark Streaming 结合Spark SQL 案例
代码如下:
package com.dt.spark.streamingimport org.apache.spark.sql.SQLContextimport org.apache.spark.{SparkContext, SparkConf}import org.apache.spark.streaming.{StreamingContext, Duration}/** * 使用SparkStreaming结合SparkSQL对日志进行分析。 * 假设电商网站点击日志格式(简化)如下: * userid,itemId,clickTime * 需求:处理10分钟内item点击次数排序Top10,并且将商品名称显示出来。商品itemId与商品名称的对应关系存放在MySQL数据库中 * Created by dinglq on 2016/5/4. */object LogAnalyzerStreamingSQL { val WINDOW_LENGTH = new Duration(600 * 1000) val SLIDE_INTERVAL = new Duration(10 * 1000) def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("LogAnalyzerStreamingSQL").setMaster("local[4]") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ //从数据库中加载itemInfo表 val itemInfoDF = sqlContext.read.format("jdbc").options(Map( "url"-> "jdbc:mysql://spark-master:3306/spark", "driver"->"com.mysql.jdbc.Driver", "dbtable"->"iteminfo", "user"->"root", "password"-> "vincent" )).load() itemInfoDF.registerTempTable("itemInfo") val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL) val logLinesDStream = streamingContext.textFileStream("D:/logs_incoming") val accessLogsDStream = logLinesDStream.map(AccessLog.parseLogLine).cache() val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL) windowDStream.foreachRDD(accessLogs => { if (accessLogs.isEmpty()) { println("No logs received in this time interval") } else { accessLogs.toDF().registerTempTable("accessLogs") val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM itemInfo a JOIN " + " (SELECT itemId,COUNT(*) cnt FROM accessLogs GROUP BY itemId) b " + " ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 " val topTenClickItemLast10Minus = sqlContext.sql(sqlStr) // Persist top ten table for this window to HDFS as parquet file topTenClickItemLast10Minus.show() } }) streamingContext.start() streamingContext.awaitTermination() }}case class AccessLog(userId: String, itemId: String, clickTime: String) {}object AccessLog { def parseLogLine(log: String): AccessLog = { val logInfo = log.split(",") if (logInfo.length == 3) { AccessLog(logInfo(0),logInfo(1), logInfo(2)) } else { AccessLog("0","0","0") } }}
MySQL中表的内容如下:
mysql> select * from spark.iteminfo;+--------+----------+| itemid | itemname |+--------+----------+| 001 | phone || 002 | computer || 003 | TV |+--------+----------+3 rows in set (0.00 sec)
在D创建目录logs_incoming
运行Spark Streaming 程序。
新建文件,内容如下:
0001,001,2016-05-04 22:10:200002,001,2016-05-04 22:10:210003,001,2016-05-04 22:10:220004,002,2016-05-04 22:10:230005,002,2016-05-04 22:10:240006,001,2016-05-04 22:10:250007,002,2016-05-04 22:10:260008,001,2016-05-04 22:10:270009,003,2016-05-04 22:10:280010,003,2016-05-04 22:10:290011,001,2016-05-04 22:10:300012,003,2016-05-04 22:10:310013,003,2016-05-04 22:10:32
将文件保存到目录logs_incoming 中,观察Spark程序的输出:
+------+--------+---+|itemid|itemname|cnt|+------+--------+---+| 001| phone| 6|| 003| TV| 4|| 002|computer| 3|+------+--------+---+
备注:
1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains
商品
数据
内容
名称
数据库
文件
日志
目录
程序
中表
代码
公众
备注
大数
实战
工厂
格式
次数
网站
需求
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
sql数据库怎么分割文件
北京科技互联网
黄维广洲游爱网络技术有限公司
数据上传服务器
网络安全的具体表现
如何提升个人网络安全意识
公司网络安全管理协议
网络安全管理 汇报材料
语音聊天的网络技术是什么
电子商务网络技术实践
瞬变电磁软件开发
个人收支数据库管理系统er
东软集团软件开发待遇怎么样
泗洪直销网络技术诚信合作
电脑对服务器的远程访问
app汽车软件开发多少钱
软件开发需要用到什么教材资料
mpv版本 软件开发
桃源软件开发职高
天地劫手游服务器获取失败
服务医药客户的网络安全股票
我的世界宝可梦多人服务器下载
mc服务器什么用处
靖江网络安全宣传语
数据库表查询子链接
软交换数据库备份
数据库为什么标识符无效
数据库加锁速度排序
泉州金石盾网络技术
数据库的类型和特点