千家信息网

SparkSQL简单使用

发表于:2025-01-26 作者:千家信息网编辑
千家信息网最后更新 2025年01月26日,==> 什么是 Spark SQL?---> Spark SQL 是 Spark 用来处理结构化数据的一个模块---> 作用:提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎-
千家信息网最后更新 2025年01月26日SparkSQL简单使用

==> 什么是 Spark SQL?

---> Spark SQL 是 Spark 用来处理结构化数据的一个模块

---> 作用:提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎

---> 运行原理:将 Spark SQL 转化为 RDD, 然后提交到集群执行

---> 特点:

---- 容易整合

---- 统一的数据访问方式

---- 兼容 Hive

---- 标准的数据连接

==> SparkSession

---> 特点:(2.0引用 SparkSession)

---- 为用户提供一个统一的切入点使用Spark 各项功能

---- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序

---- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互

---- 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

==> DataFrames 组织成命名列的数据集,等同于数据库中的表

---> 与 RDD 相比较:

---- RDD 是分布式的 Java 对象 的集合

---- DataFrame 是分布式 Row 对象的集合

---> 创建 DataFrames

---- 通过 case class 创建 DataFrames

// 定义 case class (相当于表的结构)case class Emp(Empno:Int, ename:String, job:String, mgr:String, hiredate:String, sal:Int, comm:String, deptno:Int)   // 将 HDFS 上的数据读入 RDD, 并将 RDD 与 case class 关联val lines = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(","))val emp = lines.map(x=> Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt)) `// 将RDD 转换成 DataFramesval empDF = emp.toDF// 通过 DataFrames 查询数据empDF.show

---- 通过 SparkSession 创建 DataFrames

// 创建 StructType 来定义结构,注意,需要先导入模块import org.apache.spark.sql.types._val myschema = StructType(List(                StructField("empno", DataTypes.IntegerType),                 StructField("ename", DataTypes.StringType),                StructField("job", DataTypes.StringType),                StructField("mgr", DataTypes.StringType),                StructField("hiredate", DataTypes.StringType),                StructField("sal", DataTypes.IntegerType),                StructField("comm", DataTypes.StringType),                StructField("deptno", DataTypes.IntegerType)                ))                // 读入数据且切分数据val empcsvRDD = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(","))// 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Rowimport org.apache.spark.sql.Rowval rowRDD = empcsvRDD.map(line=> Row(line(0).toInt, line(1), line(2), line(3),line(4), line(5).toInt, line(6), line(7).toInt)// 创建 DataFramesval df = spark.createDataFrame(rowRDD, myschema)// 查看表df.show

---- 使用 Json 文件来创建 DataFrame

val df = spark.read.json("Json 文件")// 查看数据df.show

---> DataFrame 操作 DataFrame 操作也称为无类型的 Dataset操作

---- 查询所有员工姓名

df.select("ename").show


---- 查询所有员工姓名和薪水,并给薪水加 100 元

df.select($"ename", $"sal", $"sal"+ 100).show


---- 查询工资大于2000的员工

df.select($"sal" > 2000).show


---- 求每个部门员工数

df.groupBy($"deptno").count.show

---- 在 DataFrame 中使用 SQL 语句 注: 需要首先将 DataFrame 注册成表(视图)

df.createOrReplaceTempView("emp")// 执行查询spark.sql("select * from emp").show


---> 临时视图(2种):

---- 只在当前会话中有效 df.createOrReplaceTempView("emp1")

---- 在全局有效 df.createGlobalTempView("emp2")



==> Datasets

---> 数据的分布式集合

--->特点:

---- Spark1.6中添加的新接口,是DataFrame之上更高一级的抽象

---- 提供了 RDD的优点(强类型化,使用 lambda函数的能力)

---- Spark SQL 优化后的执行引擎

---- 可以从 JVM 对象构造,然后使用函数转换(map, flatMap, filter等)去操作

---- 支持 Scala 和 Java,不支持 Python

---> 创建 DataSet

---- 使用序列

// 定义 case classcase class MyData(a:String, b:String)// 生成序列并创建 DataSetval ds = Seq(MyData(1, "Tom"), MyData(2, "Mary")).toDS// 查看结果ds.show


---- 使用 Json 数据

// 定义 case class case class Person(name:String, gender:String)//通过 Json 数据生成 DataFrameval df = spark.read.json(sc.parallelize("""{"gender":"Male", "name": "Tom"}""" ::Nil))// 将 DataFrame 转成 DataSetdf.as[Person].showdf.as[Person].collect


---- 通过使用 DHFS 执行 WordCount 程序

// 读取 HDFS 数据,并创建 DataSetval linesDS = spark.read.text("hdfs://bigdata0:9000/input/data.txt").as[String]// 对DataSet 进行操作:分词后, 查询长度大于3 的单词val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)// 查看结果words.showwords.collect// 执行wordcount 程序val result = linesDS.flatMap(_.split(" ").map((_.1)).groupByKey(x=> x._1).count)result.show// 排序result.orderBy($"value").show

==> Datasets 操作

---> joinWith 和 join 的区别是连接后的新 Dataset 的 schema 会不一样

// 使用 emp.json 生成 DataFrameval empDF = spark.read.json("/root/resources/emp.json")// 查询工资大于 3000 的员工empDF.where($"sal" > 3000).show// 创建 case classcase class Emp(empno:Lone, ename:String, job:String, hiredate:String, mgr:String, sal:Long, comm:String, deptno:Long)// 生成 DataSets,并查询数据val empDS = empDF.as[Emp]// 查询工资大于 3000 的员工empDS.filter(_.sal > 3000).show// 查看 10 号部门的员工empDS.filter(_.deptno == 10)// 多表查询// 1.创建部门表val deptRDD = sc.textFile("/test/dept.csv").map(_.split(","))case class Dept(deptno:Int, dname:String, loc:String)val deptDS = deptRDD.map(x=>Dept(x(0).toInt, x(1), x(2))).toDS// 2.创建员工表case class Emp(empno:Int, ename:String, job:String, mgr:String, hiredate:String, sal:Int, comm:String, deptno:Int)val empRDD = sc.textFile("/test/emp.csv").map(_.split(","))val empDS = empRDD.map(x=> Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt))// 3.执行多表查询: 等值链接val result = deptDF.join(empDS, "deptno")// 另一种写法: 注意有三个等号val result = deptDS.joinWith(empDS, deptDS("deptno") === empDS("deptno"))// 查看执行计划result.explain





数据 查询 员工 分布式 对象 生成 工资 特点 用户 程序 结构 部门 有效 函数 姓名 序列 引擎 文件 模块 类型 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 城管局国家网络安全宣传周 ios 初始化数据库 数据库在线链接 重庆城市科技学院网络安全 景区售票系统软件开发 以下不是数据库技术的是 mc为什么进不去自己的服务器 南京java软件开发待遇 海淀区技术软件开发介绍 泰拉瑞亚服务器ip地址怎么联机 苹果手游天空之塔连接不到服务器 国家电网网络安全PPT 为什么女孩不适合做软件开发 服务器机房要不要排气扇 招标结束后可以换服务器型号吗 数据库技术的发展面临的挑战 网络安全为什么要保证 广州新华互联网科技学费 观网络安全问题视频心得 如何查看网线服务器是哪个国家的 如何显示数据库日志文件 ibatis查询数据库编码 一起考教师服务器即将维护完成 国家网络安全保护宣传月活动 如何添加数据库的面板 观看网络安全公益讲座观后感 福建网络技术服务采购 外文数据库一般用什么 厦门惠榕软件开发 matlab输入实验数据库
0