Spark-SQL的具体编程场景
发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,入门案例:object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger
千家信息网最后更新 2024年09月22日Spark-SQL的具体编程场景
入门案例:
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() /** * 注意在spark 2.0之后: * val sqlContext = new SQLContext(sparkContext) * val hiveContext = new HiveContext(sparkContext) * 主构造器被私有化,所以这里只能使用SparkSession对象创建 */ //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //加载数据为DataFrame,这里加载的是json数据 //数据格式:{name:'',age:18} val perDF: DataFrame = sqlContext.read.json("hdfs://zzy/data/person.json") //查看二维表结构 perDF.printSchema() //查看数据,默认显示20条记录 perDF.show() //复杂查询 perDF.select("name").show() //指定字段进行查询 perDF.select(new Column("name"),new Column("age").>(18)).show() //指定查询条件进行查询 perDF.select("name","age").where(new Column("age").>(18)).show() //指定查询条件进行查询 perDF.select("age").groupBy("age").avg("age") //聚合操作 }}
如果对入门案例不太了解的话,接下来分步骤的介绍:
(1)RDD/DataSet//DataFrame/list 之间的转化
通过RDD转换为DataFrame/DataSet,有两种方式:
- 通过反射的方式将RDD或者外部的集合转化为dataframe/datasets
- 要通过编程动态的来将外部的集合或者RDD转化为dataframe或者dataset
注意:如果是dataFrame对应的是java bean ,如果是dataSet对应的是case class
通过反射的方式将RDD或者外部的集合转化为dataframe/datasets
数据准备:
case class Student(name:String, birthday:String, province:String)val stuList = List( new Student("委xx", "1998-11-11", "山西"), new Student("吴xx", "1999-06-08", "河南"), new Student("戚xx", "2000-03-08", "山东"), new Student("王xx", "1997-07-09", "安徽"), new Student("薛xx", "2002-08-09", "辽宁") )
list --> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext /** * list--->DataFrame * 将scala集合转换为java集合 */ val javaList: util.List[Student] = JavaConversions.seqAsJavaList(stuList) val stuDF: DataFrame = sqlContext.createDataFrame(javaList,classOf[Student]) val count = stuDF.count() println(count)
RDD --> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext /** * RDD--->DataFrame */ val stuRDD: RDD[Student] = sc.makeRDD(stuList) val stuDF: DataFrame = sqlContext.createDataFrame(stuRDD,classOf[Student]) val count = stuDF.count() println(count)
list --> DataSet:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext /** * list--->DataSet */ //如果创建Dataset 必须导入下面的隐式转换 import spark.implicits._ val stuDF: Dataset[Student] = sqlContext.createDataset(stuList) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用反射的方式,只有Dataset可以,dataFrame不行 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
RDD --> DataSet:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext /** * RDD--->DataSet */ //如果创建Dataset 必须导入下面的隐式转换 import spark.implicits._ val stuRDD: RDD[Student] = sc.makeRDD(stuList) val stuDF: Dataset[Student] = sqlContext.createDataset(stuRDD) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用反射的方式,只有Dataset可以,dataFrame不行 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
通过编程动态的来将外部的集合或者RDD转化为dataframe或者dataset
list --> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext //list-DataFrame //1.将list中的元素全部转化为Row val RowList: List[Row] = stuList.map(item => { Row(item.name, item.birthday, item.province) }) //2.构建元数据 val schema=StructType(List( StructField("name",DataTypes.StringType), StructField("birthday",DataTypes.StringType), StructField("province",DataTypes.StringType) )) //将scala的集合转化为java集合 val javaList = JavaConversions.seqAsJavaList(RowList) val stuDF = spark.createDataFrame(javaList,schema) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用动态编程的方式,Dataset、dataFrame都可以 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
RDD--> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext //RDD-DataFrame //将RDD中的元素转换为Row val RowRDD: RDD[Row] = sc.makeRDD(stuList).map(item => { Row(item.name, item.birthday, item.province) }) //2.构建元数据 val schema=StructType(List( StructField("name",DataTypes.StringType), StructField("birthday",DataTypes.StringType), StructField("province",DataTypes.StringType) )) val stuDF = spark.createDataFrame(RowRDD,schema) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用动态编程的方式,Dataset、dataFrame都可以 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
由于构建DataFrame和构建DataSet一模一样,这里就不在演示
(2)spark SQL加载数据的方式
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext //早期版本加载:parquet文件 sqlContext.load("hdfs://zzy/hello.parquet") //加载json数据 sqlContext.read.json("hdfs://zzy/hello.json") //加载普通文件 sqlContext.read.text("hdfs://zzy/hello.txt") //加载csv sqlContext.read.csv("hdfs://zy/hello.csv") //读取jdbc的数据 val url="jdbc:mysql://localhost:3306/hello" val properties=new Properties() properties.setProperty("user","root") properties.setProperty("password","123456") val tableName="book" sqlContext.read.jdbc(url,tableName,properties)
(3)spark SQL数据落地的方式
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext val testFD: DataFrame = sqlContext.read.text("hdfs://zzy/hello.txt") //写入到普通文件 testFD.write.format("json") //以什么格式写入 .mode(SaveMode.Append) //写入方式 .save("hdfs://zzy/hello.json") //写入的文件位置 //写入到数据库 val url="jdbc:mysql://localhost:3306/hello" val table_name="book" val prots=new Properties() prots.put("user","root") prots.put("password","123456") testFD.write.mode(SaveMode.Append).jdbc(url,table_name,prots)
编程
数据
对象
方式
查询
入口
日志
动态
文件
语句
反射
不行
普通
元素
只有
条件
格式
案例
面的
复杂
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
nfs万兆 服务器千兆
杨浦区个人数据库服务成本
谷歌地球不支持手机服务器怎么办
web服务器弱口令安全措施
泰山服务器电脑版
郑州工控软件开发定制费用
hp 分布式数据库
数据库备份恢复安全管理
网络安全和国家安全讲座
miui应用自动断开数据库
山西pdu服务器电源种类有哪些
图数据库排行榜
sql数据库怎么加入表
用旧电脑搭建家庭服务器的作用
软件开发公司用户获取
达梦数据库怎么添加
中国香港文档软件开发哪家强
山东省服务器代理商查询
工业级串口服务器区别
深圳麦哲伦网络技术 招聘
光遇怎么看服务器
眉山软件开发销售电话
密码学编码与网络安全视频
服务器如何查看登录用户
php 数据库数组 输出
锐捷网络技术支持面经
唐山附近哪有软件开发的
软件开发中级工程师工资
服务器如何显示在电脑
关于软件开发大学职业规划