spark-sql的进阶案例
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,(1)骨灰级案例--UDTF求wordcount数据格式:每一行都是字符串并且以空格分开。代码实现:object SparkSqlTest { def main(args: Array[Stri
千家信息网最后更新 2025年01月31日spark-sql的进阶案例
(1)骨灰级案例--UDTF求wordcount
数据格式:
每一行都是字符串并且以空格分开。
代码实现:
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .enableHiveSupport() .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext val wordDF: DataFrame = sqlContext.read.text("C:\\z_data\\test_data\\ip.txt").toDF("line") wordDF.createTempView("lines") val sql= """ |select t1.word,count(1) counts |from ( |select explode(split(line,'\\s+')) word |from lines) t1 |group by t1.word |order by counts """.stripMargin spark.sql(sql).show() }}
结果:
(2)窗口函数求topN
数据格式:
取每门课程中成绩最好的前三
代码实现:
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .enableHiveSupport() .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext val topnDF: DataFrame = sqlContext.read.json("C:\\z_data\\test_data\\score.json") topnDF.createTempView("student") val sql= """select |t1.course course, |t1.name name, |t1.score score |from ( |select |course, |name, |score, |row_number() over(partition by course order by score desc ) top |from student) t1 where t1.top<=3 """.stripMargin spark.sql(sql).show() }}
结果:
(3)SparkSQL去处理DataSkew数据倾斜的问题
思路: (使用两阶段的聚合)
- 找到发生数据倾斜的key
- 对发生倾斜的数据的key进行拆分
- 做局部聚合
- 去后缀
- 全局聚合
以上面的wordcount为例,找出相应的数据量比较大的单词
代码实现:
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .enableHiveSupport() .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //注册UDF sqlContext.udf.register[String,String,Integer]("add_prefix",add_prefix) sqlContext.udf.register[String,String]("remove_prefix",remove_prefix) //创建sparkContext对象 val sc: SparkContext = spark.sparkContext val lineRDD: RDD[String] = sc.textFile("C:\\z_data\\test_data\\ip.txt") //找出数据倾斜的单词 val wordsRDD: RDD[String] = lineRDD.flatMap(line => { line.split("\\s+") }) val sampleRDD: RDD[String] = wordsRDD.sample(false,0.2) val sortRDD: RDD[(String, Int)] = sampleRDD.map(word=>(word,1)).reduceByKey(_+_).sortBy(kv=>kv._2,false) val hot_word = sortRDD.take(1)(0)._1 val bs: Broadcast[String] = sc.broadcast(hot_word) import spark.implicits._ //将数据倾斜的key打标签 val lineDF: DataFrame = sqlContext.read.text("C:\\z_data\\test_data\\ip.txt") val wordDF: Dataset[String] = lineDF.flatMap(row => { row.getAs[String](0).split("\\s+") }) //有数据倾斜的word val hotDS: Dataset[String] = wordDF.filter(row => { val hot_word = bs.value row.equals(hot_word) }) val hotDF: DataFrame = hotDS.toDF("word") hotDF.createTempView("hot_table") //没有数据倾斜的word val norDS: Dataset[String] = wordDF.filter(row => { val hot_word = bs.value !row.equals(hot_word) }) val norDF: DataFrame = norDS.toDF("word") norDF.createTempView("nor_table") var sql= """ |(select |t3.word, |sum(t3.counts) counts |from (select |remove_prefix(t2.newword) word, |t2.counts |from (select |t1.newword newword, |count(1) counts |from |(select |add_prefix(word,3) newword |from hot_table) t1 |group by t1.newword) t2) t3 |group by t3.word) |union |(select | word, | count(1) counts |from nor_table |group by word) """.stripMargin spark.sql(sql).show() } //自定义UDF加前缀 def add_prefix(word:String,range:Integer): String ={ val random=new Random() random.nextInt(range)+"_"+word } //自定义UDF去除后缀 def remove_prefix(word:String): String ={ word.substring(word.indexOf("_")+1) }}
结果:
数据
对象
入口
日志
结果
编程
代码
单词
后缀
格式
案例
一行
全局
函数
前缀
字符
字符串
局部
思路
成绩
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
计算机网络技术要不要学数学
戴尔刀片服务器远程管理卡
中化互联网科技
网络安全最新资讯任子行
淮安付霸互联网科技有限公司
光明新零售系统软件开发
php数据库图片
飞机信息数据库
禁毒网络安全手抄报大全
数据库查询树形结构
管家婆a8 数据库分区
大唐保镖服务器怎么样
我的世界多人服务器怎么输密码
发射台网络安全事故通报制度
聊城智慧社区软件开发系统
计算机网络技术职业发展环境
实时数据库的数据源
ctf网络安全大赛条件
网络安全先进集体申报表
我的世界贝爷服务器分享
电脑软件开发行业发展趋势
pubg中服务器维护什么意思
数据库规划
思科网络技术学院报名
鸿特互联网科技有限公司
网络安全工作会议小结
网站服务器监控软件
学习手机软件开发需要多久
手机微信网络安全保密测试题答案
把数据库连接到内存卡