SparkSQL如何运用
发表于:2024-10-21 作者:千家信息网编辑
千家信息网最后更新 2024年10月21日,今天小编给大家分享一下SparkSQL如何运用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解
千家信息网最后更新 2024年10月21日SparkSQL如何运用
今天小编给大家分享一下SparkSQL如何运用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
一:SparkSQL
1.SparkSQL简介
Spark SQL是Spark的一个模块,用于处理结构化的数据,它提供了一个数据抽象DataFrame(最核心的编程抽象就是DataFrame),并且SparkSQL作为分布式SQL查询引擎。
Spark SQL就是将SQL转换成一个任务,提交到集群上运行,类似于Hive的执行方式。
2.SparkSQL运行原理
将Spark SQL转化为RDD,然后提交到集群执行。
3.SparkSQL特点
(1)容易整合,Spark SQL已经集成在Spark中
(2)提供了统一的数据访问方式:JSON、CSV、JDBC、Parquet等都是使用统一的方式进行访问
(3)兼容 Hive
(4)标准的数据连接:JDBC、ODBC
二、SparkSQL运用
package sqlimport org.apache.avro.ipc.specific.Personimport org.apache.sparkimport org.apache.spark.rdd.RDDimport org.apache.spark.sqlimport org.apache.spark.sql.catalyst.InternalRowimport org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import org.junit.Testclass Intro { @Test def dsIntro(): Unit ={ val spark: SparkSession = new sql.SparkSession.Builder() .appName("ds intro") .master("local[6]") .getOrCreate() //导入隐算是shi转换 import spark.implicits._ val sourceRDD: RDD[Person] =spark.sparkContext.parallelize(Seq(Person("张三",10),Person("李四",15))) val personDS: Dataset[Person] =sourceRDD.toDS();//personDS.printSchema()打印出错信息 val resultDS: Dataset[Person] =personDS.where('age>10) .select('name,'age) .as[Person] resultDS.show() } @Test def dfIntro(): Unit ={ val spark: SparkSession =new SparkSession.Builder() .appName("ds intro") .master("local") .getOrCreate() import spark.implicits._ val sourceRDD: RDD[Person] = spark.sparkContext.parallelize(Seq(Person("张三",10),Person("李四",15))) val df: DataFrame = sourceRDD.toDF()//隐shi转换 df.createOrReplaceTempView("person")//创建表 val resultDF: DataFrame =spark.sql("select name from person where age>=10 and age<=20") resultDF.show() } @Test def database1(): Unit ={ //1.创建sparkSession val spark: SparkSession =new SparkSession.Builder() .appName("database1") .master("local[6]") .getOrCreate() //2.导入引入shi子转换 import spark.implicits._ //3.演示 val sourceRDD: RDD[Person] =spark.sparkContext.parallelize(Seq(Person("张三",10),Person("李四",15))) val dataset: Dataset[Person] =sourceRDD.toDS() //Dataset 支持强类型的API dataset.filter(item => item.age >10).show() //Dataset 支持若弱类型的API dataset.filter('age>10).show() //Dataset 可以直接编写SQL表达式 dataset.filter("age>10").show() } @Test def database2(): Unit ={ val spark: SparkSession = new SparkSession.Builder() .master("local[6]") .appName("database2") .getOrCreate() import spark.implicits._ val dataset: Dataset[Person] =spark.createDataset(Seq(Person("张三",10),Person("李四",20))) //无论Dataset中放置的是什么类型的对象,最终执行计划中的RDD上都是internalRow //直接获取到已经分析和解析过得Dataset的执行计划,从中拿到RDD val executionRdd: RDD[InternalRow] =dataset.queryExecution.toRdd //通过将Dataset底层的RDD通过Decoder转成了和Dataset一样的类型RDD val typedRdd:RDD[Person] = dataset.rdd println(executionRdd.toDebugString) println() println() println(typedRdd.toDebugString) } @Test def database3(): Unit = { //1.创建sparkSession val spark: SparkSession = new SparkSession.Builder() .appName("database1") .master("local[6]") .getOrCreate() //2.导入引入shi子转换 import spark.implicits._ val dataFrame: DataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF() //3.看看DataFrame可以玩出什么花样 //select name from... dataFrame.where('age > 10) .select('name) .show() }// @Test// def database4(): Unit = {// //1.创建sparkSession// val spark: SparkSession = new SparkSession.Builder()// .appName("database1")// .master("local[6]")// .getOrCreate()// //2.导入引入shi子转换// import spark.implicits._// val personList=Seq(Person("zhangsan",15),Person("lisi",20))//// //1.toDF// val df1: DataFrame =personList.toDF()// val df2: DataFrame =spark.sparkContext.parallelize(personList).toDF()// //2.createDataFrame// val df3: DataFrame =spark.createDataFrame(personList)//// //3.read// val df4: DataFrame =spark.read.csv("")// df4.show()// } //toDF()是转成DataFrame,toDs是转成Dataset // DataFrame就是Dataset[Row] 代表弱类型的操作,Dataset代表强类型的操作,中的类型永远是row,DataFrame可以做到运行时类型安全,Dataset可以做到 编译时和运行时都安全@Testdef database4(): Unit = { //1.创建sparkSession val spark: SparkSession = new SparkSession.Builder() .appName("database1") .master("local[6]") .getOrCreate() //2.导入引入shi子转换 import spark.implicits._ val personList=Seq(Person("zhangsan",15),Person("lisi",20)) //DataFrame代表弱类型操作是编译时不安全 val df: DataFrame =personList.toDF() //Dataset是强类型的 val ds: Dataset[Person] =personList.toDS() ds.map((person:Person) =>Person(person.name,person.age))} @Test def row(): Unit ={ //1.Row如何创建,它是什么 //row对象必须配合Schema对象才会有列名 val p: Person =Person("zhangsan",15) val row: Row =Row("zhangsan",15) //2.如何从row中获取数据 row.getString(0) row.getInt(1) //3.Row也是样例类、 row match { case Row(name,age) => println(name,age) } }}case class Person(name: String, age: Int)
以上就是"SparkSQL如何运用"这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注行业资讯频道。
类型
数据
就是
知识
篇文章
运行
代表
对象
方式
张三
李四
安全
内容
集群
统一
编译
不同
很大
中放
从中
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库中如何查询出数字15位
网络安全测评资质被取消
网络技术行业有哪些
悉尼 墨尔本 软件开发
网络安全三个特性
成都高防御服务器
网络安全员能从事什么工作
免费服务器地址用户名和密码
生猪数据库
数据库如何评价
网络安全班会主持稿防骗
软件开发系统的bug定义
山东济南尚佳互联网科技
长春软件开发项目承接
c ie代理服务器
株洲it软件开发工程师技校
如何设计一个管理员的数据库
基于soa应用软件开发
首届网络安全宣传周在哪
社交软件开发在哪
数据库损坏游戏
计算机网络技术ATM
服务器安全模式能进
课件站下载网络安全教育课件
软件开发课程设计主要学什么
软件开发文档编写的重要性及特点
发动机应用层软件开发
威科先行 财务数据库
网络安全和信息化产业图谱
ip地址数据库 sql