Spark-SQL的具体编程场景
发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,入门案例:object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger
千家信息网最后更新 2024年09月22日Spark-SQL的具体编程场景
入门案例:
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() /** * 注意在spark 2.0之后: * val sqlContext = new SQLContext(sparkContext) * val hiveContext = new HiveContext(sparkContext) * 主构造器被私有化,所以这里只能使用SparkSession对象创建 */ //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //加载数据为DataFrame,这里加载的是json数据 //数据格式:{name:'',age:18} val perDF: DataFrame = sqlContext.read.json("hdfs://zzy/data/person.json") //查看二维表结构 perDF.printSchema() //查看数据,默认显示20条记录 perDF.show() //复杂查询 perDF.select("name").show() //指定字段进行查询 perDF.select(new Column("name"),new Column("age").>(18)).show() //指定查询条件进行查询 perDF.select("name","age").where(new Column("age").>(18)).show() //指定查询条件进行查询 perDF.select("age").groupBy("age").avg("age") //聚合操作 }}
如果对入门案例不太了解的话,接下来分步骤的介绍:
(1)RDD/DataSet//DataFrame/list 之间的转化
通过RDD转换为DataFrame/DataSet,有两种方式:
- 通过反射的方式将RDD或者外部的集合转化为dataframe/datasets
- 要通过编程动态的来将外部的集合或者RDD转化为dataframe或者dataset
注意:如果是dataFrame对应的是java bean ,如果是dataSet对应的是case class
通过反射的方式将RDD或者外部的集合转化为dataframe/datasets
数据准备:
case class Student(name:String, birthday:String, province:String)val stuList = List( new Student("委xx", "1998-11-11", "山西"), new Student("吴xx", "1999-06-08", "河南"), new Student("戚xx", "2000-03-08", "山东"), new Student("王xx", "1997-07-09", "安徽"), new Student("薛xx", "2002-08-09", "辽宁") )
list --> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext /** * list--->DataFrame * 将scala集合转换为java集合 */ val javaList: util.List[Student] = JavaConversions.seqAsJavaList(stuList) val stuDF: DataFrame = sqlContext.createDataFrame(javaList,classOf[Student]) val count = stuDF.count() println(count)
RDD --> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext /** * RDD--->DataFrame */ val stuRDD: RDD[Student] = sc.makeRDD(stuList) val stuDF: DataFrame = sqlContext.createDataFrame(stuRDD,classOf[Student]) val count = stuDF.count() println(count)
list --> DataSet:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext /** * list--->DataSet */ //如果创建Dataset 必须导入下面的隐式转换 import spark.implicits._ val stuDF: Dataset[Student] = sqlContext.createDataset(stuList) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用反射的方式,只有Dataset可以,dataFrame不行 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
RDD --> DataSet:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext /** * RDD--->DataSet */ //如果创建Dataset 必须导入下面的隐式转换 import spark.implicits._ val stuRDD: RDD[Student] = sc.makeRDD(stuList) val stuDF: Dataset[Student] = sqlContext.createDataset(stuRDD) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用反射的方式,只有Dataset可以,dataFrame不行 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
通过编程动态的来将外部的集合或者RDD转化为dataframe或者dataset
list --> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext //list-DataFrame //1.将list中的元素全部转化为Row val RowList: List[Row] = stuList.map(item => { Row(item.name, item.birthday, item.province) }) //2.构建元数据 val schema=StructType(List( StructField("name",DataTypes.StringType), StructField("birthday",DataTypes.StringType), StructField("province",DataTypes.StringType) )) //将scala的集合转化为java集合 val javaList = JavaConversions.seqAsJavaList(RowList) val stuDF = spark.createDataFrame(javaList,schema) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用动态编程的方式,Dataset、dataFrame都可以 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
RDD--> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext //RDD-DataFrame //将RDD中的元素转换为Row val RowRDD: RDD[Row] = sc.makeRDD(stuList).map(item => { Row(item.name, item.birthday, item.province) }) //2.构建元数据 val schema=StructType(List( StructField("name",DataTypes.StringType), StructField("birthday",DataTypes.StringType), StructField("province",DataTypes.StringType) )) val stuDF = spark.createDataFrame(RowRDD,schema) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用动态编程的方式,Dataset、dataFrame都可以 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
由于构建DataFrame和构建DataSet一模一样,这里就不在演示
(2)spark SQL加载数据的方式
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext //早期版本加载:parquet文件 sqlContext.load("hdfs://zzy/hello.parquet") //加载json数据 sqlContext.read.json("hdfs://zzy/hello.json") //加载普通文件 sqlContext.read.text("hdfs://zzy/hello.txt") //加载csv sqlContext.read.csv("hdfs://zy/hello.csv") //读取jdbc的数据 val url="jdbc:mysql://localhost:3306/hello" val properties=new Properties() properties.setProperty("user","root") properties.setProperty("password","123456") val tableName="book" sqlContext.read.jdbc(url,tableName,properties)
(3)spark SQL数据落地的方式
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext val testFD: DataFrame = sqlContext.read.text("hdfs://zzy/hello.txt") //写入到普通文件 testFD.write.format("json") //以什么格式写入 .mode(SaveMode.Append) //写入方式 .save("hdfs://zzy/hello.json") //写入的文件位置 //写入到数据库 val url="jdbc:mysql://localhost:3306/hello" val table_name="book" val prots=new Properties() prots.put("user","root") prots.put("password","123456") testFD.write.mode(SaveMode.Append).jdbc(url,table_name,prots)
编程
数据
对象
方式
查询
入口
日志
动态
文件
语句
反射
不行
普通
元素
只有
条件
格式
案例
面的
复杂
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
的网络安全风险和威胁
基于图形的软件开发工具
淘宝怎么开通软件开发类
互联网科技智慧门诊
石家庄便捷式设备管理软件开发
附魔服务器
网络安全的应用方式
软件开发服务费怎么挂账
目前最常用的数据库有
软件开发哪个方向就业好
csgo躲猫猫服务器贴吧
打印服务器工具
街头篮球暂时无法连接服务器
公安信息网络安全保密工作纪律心得
软件开发过程中的二八定律
vs2008如何连接自带数据库
网络安全工程师在国企好不好
张禹艺术名家数据库
电力行业软件开发公司
安装数据库要哪些东西
全球华商百科数据库王奕萌
中国世界领先数据库技术
怎么连接公司的redis数据库
vb 判断数据库非空
html表单如何发到数据库
博士数据库连接失败
守护城市网络安全
常熟市微派网络技术有限公司
2019年软件开发行业分析
查看数据库中是否存在