
11. Spark SQL: Converting an RDD to a Dataset


Introduction

  Spark SQL provides two ways to convert an RDD into a Dataset.

  • Inferring the RDD's schema via reflection

  This approach can be used when the Spark application can infer the RDD's schema, i.e. when the schema is already known while the application is being written. The reflection-based approach makes for more concise code.

  • Constructing a schema programmatically and then applying it to an existing RDD

  This approach is used when the Spark application cannot infer the RDD's schema ahead of time, for example when the columns are only known at runtime. A minimal Scala sketch contrasting the two approaches follows; full per-language examples come after.
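  To make the contrast concrete before the per-language examples, here is a minimal Scala sketch of both approaches side by side. It assumes a local SparkSession named spark; the Person case class and the tiny inline dataset are illustrative only, not part of the original examples:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

case class Person(name: String, age: Long)  // illustrative case class

val spark = SparkSession.builder().appName("rdd-to-dataset").master("local[*]").getOrCreate()
import spark.implicits._

// A tiny illustrative RDD of comma-separated "name,age" strings
val rdd = spark.sparkContext.parallelize(Seq("Justin,19", "Andy,30"))

// 1. Reflection: the Person case class supplies the schema
val byReflection = rdd.map(_.split(","))
  .map(a => Person(a(0), a(1).trim.toInt))
  .toDS()                                   // Dataset[Person]

// 2. Programmatic: build a StructType by hand and apply it to an RDD[Row]
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", StringType, nullable = true)))
val byProgram = spark.createDataFrame(
  rdd.map(_.split(",")).map(a => Row(a(0), a(1).trim)), schema)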

Reflection-based approach

  • Scala

// The case class defines the schema that reflection infers
case class Person(name: String, age: Long)

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()

// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can also be defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
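  The example above ends in toDF(), which yields an untyped DataFrame. If a strongly typed Dataset[Person] is wanted instead, the same reflection mechanism is available through toDS(). A minimal sketch, assuming the same spark session, import spark.implicits._, and Person case class as above:

// Reflection straight to a typed Dataset (assumes `spark`,
// `import spark.implicits._`, and the Person case class defined above)
val peopleDS = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDS()  // Dataset[Person] rather than DataFrame

// Typed, compile-time-checked transformations instead of SQL strings
peopleDS.filter(p => p.age >= 13 && p.age <= 19)
  .map(p => "Name: " + p.name)
  .show()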
  • Java

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Create an RDD of Person objects from a text file
// (Person is a JavaBean with name/age fields plus getters and setters)
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by Spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getAs("name"),
    stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
  • Python

from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are DataFrame objects.
# .rdd returns the content as a pyspark.RDD of Row.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin

Programmatic approach

  • Scala

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// For the implicit Encoder used by .map on the result below
import spark.implicits._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
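  Every field in the example above is declared as StringType for simplicity, but the programmatic schema is not limited to strings; the values placed in each Row just have to match the declared types. A minimal sketch under the same assumptions (same people.txt input and spark session; modeling age as IntegerType is an illustrative choice):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Declare the column types explicitly instead of treating everything as a string
val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// The Row values must line up with the declared types: String, then Int
val typedRowRDD = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim.toInt))

val typedPeopleDF = spark.createDataFrame(typedRowRDD, typedSchema)
typedPeopleDF.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = true)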
  • Java

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
  • Python

# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+
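  It is worth noting that the conversion also runs in the other direction: calling .rdd on a Dataset or DataFrame recovers the underlying RDD (the Python reflection example above already uses teenagers.rdd this way). A minimal round-trip sketch, assuming the same spark session, import spark.implicits._, and Person case class as in the reflection example:

// Round trip: RDD -> Dataset -> RDD (assumes `spark`, `import spark.implicits._`,
// and the Person case class from the reflection example)
val ds = spark.sparkContext
  .parallelize(Seq(Person("Justin", 19)))
  .toDS()                 // RDD[Person] -> Dataset[Person]

val typedRdd = ds.rdd     // back to RDD[Person]

val df = ds.toDF()        // Dataset[Person] -> DataFrame (Dataset[Row])
val rowRdd = df.rdd       // DataFrame -> RDD[Row]
rowRdd.map(row => row.getAs[String]("name")).collect()  // Array(Justin)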

Devoted to technology, happy to share. Follow the WeChat official account java大数据编程 for more technical content.
