千家信息网

Spark RDD转换成DataFrame的两种方式

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,Spark SQL支持两种方式将现有RDD转换为DataFrame。第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame。这种基于反射方法十分简便,但是前提
千家信息网最后更新 2025年01月23日Spark RDD转换成DataFrame的两种方式

Spark SQL支持两种方式将现有RDD转换为DataFrame。
第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame。这种基于反射方法十分简便,但是前提是在您编写Spark应用程序时就已经知道RDD的schema类型。
第二种方法是通过编程接口,使用您构建的StructType,然后将其应用于现有RDD。虽然此方法很麻烦,但它允许您在运行之前并不知道列及其类型的情况下构建DataSet

    方法如下         1.将RDD转换成Rows            2.按照第一步Rows的结构定义StructType           3.基于rows和StructType使用createDataFrame创建相应的DF

测试数据为order.data

1   小王  电视  12  2015-08-01 09:08:311   小王  冰箱  24  2015-08-01 09:08:142   小李  空调  12  2015-09-02 09:01:31

代码如下:

object RDD2DF {  /**    * 主要有两种方式    *   第一种是在已经知道schema已经知道的情况下,我们使用反射把RDD转换成DS,进而转换成DF    *   第二种是你不能提前定义好case class,例如数据的结构是以String类型存在的。我们使用接口自定义一个schema    * @param args    */  def main(args: Array[String]): Unit = {    val spark=SparkSession.builder()      .appName("DFDemo")      .master("local[2]")      .getOrCreate()//    rdd2DFFunc1(spark)    rdd2DFFunc2(spark)    spark.stop()  }  /**    * 提前定义好case class    * @param spark    */  def rdd2DFFunc1(spark:SparkSession): Unit ={    import spark.implicits._    val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")    val orderDF=orderRDD.map(_.split("\t"))      .map(attributes=>Order(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4)))      .toDF()    orderDF.show()    Thread.sleep(1000000)  }  /**    *总结:第二种方式就是通过最基础的DF接口方法,将    * @param spark    */  def rdd2DFFunc2(spark:SparkSession): Unit ={    //TODO:   1.将RDD转换成Rows   2.按照第一步Rows的结构定义StructType  3.基于rows和StructType使用createDataFrame创建相应的DF    val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")    //TODO:   1.将RDD转换成Rows    val rowsRDD=orderRDD//      .filter((str:String)=>{val arr=str.split("\t");val res=arr(1)!="小李";res})      .map(_.split("\t"))      .map(attributes=>Row(attributes(0).trim,attributes(1),attributes(2),attributes(3).trim,attributes(4)))    //TODO:   2.按照第一步Rows的结构定义StructTypeval schemaString="id|name|commodity|age|date"    val fields=schemaString.split("\\|")      .map(filedName=>StructField(filedName,StringType,nullable = true))    val schema=StructType(fields)    //TODO:   3.基于rows和StructType使用createDataFrame创建相应的DF   val orderDF= spark.createDataFrame(rowsRDD,schema)    orderDF.show()    orderDF.groupBy("name").count().show()    orderDF.select("name","commodity").show()    Thread.sleep(10000000)  }}case class Order(id:String,name:String,commodity:String,age:String,date:String)

生产中创建DataFrame代码举例

在实际生产环境中,我们其实选择的是方式二这种进行创建DataFrame的,因为我们生产中很难提前定义case class ,因为业务处理之后字段常常会发生意想不到的变化,所以一定要掌握这种方法。

测试数据

baidu   CN  A   E   [01/May/2018:02:15:52 +0800]    2   61.237.59.0 -   112.29.213.35:80    0   movieshow2000.edu.chinaren.com  GET http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4 HTTP/1.1    -   bytes 13869056-13885439/25136186    TCP_HIT/206 112.29.213.35   video/mp4   16374   16384   -:0 0   0   -   -   -   11451601    -   "JSP3/2.0.14"   "-" "-" "-" http    -   2   v1.go2yd.com    0.002   25136186    16384   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   1531818470104-11451601-112.29.213.66#2705261172 644514568baidu   CN  A   E   [01/May/2018:02:25:33 +0800]    2   61.232.37.228   -   112.29.213.35:80    0   github.com  GET http://github.com/user_upload/15316339776271/44y.mp4    HTTP/1.1    -   bytes 13869056-13885439/25136186    TCP_HIT/206 112.29.213.35   video/mp4   83552   16384   -:0 0   0   -   -   -   11451601    -   "JSP3/2.0.14"   "-" "-" "-" http    -   2   v1.go2yd.com    0.002   25136186    16384   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   -   1531818470104-11451601-112.29.213.66#2705261172 644514568

Schema方法类

import org.apache.spark.sql.Rowimport org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}object LogConverUtil {  privateval struct=StructType(    Array(      StructField("domain",StringType)      ,StructField("url",StringType)      ,StructField("pv",LongType)      ,StructField("traffic",LongType)      ,StructField("date",StringType)    )  )  def getStruct():StructType={    struct  }  def parseLog(logLine:String): Row ={    val sourceFormat=new SimpleDateFormat("[dd/MMM/yyyy:hh:mm:ss +0800]",Locale.ENGLISH)    val targetFormat=new SimpleDateFormat("yyyyMMddhh")    try{      val fields=logLine.split("\t")      val domain=fields(10)      val url=fields(12)      val pv=1L      val traffic=fields(19).trim.toLong      val date=getFormatedDate(fields(4),sourceFormat,targetFormat)      Row(domain,url,pv,traffic,date)    }catch {      case e:Exception=>Row(0)    }  }  /**    *    * @param sourceDate  Log中的未格式化日期   [01/May/2018:01:09:45 +0800]    * @return  按照需求格式化字段      2018050101    */  def getFormatedDate(sourceDate: String, sourceFormat: SimpleDateFormat, targetFormat: SimpleDateFormat) = {    val targetTime=targetFormat.format(sourceFormat.parse(sourceDate))    targetTime  }}

RDD2DataFrame主类

import org.apache.spark.sql.SparkSessionobject SparkCleanJob {  def main(args: Array[String]): Unit = {    val spark=SparkSession.builder()      .master("local[2]")      .appName("SparkCleanJob")      .getOrCreate()    val logRDD=spark.sparkContext.textFile("file:///D:/baidu.log")//    logRDD.take(2).foreach(println(_))    //调用LogConverUtil里的parseLog方法和getStruct方法获得Rows对象和StructType对象    val logDF=spark.createDataFrame(logRDD.map(LogConverUtil.parseLog(_)),LogConverUtil.getStruct())    logDF.show(false)    logDF.printSchema()  }}

结果

+------------------------------+-------------------------------------------------------------------------+---+-------+----------+|domain                        |url                                                                      |pv |traffic|date      |+------------------------------+-------------------------------------------------------------------------+---+-------+----------+|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4  |1  |16374  |2018050102||github.com                    |http://github.com/user_upload/15316339776271/44y.mp4                     |1  |83552  |2018050102||yooku.com                     |http://yooku.com/user_upload/15316339776271x0.html                       |1  |74986  |2018050101||rw.uestc.edu.cn               |http://rw.uestc.edu.cn/user_upload/15316339776271515.mp4                 |1  |55297  |2018050101||github.com                    |http://github.com/user_upload/15316339776271x05.mp4                      |1  |26812  |2018050102||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y4.html  |1  |50392  |2018050103||github.com                    |http://github.com/user_upload/15316339776271x15.html                     |1  |40092  |2018050101||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/153163397762714z.mp4   |1  |8368   |2018050102||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/5z.html |1  |29677  |2018050103||rw.uestc.edu.cn               |http://rw.uestc.edu.cn/user_upload/153163397762710w.mp4                  |1  |26124  |2018050102||yooku.com                     |http://yooku.com/user_upload/15316339776271yz.mp4                        |1  |32219  |2018050101||yooku.com                     |http://yooku.com/user_upload/153163397762713w.html                       |1  |90389  |2018050101||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271z/.html  |1  |15623  |2018050101||yooku.com                     |http://yooku.com/user_upload/1531633977627142.html                       |1  |53453  |2018050103||yooku.com                     |http://yooku.com/user_upload/15316339776271230.mp4                       |1  |20309  |2018050102||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/4w1.html|1  |87804  |2018050103||movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y5y.html |1  |69469  |2018050103||yooku.com                     |http://yooku.com/user_upload/15316339776271011/.mp4                      |1  |3782   |2018050103||github.com                    |http://github.com/user_upload/15316339776271wzw.mp4                      |1  |89642  |2018050102||github.com                    |http://github.com/user_upload/15316339776271/1/.mp4                      |1  |63551  |2018050103|+------------------------------+-------------------------------------------------------------------------+---+-------+----------+only showing top 20 rowsroot |-- domain: string (nullable = true) |-- url: string (nullable = true) |-- pv: long (nullable = true) |-- traffic: long (nullable = true) |-- date: string (nullable = true)Process finished with exit code 0

注:除了这种使用RDD读取文本进而转化成DataFrame之外,我们也会使用自定义DefaultSource来直接将text转化成DataFrame

0