Spark-SQL的具体编程场景
发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,入门案例:object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger
千家信息网最后更新 2025年02月07日Spark-SQL的具体编程场景
入门案例:
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() /** * 注意在spark 2.0之后: * val sqlContext = new SQLContext(sparkContext) * val hiveContext = new HiveContext(sparkContext) * 主构造器被私有化,所以这里只能使用SparkSession对象创建 */ //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //加载数据为DataFrame,这里加载的是json数据 //数据格式:{name:'',age:18} val perDF: DataFrame = sqlContext.read.json("hdfs://zzy/data/person.json") //查看二维表结构 perDF.printSchema() //查看数据,默认显示20条记录 perDF.show() //复杂查询 perDF.select("name").show() //指定字段进行查询 perDF.select(new Column("name"),new Column("age").>(18)).show() //指定查询条件进行查询 perDF.select("name","age").where(new Column("age").>(18)).show() //指定查询条件进行查询 perDF.select("age").groupBy("age").avg("age") //聚合操作 }}
如果对入门案例不太了解的话,接下来分步骤的介绍:
(1)RDD/DataSet//DataFrame/list 之间的转化
通过RDD转换为DataFrame/DataSet,有两种方式:
- 通过反射的方式将RDD或者外部的集合转化为dataframe/datasets
- 要通过编程动态的来将外部的集合或者RDD转化为dataframe或者dataset
注意:如果是dataFrame对应的是java bean ,如果是dataSet对应的是case class
通过反射的方式将RDD或者外部的集合转化为dataframe/datasets
数据准备:
case class Student(name:String, birthday:String, province:String)val stuList = List( new Student("委xx", "1998-11-11", "山西"), new Student("吴xx", "1999-06-08", "河南"), new Student("戚xx", "2000-03-08", "山东"), new Student("王xx", "1997-07-09", "安徽"), new Student("薛xx", "2002-08-09", "辽宁") )
list --> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext /** * list--->DataFrame * 将scala集合转换为java集合 */ val javaList: util.List[Student] = JavaConversions.seqAsJavaList(stuList) val stuDF: DataFrame = sqlContext.createDataFrame(javaList,classOf[Student]) val count = stuDF.count() println(count)
RDD --> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext /** * RDD--->DataFrame */ val stuRDD: RDD[Student] = sc.makeRDD(stuList) val stuDF: DataFrame = sqlContext.createDataFrame(stuRDD,classOf[Student]) val count = stuDF.count() println(count)
list --> DataSet:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext /** * list--->DataSet */ //如果创建Dataset 必须导入下面的隐式转换 import spark.implicits._ val stuDF: Dataset[Student] = sqlContext.createDataset(stuList) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用反射的方式,只有Dataset可以,dataFrame不行 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
RDD --> DataSet:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext /** * RDD--->DataSet */ //如果创建Dataset 必须导入下面的隐式转换 import spark.implicits._ val stuRDD: RDD[Student] = sc.makeRDD(stuList) val stuDF: Dataset[Student] = sqlContext.createDataset(stuRDD) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用反射的方式,只有Dataset可以,dataFrame不行 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
通过编程动态的来将外部的集合或者RDD转化为dataframe或者dataset
list --> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext //list-DataFrame //1.将list中的元素全部转化为Row val RowList: List[Row] = stuList.map(item => { Row(item.name, item.birthday, item.province) }) //2.构建元数据 val schema=StructType(List( StructField("name",DataTypes.StringType), StructField("birthday",DataTypes.StringType), StructField("province",DataTypes.StringType) )) //将scala的集合转化为java集合 val javaList = JavaConversions.seqAsJavaList(RowList) val stuDF = spark.createDataFrame(javaList,schema) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用动态编程的方式,Dataset、dataFrame都可以 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
RDD--> DataFrame:
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext //RDD-DataFrame //将RDD中的元素转换为Row val RowRDD: RDD[Row] = sc.makeRDD(stuList).map(item => { Row(item.name, item.birthday, item.province) }) //2.构建元数据 val schema=StructType(List( StructField("name",DataTypes.StringType), StructField("birthday",DataTypes.StringType), StructField("province",DataTypes.StringType) )) val stuDF = spark.createDataFrame(RowRDD,schema) stuDF.createTempView("student") //使用完整的sql语句进行查询,使用动态编程的方式,Dataset、dataFrame都可以 val sql= """ |select * from student """.stripMargin spark.sql(sql).show()
由于构建DataFrame和构建DataSet一模一样,这里就不在演示
(2)spark SQL加载数据的方式
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext //早期版本加载:parquet文件 sqlContext.load("hdfs://zzy/hello.parquet") //加载json数据 sqlContext.read.json("hdfs://zzy/hello.json") //加载普通文件 sqlContext.read.text("hdfs://zzy/hello.txt") //加载csv sqlContext.read.csv("hdfs://zy/hello.csv") //读取jdbc的数据 val url="jdbc:mysql://localhost:3306/hello" val properties=new Properties() properties.setProperty("user","root") properties.setProperty("password","123456") val tableName="book" sqlContext.read.jdbc(url,tableName,properties)
(3)spark SQL数据落地的方式
//屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //创建sparkContext val sc: SparkContext = spark.sparkContext val testFD: DataFrame = sqlContext.read.text("hdfs://zzy/hello.txt") //写入到普通文件 testFD.write.format("json") //以什么格式写入 .mode(SaveMode.Append) //写入方式 .save("hdfs://zzy/hello.json") //写入的文件位置 //写入到数据库 val url="jdbc:mysql://localhost:3306/hello" val table_name="book" val prots=new Properties() prots.put("user","root") prots.put("password","123456") testFD.write.mode(SaveMode.Append).jdbc(url,table_name,prots)
编程
数据
对象
方式
查询
入口
日志
动态
文件
语句
反射
不行
普通
元素
只有
条件
格式
案例
面的
复杂
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
义乌微锐网络技术有限公司
软件开发不难吗
aws软件开发
ios系统软件开发方案
比邻软件开发
用自己的电脑做网站服务器
泰安量化积分管理软件开发电话
学校举行网络安全周
重庆软件开发学习网址哪家好
负责阿里巴巴网络安全的人
不能作为网络安全的特征是
做软件开发工程师好不好
为什么要进行网络安全研究
如何坚守网络安全七条底线
深圳晨星软件开发有限公司
广东嵌入式软件开发机构
上海鑫锘计算机软件开发
信息管理系统中数据库的作用
新乡市领恒网络技术有限公司
QQ数据库用户表
负责阿里巴巴网络安全的人
网络安全高级讲师
违反网络安全义务
精通网络技术有限公司怎么样
海康nvr存储服务器
发改委发布的网络安全试点
表单设计写入数据库
gdc服务器交换机
奇虎360网络技术有限公司
求生之路2开不了本地服务器