SparkSQL如何运用
发表于:2025-01-20 作者:千家信息网编辑
千家信息网最后更新 2025年01月20日,今天小编给大家分享一下SparkSQL如何运用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解
千家信息网最后更新 2025年01月20日SparkSQL如何运用
今天小编给大家分享一下SparkSQL如何运用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
一:SparkSQL
1.SparkSQL简介
Spark SQL是Spark的一个模块,用于处理结构化的数据,它提供了一个数据抽象DataFrame(最核心的编程抽象就是DataFrame),并且SparkSQL作为分布式SQL查询引擎。
Spark SQL就是将SQL转换成一个任务,提交到集群上运行,类似于Hive的执行方式。
2.SparkSQL运行原理
将Spark SQL转化为RDD,然后提交到集群执行。
3.SparkSQL特点
(1)容易整合,Spark SQL已经集成在Spark中
(2)提供了统一的数据访问方式:JSON、CSV、JDBC、Parquet等都是使用统一的方式进行访问
(3)兼容 Hive
(4)标准的数据连接:JDBC、ODBC
二、SparkSQL运用
package sqlimport org.apache.avro.ipc.specific.Personimport org.apache.sparkimport org.apache.spark.rdd.RDDimport org.apache.spark.sqlimport org.apache.spark.sql.catalyst.InternalRowimport org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import org.junit.Testclass Intro { @Test def dsIntro(): Unit ={ val spark: SparkSession = new sql.SparkSession.Builder() .appName("ds intro") .master("local[6]") .getOrCreate() //导入隐算是shi转换 import spark.implicits._ val sourceRDD: RDD[Person] =spark.sparkContext.parallelize(Seq(Person("张三",10),Person("李四",15))) val personDS: Dataset[Person] =sourceRDD.toDS();//personDS.printSchema()打印出错信息 val resultDS: Dataset[Person] =personDS.where('age>10) .select('name,'age) .as[Person] resultDS.show() } @Test def dfIntro(): Unit ={ val spark: SparkSession =new SparkSession.Builder() .appName("ds intro") .master("local") .getOrCreate() import spark.implicits._ val sourceRDD: RDD[Person] = spark.sparkContext.parallelize(Seq(Person("张三",10),Person("李四",15))) val df: DataFrame = sourceRDD.toDF()//隐shi转换 df.createOrReplaceTempView("person")//创建表 val resultDF: DataFrame =spark.sql("select name from person where age>=10 and age<=20") resultDF.show() } @Test def database1(): Unit ={ //1.创建sparkSession val spark: SparkSession =new SparkSession.Builder() .appName("database1") .master("local[6]") .getOrCreate() //2.导入引入shi子转换 import spark.implicits._ //3.演示 val sourceRDD: RDD[Person] =spark.sparkContext.parallelize(Seq(Person("张三",10),Person("李四",15))) val dataset: Dataset[Person] =sourceRDD.toDS() //Dataset 支持强类型的API dataset.filter(item => item.age >10).show() //Dataset 支持若弱类型的API dataset.filter('age>10).show() //Dataset 可以直接编写SQL表达式 dataset.filter("age>10").show() } @Test def database2(): Unit ={ val spark: SparkSession = new SparkSession.Builder() .master("local[6]") .appName("database2") .getOrCreate() import spark.implicits._ val dataset: Dataset[Person] =spark.createDataset(Seq(Person("张三",10),Person("李四",20))) //无论Dataset中放置的是什么类型的对象,最终执行计划中的RDD上都是internalRow //直接获取到已经分析和解析过得Dataset的执行计划,从中拿到RDD val executionRdd: RDD[InternalRow] =dataset.queryExecution.toRdd //通过将Dataset底层的RDD通过Decoder转成了和Dataset一样的类型RDD val typedRdd:RDD[Person] = dataset.rdd println(executionRdd.toDebugString) println() println() println(typedRdd.toDebugString) } @Test def database3(): Unit = { //1.创建sparkSession val spark: SparkSession = new SparkSession.Builder() .appName("database1") .master("local[6]") .getOrCreate() //2.导入引入shi子转换 import spark.implicits._ val dataFrame: DataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF() //3.看看DataFrame可以玩出什么花样 //select name from... dataFrame.where('age > 10) .select('name) .show() }// @Test// def database4(): Unit = {// //1.创建sparkSession// val spark: SparkSession = new SparkSession.Builder()// .appName("database1")// .master("local[6]")// .getOrCreate()// //2.导入引入shi子转换// import spark.implicits._// val personList=Seq(Person("zhangsan",15),Person("lisi",20))//// //1.toDF// val df1: DataFrame =personList.toDF()// val df2: DataFrame =spark.sparkContext.parallelize(personList).toDF()// //2.createDataFrame// val df3: DataFrame =spark.createDataFrame(personList)//// //3.read// val df4: DataFrame =spark.read.csv("")// df4.show()// } //toDF()是转成DataFrame,toDs是转成Dataset // DataFrame就是Dataset[Row] 代表弱类型的操作,Dataset代表强类型的操作,中的类型永远是row,DataFrame可以做到运行时类型安全,Dataset可以做到 编译时和运行时都安全@Testdef database4(): Unit = { //1.创建sparkSession val spark: SparkSession = new SparkSession.Builder() .appName("database1") .master("local[6]") .getOrCreate() //2.导入引入shi子转换 import spark.implicits._ val personList=Seq(Person("zhangsan",15),Person("lisi",20)) //DataFrame代表弱类型操作是编译时不安全 val df: DataFrame =personList.toDF() //Dataset是强类型的 val ds: Dataset[Person] =personList.toDS() ds.map((person:Person) =>Person(person.name,person.age))} @Test def row(): Unit ={ //1.Row如何创建,它是什么 //row对象必须配合Schema对象才会有列名 val p: Person =Person("zhangsan",15) val row: Row =Row("zhangsan",15) //2.如何从row中获取数据 row.getString(0) row.getInt(1) //3.Row也是样例类、 row match { case Row(name,age) => println(name,age) } }}case class Person(name: String, age: Int)
以上就是"SparkSQL如何运用"这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注行业资讯频道。
类型
数据
就是
知识
篇文章
运行
代表
对象
方式
张三
李四
安全
内容
集群
统一
编译
不同
很大
中放
从中
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器管理 用户
数据库实验二数据更新
华润 雪花 软件开发
午夜服务器
网络安全公众号起名
网络技术基础看不懂
阿里巴巴网络技术有限公司子公司
网络安全执法检查图解
时代传媒网络技术有限公司
枣庄数据库安全审计系统
地理国情监测数据库建设技术
盛泉恒元软件开发
app软件开发的相关技术
网络安全与防火墙技术概念
数据库系统技术考试大纲
qq面对面快传会上传到服务器吗
济南最大的软件开发公司
俄国网络安全战略
服务器lpm
商云8数据库恢复备用数据
csgo服务器目前处于脱机状态
网络安全公众号起名
阿里学生服务器考试答案
计算机网络技术调剂信息
网络安全法经营许可证
网络安全法第一个
盛泉恒元软件开发
模拟器清楚app数据库
软件开发公司装修需要什么
商云8数据库恢复备用数据