11. Spark SQL: Converting an RDD to a Dataset
Introduction
Spark SQL provides two ways to convert an RDD into a Dataset.
- Inferring the schema by reflection
Use this approach when the Spark application can infer the structure of the RDD's data. The reflection-based method makes the code more concise and effective.
- Building a schema programmatically and applying it to the RDD
Use this approach when the Spark application cannot infer the RDD's structure ahead of time.
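All of the examples below read Spark's bundled sample file examples/src/main/resources/people.txt, which in the Spark source tree contains three comma-separated records:
```
Michael, 29
Andy, 30
Justin, 19
```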
Reflection-based approach
- Scala
```scala
// The case class defines the schema that reflection infers
case class Person(name: String, age: Long)

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()

// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
```
- Java
```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Create an RDD of Person objects from a text file
// (Person is a JavaBean with getters/setters for name and age)
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
    stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
```
- Python
```python
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are DataFrame objects.
# .rdd returns the content as a pyspark.RDD of Row.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin
```
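The reflection examples above all end with a DataFrame, i.e. an untyped Dataset[Row]. The same implicits also produce a typed Dataset directly. A minimal Scala sketch, assuming the bundled people.txt; the appName, master, and variable names are illustrative choices, not part of the original examples:
```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Same case class the reflection example uses
case class Person(name: String, age: Long)

val spark = SparkSession.builder().appName("RddToDataset").master("local[*]").getOrCreate()
import spark.implicits._

// RDD[Person] -> Dataset[Person]: the implicit Encoder for case classes drives toDS()
val peopleDS: Dataset[Person] = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(a => Person(a(0), a(1).trim.toLong))
  .toDS()

// A DataFrame can also be turned back into a typed Dataset with as[T]
val typedAgain: Dataset[Person] = peopleDS.toDF().as[Person]

// Typed transformations use plain Scala lambdas instead of SQL strings
peopleDS.filter(p => p.age >= 13 && p.age <= 19).map(_.name).show()
// +------+
// | value|
// +------+
// |Justin|
// +------+
```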
Programmatic approach
- Scala
```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// For the implicit Encoder used by the final map
import spark.implicits._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
```
- Java
```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
```
- Python
```python
# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True)
          for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+
```
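The programmatic examples above keep every column as a string for simplicity, but the declared field types can differ per column. A minimal Scala sketch of a mixed-type schema, assuming the same sample file; the IntegerType field and the toInt cast are illustrative choices rather than part of the original examples:
```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("ProgrammaticSchema").master("local[*]").getOrCreate()

// Declare per-field types instead of an all-string schema
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Row values must line up with the declared field types, so age is cast to Int here
val rowRDD = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(a => Row(a(0), a(1).trim.toInt))

val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = true)
```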