
Concrete Spark-SQL Programming Scenarios

Published 2024-09-22 by the 千家信息网 editorial team

Getting-started example:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SQLContext, SparkSession}

object SparkSqlTest {
    def main(args: Array[String]): Unit = {
        // Silence the noisy framework logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the programming entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()
        /**
          * Note that since Spark 2.0:
          *   val sqlContext = new SQLContext(sparkContext)
          *   val hiveContext = new HiveContext(sparkContext)
          * the primary constructors are private, so the contexts can only be
          * obtained from a SparkSession object.
          */
        // Get the SQLContext object
        val sqlContext: SQLContext = spark.sqlContext
        // Load data as a DataFrame; here JSON data is loaded
        // Data format: {"name": "", "age": 18}
        val perDF: DataFrame = sqlContext.read.json("hdfs://zzy/data/person.json")
        // Print the table schema
        perDF.printSchema()
        // Show the data (20 rows by default)
        perDF.show()
        // More complex queries
        perDF.select("name").show()                                        // select specific columns
        perDF.select(new Column("name"), new Column("age").>(18)).show()   // project a column expression (age > 18 becomes a boolean column)
        perDF.select("name", "age").where(new Column("age").>(18)).show()  // filter with a condition
        perDF.select("age").groupBy("age").avg("age").show()               // aggregation
    }
}
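The queries above use the DataFrame DSL. The same results can also be obtained with plain SQL by first registering the DataFrame as a temporary view; a minimal sketch (the view name person is chosen here purely for illustration):

        // Register the DataFrame as a temporary view so it can be queried with SQL
        perDF.createOrReplaceTempView("person")
        // SQL equivalents of the DSL queries above
        spark.sql("select name, age from person where age > 18").show()
        spark.sql("select age, avg(age) from person group by age").show()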

If the getting-started example is not entirely clear, the following sections walk through it step by step:

(1) Conversions among RDD / DataSet / DataFrame / list

   There are two ways to convert an RDD (or an external collection) into a DataFrame/DataSet:
    - by reflection: let Spark infer the schema from an RDD or external collection of objects;
    - programmatically: build the schema dynamically and attach it to the external collection or RDD.
   Note: with the reflection approach a DataFrame maps to a Java bean, whereas a Dataset maps to a case class (see the short sketch after this note for the idiomatic implicit conversions).
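Before the step-by-step examples, here is a minimal sketch of the reflection route using the implicit encoders in spark.implicits._, the most common idiom in Spark 2.x. It assumes the Student case class, the stuList collection, and the spark / sc entry points that are set up in the blocks below:

        import spark.implicits._
        // Reflection on the case-class fields supplies the column names and types
        val stuDF: DataFrame = stuList.toDF()                     // collection ---> DataFrame
        val stuDS: Dataset[Student] = sc.makeRDD(stuList).toDS()  // RDD ---> Dataset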

Converting an RDD or an external collection into a DataFrame/Dataset by reflection

Preparing the data

case class Student(name: String, birthday: String, province: String)

val stuList = List(
    Student("委xx", "1998-11-11", "山西"),
    Student("吴xx", "1999-06-08", "河南"),
    Student("戚xx", "2000-03-08", "山东"),
    Student("王xx", "1997-07-09", "安徽"),
    Student("薛xx", "2002-08-09", "辽宁")
)

list --> DataFrame:

        // Silence the noisy framework logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the programming entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()
        // Get the SQLContext object
        val sqlContext: SQLContext = spark.sqlContext
        /**
          * list ---> DataFrame
          * First convert the Scala collection to a Java collection
          */
        val javaList: util.List[Student] = JavaConversions.seqAsJavaList(stuList)
        val stuDF: DataFrame = sqlContext.createDataFrame(javaList, classOf[Student])
        val count = stuDF.count()
        println(count)

RDD --> DataFrame:

        // Silence the noisy framework logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the programming entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()
        // Get the SQLContext object
        val sqlContext: SQLContext = spark.sqlContext
        // Get the SparkContext
        val sc: SparkContext = spark.sparkContext
        /**
          * RDD ---> DataFrame
          */
        val stuRDD: RDD[Student] = sc.makeRDD(stuList)
        val stuDF: DataFrame = sqlContext.createDataFrame(stuRDD, classOf[Student])
        val count = stuDF.count()
        println(count)

list --> DataSet:

        // Silence the noisy framework logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the programming entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()
        // Get the SQLContext object
        val sqlContext: SQLContext = spark.sqlContext
        // Get the SparkContext
        val sc: SparkContext = spark.sparkContext
        /**
          * list ---> DataSet
          */
        // Creating a Dataset requires importing the implicit encoders below
        import spark.implicits._
        val stuDS: Dataset[Student] = sqlContext.createDataset(stuList)
        stuDS.createTempView("student")
        // Query with a full SQL statement; with the reflection approach this only
        // works for a Dataset, not for a DataFrame
        val sql =
            """
              |select * from student
            """.stripMargin
        spark.sql(sql).show()


RDD --> DataSet:

        // Silence the noisy framework logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the programming entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()
        // Get the SQLContext object
        val sqlContext: SQLContext = spark.sqlContext
        // Get the SparkContext
        val sc: SparkContext = spark.sparkContext
        /**
          * RDD ---> DataSet
          */
        // Creating a Dataset requires importing the implicit encoders below
        import spark.implicits._
        val stuRDD: RDD[Student] = sc.makeRDD(stuList)
        val stuDS: Dataset[Student] = sqlContext.createDataset(stuRDD)
        stuDS.createTempView("student")
        // Query with a full SQL statement; with the reflection approach this only
        // works for a Dataset, not for a DataFrame
        val sql =
            """
              |select * from student
            """.stripMargin
        spark.sql(sql).show()
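Because a Dataset is strongly typed, it can also be processed with ordinary Scala lambdas instead of SQL; a minimal sketch building on the Dataset created above (the filter condition is only an example):

        // Typed operations: the Student fields are checked at compile time
        stuDS.filter(stu => stu.province == "山东")
             .map(stu => stu.name)
             .show()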
Converting an external collection or an RDD into a DataFrame or Dataset programmatically (dynamically)

list --> DataFrame:

        // Silence the noisy framework logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the programming entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()
        // Get the SQLContext object
        val sqlContext: SQLContext = spark.sqlContext
        // Get the SparkContext
        val sc: SparkContext = spark.sparkContext
        // list ---> DataFrame
        // 1. Turn every element of the list into a Row
        val rowList: List[Row] = stuList.map(item => {
            Row(item.name, item.birthday, item.province)
        })
        // 2. Build the schema (metadata)
        val schema = StructType(List(
            StructField("name", DataTypes.StringType),
            StructField("birthday", DataTypes.StringType),
            StructField("province", DataTypes.StringType)
        ))
        // Convert the Scala collection to a Java collection
        val javaList = JavaConversions.seqAsJavaList(rowList)
        val stuDF = spark.createDataFrame(javaList, schema)
        stuDF.createTempView("student")
        // Query with a full SQL statement; with the programmatic approach both a
        // Dataset and a DataFrame work
        val sql =
            """
              |select * from student
            """.stripMargin
        spark.sql(sql).show()

RDD --> DataFrame:

        // Silence the noisy framework logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the programming entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()
        // Get the SQLContext object
        val sqlContext: SQLContext = spark.sqlContext
        // Get the SparkContext
        val sc: SparkContext = spark.sparkContext
        // RDD ---> DataFrame
        // 1. Turn every element of the RDD into a Row
        val rowRDD: RDD[Row] = sc.makeRDD(stuList).map(item => {
            Row(item.name, item.birthday, item.province)
        })
        // 2. Build the schema (metadata)
        val schema = StructType(List(
            StructField("name", DataTypes.StringType),
            StructField("birthday", DataTypes.StringType),
            StructField("province", DataTypes.StringType)
        ))
        val stuDF = spark.createDataFrame(rowRDD, schema)
        stuDF.createTempView("student")
        // Query with a full SQL statement; with the programmatic approach both a
        // Dataset and a DataFrame work
        val sql =
            """
              |select * from student
            """.stripMargin
        spark.sql(sql).show()

Since building a DataSet this way is exactly the same as building a DataFrame, it is not demonstrated separately here.
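That said, if a typed Dataset is needed from the dynamically built Row-based DataFrame, one common way (a sketch, assuming the schema column names match the Student fields exactly) is to attach the case class with as[...]:

        import spark.implicits._
        // Re-interpret the Row-based DataFrame as a typed Dataset[Student]
        val stuDS: Dataset[Student] = stuDF.as[Student]
        stuDS.show()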

(2) How Spark SQL loads data

        // Silence the noisy framework logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the programming entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
                .setMaster("local[2]")
        val spark: SparkSession = SparkSession.builder().config(conf)
                .getOrCreate()
        // Get the SQLContext object
        val sqlContext: SQLContext = spark.sqlContext
        // Get the SparkContext
        val sc: SparkContext = spark.sparkContext
        // Early-version API: load a parquet file
        sqlContext.load("hdfs://zzy/hello.parquet")
        // Load JSON data
        sqlContext.read.json("hdfs://zzy/hello.json")
        // Load a plain text file
        sqlContext.read.text("hdfs://zzy/hello.txt")
        // Load a CSV file
        sqlContext.read.csv("hdfs://zy/hello.csv")
        // Read data over JDBC
        val url = "jdbc:mysql://localhost:3306/hello"
        val properties = new Properties()
        properties.setProperty("user", "root")
        properties.setProperty("password", "123456")
        val tableName = "book"
        sqlContext.read.jdbc(url, tableName, properties)
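All of the shorthand readers above are wrappers around the generic read.format(...).option(...).load(...) API; a minimal sketch (the header/inferSchema options are common choices added here for illustration, not part of the original example):

        // Generic loader: equivalent to read.csv(...) but with explicit options
        val csvDF: DataFrame = sqlContext.read
            .format("csv")
            .option("header", "true")       // first line holds the column names
            .option("inferSchema", "true")  // let Spark infer the column types
            .load("hdfs://zzy/hello.csv")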

(3) How Spark SQL writes data out

        // Silence the noisy framework logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // Build the programming entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
                .setMaster("local[2]")
        val spark: SparkSession = SparkSession.builder().config(conf)
                .getOrCreate()
        // Get the SQLContext object
        val sqlContext: SQLContext = spark.sqlContext
        // Get the SparkContext
        val sc: SparkContext = spark.sparkContext
        val testDF: DataFrame = sqlContext.read.text("hdfs://zzy/hello.txt")
        // Write to an ordinary file
        testDF.write.format("json")            // output format
                .mode(SaveMode.Append)         // save mode
                .save("hdfs://zzy/hello.json") // output path
        // Write to a database
        val url = "jdbc:mysql://localhost:3306/hello"
        val table_name = "book"
        val props = new Properties()
        props.put("user", "root")
        props.put("password", "123456")
        testDF.write.mode(SaveMode.Append).jdbc(url, table_name, props)
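The same writer API also covers columnar formats and partitioned output; a minimal sketch, assuming a DataFrame such as stuDF from the earlier examples with a province column (the output path is made up for illustration):

        // Write as parquet, one sub-directory per province, replacing any previous output
        stuDF.write
            .mode(SaveMode.Overwrite)
            .partitionBy("province")
            .parquet("hdfs://zzy/out/student_parquet")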