千家信息网

10. Spark SQL Quick Start

Published: 2024-11-26. Author: 千家信息网 editor. Last updated: 2024-11-26.

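Background

Hive & Shark

  With the arrival of the big-data era, Hadoop became hugely popular. Hive was created so that engineers who knew RDBMSs but not MapReduce could quickly start building big-data applications. At the time, Hive was the only SQL-on-Hadoop tool that ran on Hadoop.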

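  However, MapReduce writes large amounts of intermediate data to disk during a computation, which consumes a great deal of I/O and lowers execution efficiency. To make SQL-on-Hadoop faster, many new SQL-on-Hadoop tools emerged. The most prominent were:

  • Drill (MapR)
  • Impala (Cloudera)
  • Shark

  Shark was a component of the Spark ecosystem that came out of the Berkeley lab (UC Berkeley's AMPLab). It modified three modules of the Hive driver (memory management, physical planning, and execution) so that queries could run on the Spark engine, which sped up SQL queries by a factor of 10 to 100.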

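Shark & Spark SQL

  Shark depended too heavily on Hive (for example, it used Hive's parser and query optimizer). This dependency conflicted with Spark's stated "One Stack to Rule Them All" strategy and hindered integration among Spark's components, so the Spark SQL project was proposed.

  Spark SQL discarded Shark's code base but kept some of its strengths, such as in-memory columnar storage and Hive compatibility, and was rewritten from scratch. Freed from its dependence on Hive, Spark SQL improved greatly in data compatibility, performance optimization, and component extensibility.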

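  • Data compatibility

  In addition to being Hive-compatible, Spark SQL can load data from RDDs, Parquet files, and JSON files, and it can also read from RDBMSs and from NoSQL stores such as Cassandra.

  • Performance optimization

  Besides optimizations such as in-memory columnar storage and bytecode generation, Spark SQL introduces a cost model to evaluate queries dynamically and choose the best physical plan.

  • Component extensibility

  The SQL parser, analyzer, and optimizer can all be redefined and extended.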
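
  To make the idea behind In-Memory Columnar Storage more concrete, here is a minimal conceptual sketch in plain Python. It is not Spark's implementation; the sample records, the -1 placeholder for a missing age, and the function names are all assumptions made for illustration. It only shows why an aggregate over one column touches less data when each column is stored contiguously.

```python
from array import array

# Hypothetical sample records; -1 stands in for a missing age
# purely to keep this sketch simple.
rows = [("Michael", -1), ("Andy", 30), ("Justin", 19)]


def sum_age_row_layout(records):
    """Row-oriented layout: every record is visited in full,
    even though only the age field is needed."""
    return sum(age for _name, age in records if age >= 0)


# Column-oriented layout: one contiguous container per column.
columns = {
    "name": [name for name, _age in rows],
    "age": array("q", [age for _name, age in rows]),  # packed 64-bit integers
}


def sum_age_column_layout(cols):
    """Column-oriented layout: only the age column is scanned;
    the name column is never touched."""
    return sum(a for a in cols["age"] if a >= 0)


row_total = sum_age_row_layout(rows)
col_total = sum_age_column_layout(columns)
```

  Both functions compute the same total; the difference is that the columnar version reads a single compact, typed array, which is the property that makes column scans and compression cheap in a real engine.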

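  In 2014, development of Shark stopped and the team moved all of its resources to the Spark SQL project. That ended Shark's development, but it gave rise to two lines of work: Spark SQL and Hive on Spark.

  Spark SQL continues to evolve as a member of the Spark ecosystem; it is no longer constrained by Hive and merely remains compatible with it. Hive on Spark is a Hive initiative that adds Spark as one of Hive's underlying execution engines, so Hive is no longer tied to a single engine and can run on MapReduce, Tez, or Spark.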

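Introduction

  Spark SQL is a module for structured data processing. It attaches structural information to the data being processed, and you interact with it through SQL statements or the Dataset API.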

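  • SQL

  Spark SQL can use SQL to read and write data in Hive. SQL can also be run from within a programming language, in which case the result is returned as a Dataset/DataFrame.

  • Datasets & DataFrames

  A Dataset is a distributed collection of data that combines the advantages of RDDs with those of Spark SQL's execution engine. A Dataset can be constructed from JVM objects and then manipulated with operators. The Dataset API is available in Scala and Java. Python and R have no Dataset API, but because they are dynamic languages, many of its benefits are already available there.

  A DataFrame is a Dataset organized into named columns, equivalent to a table in an RDBMS. DataFrames can be constructed from many sources, such as structured data files, Hive tables, external databases, or RDDs. The DataFrame API is available in Scala, Java, Python, and R.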

DataFrame and Dataset

Creating and Operating on DataFrames

  • Scala

import org.apache.spark.sql.SparkSession

// Build a SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// Create a DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// DataFrame operations
// This import is needed to use the $-notation
import spark.implicits._

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

  • Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Build a SparkSession
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

// Create a DataFrame (in Java, a DataFrame is a Dataset<Row>)
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// DataFrame operations
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

  • Python

from pyspark.sql import SparkSession

# Build a SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Create a DataFrame
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# DataFrame operations
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+

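Creating and Operating on Datasets

  Datasets are similar to RDDs, but instead of the Java serialization or Kryo used by RDDs, they use a specialized Encoder to serialize objects for transmission over the network. Encoders are dynamically generated code and use a format that lets Spark perform many operations, such as filtering, sorting, and hashing, without deserializing the bytes back into objects.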
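
  The following plain-Python sketch illustrates the general idea of running operators on encoded data without deserializing it. It is a conceptual illustration only, not how Spark's Encoders are implemented: the fixed-width record layout, the 16-byte name field, and the helper names encode, decode, and age_of are all hypothetical.

```python
import struct

# Hypothetical fixed-width binary layout for a (name, age) record:
# 16 bytes of UTF-8 name padded with NUL bytes, then a little-endian
# signed 64-bit age. Names longer than 16 bytes would be truncated.
RECORD = struct.Struct("<16sq")
AGE = struct.Struct("<q")
AGE_OFFSET = 16


def encode(name, age):
    """Serialize one record into the fixed-width layout."""
    return RECORD.pack(name.encode("utf-8"), age)


def decode(buf):
    """Fully deserialize a record back into a (name, age) tuple."""
    raw_name, age = RECORD.unpack(buf)
    return raw_name.rstrip(b"\x00").decode("utf-8"), age


def age_of(buf):
    """Read only the age field straight from the bytes,
    without rebuilding the whole record."""
    return AGE.unpack_from(buf, AGE_OFFSET)[0]


encoded = [encode("Andy", 30), encode("Justin", 19)]

# Filter and sort directly on the encoded form.
older_than_21 = [buf for buf in encoded if age_of(buf) > 21]
sorted_by_age = sorted(encoded, key=age_of)

# Only the records that survive the filter are decoded at the end.
result = [decode(buf) for buf in older_than_21]
```

  Because the position of every field is known from the layout, the filter and the sort read eight bytes per record instead of reconstructing each object, which is the kind of saving a schema-aware encoded format makes possible.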

  • Scala

// Assumes `spark` and `import spark.implicits._` from the DataFrame example above
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

  • Java

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  // long rather than int, so the field matches the long (bigint) type inferred for "age" in the JSON file
  private long age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public long getAge() {
    return age;
  }

  public void setAge(long age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
    (MapFunction<Integer, Integer>) value -> value + 1,
    integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

SQL Operations

  • Scala

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
//df.createGlobalTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

  • Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
//df.createGlobalTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

  • Python

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
# df.createGlobalTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
