SparkStreaming的实现和使用方法
发表于:2025-01-21 作者:千家信息网编辑
千家信息网最后更新 2025年01月21日,这篇文章主要讲解了"SparkStreaming的实现和使用方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"SparkStreaming的实现和使用
千家信息网最后更新 2025年01月21日SparkStreaming的实现和使用方法
这篇文章主要讲解了"SparkStreaming的实现和使用方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"SparkStreaming的实现和使用方法"吧!
一.DStream 整合RDD
1.官网算子
2.使用案例
生产中使用多的是一个文件中有很多域名,另一个中是黑名单,要进行剔除数据一:日志信息 DStream domain,traffic xinlang.com xinlang.com baidu.com数据二:已有的文件 黑名单 RDD domain baidu.com
3.RDD实现上述需求
package sparkstreaming02import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable.ListBufferobject Demo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Demo1").setMaster("local[2]") val sc = new SparkContext(conf) val input1 = new ListBuffer[(String,Long)] input1.append(("www.xinlang.com", 8888)) input1.append(("www.xinalng.com", 9999)) input1.append(("www.baidu.com", 7777)) val data1 = sc.parallelize(input1) //进行join一定要是key,value形式的 val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = sc.parallelize(input2) data1.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true }).map(x => (x._1,x._2._1)) .collect().foreach(println) }}
4.SparkStreaming实现
package sparkstreaming02import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutable.ListBufferobject Streaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Streaming").setMaster("local[2]") val ssc = new StreamingContext(conf,Seconds(10)) val lines = ssc.socketTextStream("s201",9999) // 数据二: rdd val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = ssc.sparkContext.parallelize(input2) lines.map(x=>(x.split(",")(0), x)).transform( rdd => { rdd.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true //注意 join之后过滤 }).map(x => (x._1,x._2._1)) } ).print() ssc.start() ssc.awaitTermination() }}
二.SparkStreaming插入外部数据源
1.插入外部数据源用的,但是使用这个有几个坑
、
2.错误一官网例子
3.原因
connect 在Driver端创建,record在executor,发过去序列化错误
4.解决
解决:第一种把connect放到executor端这样弊端是每条记录会生成一个connect太耗费资源 words.foreachRDD { rdd => rdd.foreach { record => val connection = createConnection() // executed at the driver val word = record._1 val count = record._2.toInt val sql = s"insert into wc (wc,count) values($word,$count)" connection.createStatement().execute(sql) }
5.最终解决办法
package sparkstreaming02import java.sql.DriverManagerimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}object MysqlStreaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("MysqlStreaming") val ssc = new StreamingContext(conf,Seconds(1)) val lines = ssc.socketTextStream("s201",9999) val words = lines.flatMap(x => x.split(",")).map((_,1)).reduceByKey(_+_)// words.foreachRDD { rdd =>// val connection = createConnection() // executed at the driver// rdd.foreach { record =>// val word = record._1// val count = record._2// val sql = s"insert into wc (word,count) values($word,$count)"// connection.createStatement().execute(sql)// }// }// words.foreachRDD { rdd =>// rdd.foreach { record =>// val connection = createConnection() // executed at the driver// val word = record._1// val count = record._2.toInt// val sql = s"insert into wc (wc,count) values($word,$count)"// connection.createStatement().execute(sql)// }// } //最终的写法 words.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createConnection() partitionOfRecords.foreach( record =>{ val word = record._1 val count = record._2 val sql = s"insert into wc (wc,count) values('$word',$count)" connection.createStatement().execute(sql)} ) } } ssc.start() ssc.awaitTermination() } def createConnection() = { Class.forName("com.mysql.cj.jdbc.Driver") DriverManager.getConnection("jdbc:mysql://localhost:3306/hive?serverTimezone=UTC&useSSL=false","root","123456") }}
6.出现问题
错误,插入数据库时,你要插入字符串要用''例如:val sql = s"insert into wc (wc,count) values($word,$count)"word是字符串,你要不加双引号就报这个错误正确val sql = s"insert into wc (wc,count) values('$word',$count)"
感谢各位的阅读,以上就是"SparkStreaming的实现和使用方法"的内容了,经过本文的学习后,相信大家对SparkStreaming的实现和使用方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
使用方法
方法
错误
学习
内容
字符
字符串
数据源
文件
问题
黑名单
黑名
例子
信息
写法
办法
原因
域名
就是
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
维护网络安全扎实推进
软件开发案例步骤
西门子编程数据库导入
常州存储服务器散热器加工厂
软件开发行业分析全球
六安市第二人民医院网络安全
三峡大学数据库
因特网网络技术
怎么看自己的原神是哪个服务器的
厦门游戏软件开发公司
电子交易中的网络安全
数据库共享的技术方案
指定ace管理服务器
区块链研究与软件开发
cnrds数据库要钱吗
个人委托公司软件开发
甲骨文数据库建表
mc服务器租用
招标文件服务器最大内存
有关数据库的国外论文全文
dpl数据库中dpl是什么意思
网络安全项目建设安全管理原则
服务器怎么开通第二个用户
游戏服务器 共享内存 利弊
pdb cpm 数据库
数据库连接代码的作用
浦东新区技术软件开发咨询热线
sql改数据库密码代码
多规合一基础数据库构建标准
东莞普金科技互联网有限公司