spark rdd转dataframe 写入mysql的实例讲解
发表于:2024-12-12 作者:千家信息网编辑
千家信息网最后更新 2024年12月12日,dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍。spark在离线批处理或者
千家信息网最后更新 2024年12月12日spark rdd转dataframe 写入mysql的实例讲解
dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍。spark在离线批处理或者实时计算中都可以将rdd转成dataframe进而通过简单的sql命令对数据进行操作,对于熟悉sql的人来说在转换和过滤过程很方便,甚至可以有更高层次的应用,比如在实时这一块,传入kafka的topic名称和sql语句,后台读取自己配置好的内容字段反射成一个class并利用出入的sql对实时数据进行计算,这种情况下不会spark streaming的人也都可以方便的享受到实时计算带来的好处。
下面的示例为读取本地文件成rdd并隐式转换成dataframe对数据进行查询,最后以追加的形式写入mysql表的过程,scala代码示例如下
import java.sql.Timestampimport org.apache.spark.sql.{SaveMode, SQLContext}import org.apache.spark.{SparkContext, SparkConf}object DataFrameSql { case class memberbase(data_date:Long,memberid:String,createtime:Timestamp,sp:Int)extends Serializable{ override def toString: String="%d\t%s\t%s\t%d".format(data_date,memberid,createtime,sp) } def main(args:Array[String]): Unit ={ val conf = new SparkConf() conf.setMaster("local[2]")// ---------------------- //参数 spark.sql.autoBroadcastJoinThreshold 设置某个表是否应该做broadcast,默认10M,设置为-1表示禁用 //spark.sql.codegen 是否预编译sql成java字节码,长时间或频繁的sql有优化效果 // spark.sql.inMemoryColumnarStorage.batchSize 一次处理的row数量,小心oom //spark.sql.inMemoryColumnarStorage.compressed 设置内存中的列存储是否需要压缩// ---------------------- conf.set("spark.sql.shuffle.partitions","20") //默认partition是200个 conf.setAppName("dataframe test") val sc = new SparkContext(conf) val sqc = new SQLContext(sc) val ac = sc.accumulator(0,"fail nums") val file = sc.textFile("src\\main\\resources\\000000_0") val log = file.map(lines => lines.split(" ")).filter(line => if (line.length != 4) { //做一个简单的过滤 ac.add(1) false } else true) .map(line => memberbase(line(0).toLong, line(1),Timestamp.valueOf(line(2)), line(3).toInt)) // 方法一、利用隐式转换 import sqc.implicits._ val dftemp = log.toDF() // 转换 /* 方法二、利用createDataFrame方法,内部利用反射获取字段及其类型 val dftemp = sqc.createDataFrame(log) */ val df = dftemp.registerTempTable("memberbaseinfo") /*val sqlcommand ="select date_format(createtime,'yyyy-MM')as mm,count(1) as nums " + "from memberbaseinfo group by date_format(createtime,'yyyy-MM') " + "order by nums desc,mm asc "*/ val sqlcommand="select * from memberbaseinfo" val sel = sqc.sql(sqlcommand) val prop = new java.util.Properties prop.setProperty("user","etl") prop.setProperty("password","xxx") // 调用DataFrameWriter将数据写入mysql val dataResult = sqc.sql(sqlcommand).write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test","t_spark_dataframe_test",prop) // 表可以不存在 println(ac.name.get+" "+ac.value) sc.stop() }}
上面代码textFile中的示例数据如下,数据来自hive,字段信息分别为 分区号、用户id、注册时间、第三方号
20160309 45386477 2012-06-12 20:13:15 90143820160309 45390977 2012-06-12 22:38:06 90103620160309 45446677 2012-06-14 21:57:39 90143820160309 45464977 2012-06-15 13:42:55 90143820160309 45572377 2012-06-18 14:55:03 90260620160309 45620577 2012-06-20 00:21:09 90260620160309 45628377 2012-06-20 10:48:05 90118120160309 45628877 2012-06-20 11:10:15 90260620160309 45667777 2012-06-21 18:58:34 90252420160309 45680177 2012-06-22 01:49:55 20160309 45687077 2012-06-22 11:23:22 902607
这里注意字段类型映射,即case class类到dataframe映射,从官网的截图如下
更多明细可以查看官方文档 Spark SQL and DataFrame Guide
以上这篇spark rdd转dataframe 写入mysql的实例讲解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
数据
字段
实时
方法
示例
代码
内容
类型
过程
反射
处理
实例
明细
频繁
内存
前提
参数
名称
后台
命令
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
海云安网络安全
学生网络安全事件案例分析
数据库数据关系模型
为什么要使用大二层网络技术
服务器内存条怎么插算一个通道
数据库中表与表的关系
node连接云端服务器
网络安全音译
presto分布式大数据库
sap软件开发需要英文很好吗
数据库教学管理系统结果分析
电力行业重大网络安全事件案例
没有英语基础能学软件开发专业吗
路由器vpn服务器
2018网络安全大会
信息工程计算机网络技术
考研东南大学网络安全
5g网络技术架构论文
网络技术专升本学什么专业
网络安全心得200字
劲舞团代码软件开发
数据库学生表下载
如何进行网络技术提升
市场监测数据库
校园网络安全专题
德乐生软件开发怎么样?
软件开发是什么类目
前端css样式在服务器没有改变
nist语音数据库
基础教育信息管理系统数据库