10. Spark SQL: Quick Start
Past and Present
Hive & Shark
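With the arrival of the big-data era, Hadoop became hugely popular. Hive was created so that engineers who knew RDBMSs but not MapReduce could get started with big-data development quickly. At the time, Hive was the only SQL-on-Hadoop tool that ran on Hadoop.
However, MapReduce writes large amounts of intermediate data to disk during a computation, which consumes a great deal of I/O and lowers execution efficiency. To make SQL-on-Hadoop faster, many new SQL-on-Hadoop tools appeared. The most prominent were: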
- MapR's Drill
- Cloudera's Impala
- Shark
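Shark was one of the components of the Spark ecosystem from the Berkeley lab (AMPLab). It modified three modules of the Hive Driver (memory management, physical planning, and execution) so that Hive queries could run on the Spark engine, which sped up SQL queries by a factor of 10 to 100.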
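Shark & Spark SQL
Shark depended too heavily on Hive (for example, it reused Hive's syntax parser and query optimizer). That dependence conflicted with Spark's "One Stack to Rule Them All" strategy and hampered integration among Spark's components, so the Spark SQL project was proposed.
Spark SQL discarded Shark's code base but kept some of its strengths, such as in-memory columnar storage and Hive compatibility, and was written from scratch. Freed from the dependence on Hive, Spark SQL improved greatly in data compatibility, performance optimization, and component extensibility: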
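- Data compatibility
Besides being compatible with Hive, Spark SQL can load data from RDDs, Parquet files, and JSON files. It can also read from RDBMSs and from NoSQL stores such as Cassandra.
- Performance optimization
In addition to techniques such as in-memory columnar storage and bytecode generation, Spark SQL introduces a cost model that evaluates queries dynamically and picks the best physical plan.
- Component extensibility
The SQL parser, the analyzer, and the optimizer can all be redefined and extended.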
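To make the in-memory columnar storage idea mentioned above more concrete, here is a toy sketch in plain Python. It is not how Spark SQL implements its columnar cache; the sample records and the names `rows` and `columns` are made up for illustration. It only shows why keeping each column in its own packed buffer lets a single-column aggregate skip the other columns.

```python
from array import array

# Hypothetical sample records: (name, age).
rows = [("Michael", 29), ("Andy", 30), ("Justin", 19)]

# Row-oriented scan: every whole record is visited just to read one field.
row_total = sum(record[1] for record in rows)

# Column-oriented layout: one buffer per column.
columns = {
    "name": [record[0] for record in rows],
    # Ages are packed as signed 64-bit integers in one contiguous buffer.
    "age": array("q", (record[1] for record in rows)),
}

# Columnar scan: only the "age" buffer is read; "name" is never touched.
col_total = sum(columns["age"])

print(row_total, col_total)  # prints "78 78"
```

Both scans give the same answer. The difference is in how much data has to be touched, which matters far more on wide tables than in this three-row example.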
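In 2014 Shark development stopped and the team moved all of its resources to the Spark SQL project. That ended Shark's story, but two lines of development grew out of it: Spark SQL and Hive on Spark.
Spark SQL continues to evolve as a member of the Spark ecosystem. It is no longer constrained by Hive and merely stays compatible with it. Hive on Spark is a Hive roadmap project that makes Spark one of Hive's underlying execution engines, so Hive is no longer tied to a single engine and can run on MapReduce, Tez, or Spark.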
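Overview
Spark SQL is a Spark module for structured data processing. It attaches structural information to the data being processed, and you can interact with it through SQL statements or the Dataset API.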
- SQL
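Spark SQL can use SQL to read and write data stored in Hive. SQL can also be run from within a programming language, in which case the result comes back as a Dataset/DataFrame.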
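- Datasets & DataFrames
A Dataset is a distributed collection of data that combines the advantages of RDDs with those of the Spark SQL execution engine. A Dataset can be constructed from JVM objects and then processed with operators such as map and filter. The Dataset API is available in Scala and Java. Python and R have no Dataset API, but because both languages are dynamic, many of its benefits are already available there.
A DataFrame is a Dataset with a two-dimensional (row and column) structure, conceptually equivalent to a table in an RDBMS. A DataFrame can be constructed from many sources, such as structured data files, Hive tables, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R.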
DataFrame and Dataset
Creating and operating on DataFrames
- scala
    import org.apache.spark.sql.SparkSession

    // Build the SparkSession
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // Create a DataFrame
    val df = spark.read.json("examples/src/main/resources/people.json")

    // Displays the content of the DataFrame to stdout
    df.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    // DataFrame operations
    // This import is needed to use the $-notation
    import spark.implicits._

    // Print the schema in a tree format
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)

    // Select only the "name" column
    df.select("name").show()
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+

    // Select everybody, but increment the age by 1
    df.select($"name", $"age" + 1).show()
    // +-------+---------+
    // |   name|(age + 1)|
    // +-------+---------+
    // |Michael|     null|
    // |   Andy|       31|
    // | Justin|       20|
    // +-------+---------+

    // Select people older than 21
    df.filter($"age" > 21).show()
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+

    // Count people by age
    df.groupBy("age").count().show()
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+
- java
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    // col("...") is preferable to df.col("...")
    import static org.apache.spark.sql.functions.col;

    // Build the SparkSession
    SparkSession spark = SparkSession
      .builder()
      .appName("Java Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate();

    // Create a DataFrame
    Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

    // Displays the content of the DataFrame to stdout
    df.show();
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    // DataFrame operations
    // Print the schema in a tree format
    df.printSchema();
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)

    // Select only the "name" column
    df.select("name").show();
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+

    // Select everybody, but increment the age by 1
    df.select(col("name"), col("age").plus(1)).show();
    // +-------+---------+
    // |   name|(age + 1)|
    // +-------+---------+
    // |Michael|     null|
    // |   Andy|       31|
    // | Justin|       20|
    // +-------+---------+

    // Select people older than 21
    df.filter(col("age").gt(21)).show();
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+

    // Count people by age
    df.groupBy("age").count().show();
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+
- python
    from pyspark.sql import SparkSession

    # Build the SparkSession
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

    # Create a DataFrame
    # spark is an existing SparkSession
    df = spark.read.json("examples/src/main/resources/people.json")

    # Displays the content of the DataFrame to stdout
    df.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+

    # DataFrame operations
    # spark, df are from the previous example
    # Print the schema in a tree format
    df.printSchema()
    # root
    # |-- age: long (nullable = true)
    # |-- name: string (nullable = true)

    # Select only the "name" column
    df.select("name").show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+

    # Select everybody, but increment the age by 1
    df.select(df['name'], df['age'] + 1).show()
    # +-------+---------+
    # |   name|(age + 1)|
    # +-------+---------+
    # |Michael|     null|
    # |   Andy|       31|
    # | Justin|       20|
    # +-------+---------+

    # Select people older than 21
    df.filter(df['age'] > 21).show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 30|Andy|
    # +---+----+

    # Count people by age
    df.groupBy("age").count().show()
    # +----+-----+
    # | age|count|
    # +----+-----+
    # |  19|    1|
    # |null|    1|
    # |  30|    1|
    # +----+-----+
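Creating and operating on Datasets
Datasets are similar to RDDs, but instead of Java serialization or Kryo they use a specialized Encoder to serialize objects that are sent over the network. Encoders are dynamically generated code, and they use a format that lets Spark perform many operations (such as filtering, sorting, and hashing) directly on the serialized bytes without deserializing them back into objects.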
- scala
    // Needed for toDS() and as[Person]; spark is the SparkSession built earlier
    import spark.implicits._

    // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
    // you can use custom classes that implement the Product interface
    case class Person(name: String, age: Long)

    // Encoders are created for case classes
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    // +----+---+
    // |name|age|
    // +----+---+
    // |Andy| 32|
    // +----+---+

    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

    // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
    val path = "examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
- java
    import java.util.Arrays;
    import java.util.Collections;
    import java.io.Serializable;

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;

    public static class Person implements Serializable {
      private String name;
      private int age;

      public String getName() {
        return name;
      }

      public void setName(String name) {
        this.name = name;
      }

      public int getAge() {
        return age;
      }

      public void setAge(int age) {
        this.age = age;
      }
    }

    // Create an instance of a Bean class
    Person person = new Person();
    person.setName("Andy");
    person.setAge(32);

    // Encoders are created for Java beans
    Encoder<Person> personEncoder = Encoders.bean(Person.class);
    Dataset<Person> javaBeanDS = spark.createDataset(
      Collections.singletonList(person),
      personEncoder
    );
    javaBeanDS.show();
    // +---+----+
    // |age|name|
    // +---+----+
    // | 32|Andy|
    // +---+----+

    // Encoders for most common types are provided in class Encoders
    Encoder<Integer> integerEncoder = Encoders.INT();
    Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
    Dataset<Integer> transformedDS = primitiveDS.map(
      (MapFunction<Integer, Integer>) value -> value + 1,
      integerEncoder);
    transformedDS.collect(); // Returns [2, 3, 4]

    // DataFrames can be converted to a Dataset by providing a class. Mapping based on name
    String path = "examples/src/main/resources/people.json";
    Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
    peopleDS.show();
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
SQL operations
- scala
    // Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")
    // Alternative: df.createGlobalTempView("people") registers a global temporary view,
    // which is queried as global_temp.people

    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
- java
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people");
    // Alternative: df.createGlobalTempView("people") registers a global temporary view,
    // which is queried as global_temp.people

    Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
    sqlDF.show();
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
- python
    # Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")
    # Alternative: df.createGlobalTempView("people") registers a global temporary view,
    # which is queried as global_temp.people

    sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+