千家信息网

DataSet数据集在使用sql()时,无法使用map,flatMap等转换算子的解决办法

发表于:2025-01-27 作者:千家信息网编辑
千家信息网最后更新 2025年01月27日,摘要我们在使用spark的一个流程是:利用spark.sql()函数把数据读入到内存形成DataSet[Row](DataFrame)由于Row是新的spark数据集中无法实现自动的编码,需要对这个数
千家信息网最后更新 2025年01月27日DataSet数据集在使用sql()时,无法使用map,flatMap等转换算子的解决办法

摘要

我们在使用spark的一个流程是:利用spark.sql()函数把数据读入到内存形成DataSet[Row](DataFrame)由于Row是新的spark数据集中无法实现自动的编码,需要对这个数据集进行编码,才能利用这些算子进行相关的操作,如何编码是一个问题,在这里就把这几个问题进行总结一下。报的错误:error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

报这个错误一般就是我们在使用算子时其返回值的数据类型往往不是spark通过自身的反射能完成的自动编码部分,比如通过map算子,我们在map算子的函数的返回值类型是Map类型的,就会出现上面的问题,因为Map集合类不在:基本的类型和String,case class和元组的范围之内,spark内部不能通过反射完成自动编码。


出现这个问题的原因

spark2.0以后的版本采用的是新的分布式数据集DataSet,其中DataFrame是DataSet[Row]的别名形式。而新的数据集采用了很多的优化,其中一个就是利用了Tungsten execution engine的计算引擎,这个计算引擎采用了很多的优化。其中一个就是自己维护了一个内存管理器,从而使计算从java jvm解脱出来了,使得内存的优化得到了很大的提升。同时新的计算引擎,把数据存储在内存中是以二进制的形式存储的,大部分所有的计算都是在二进制数据流上进行的,不需要把二进制数据流反序列化成java对象,然后再把计算的结果序列化成二进制数据流,而是直接在二进制流上进行操作,这样的情况就需要我们存在一种机制就是java对象到二进制数据流的映射关系,不然我们不知道二进制流对应的数据对象是几个字节,spark这个过程是通过Encoders来完成的,spark自身通过反射完成了一部分的自动编码过程:基本的类型和String,case class和元组,对于其他的集合类型或者我们自定义的类,他是无法完成这样的编码的。需要我们自己定义这样的编码也就是让其拥有一个schema。

解决这个问题方式

方法一:

这样就是把其转化为RDD,利用RDD进行操作,但是不建议用这个,相对于RDD,DataSet进行了很多的底层优化,拥有很不错性能


val orderInfo1 = spark.sql(

"""

|SELECT

|o.id,

|o.user_id

|FROM default.api_order o

|limit 100

""".stripMargin).rdd.map(myfunction)



方法二:

让其自动把DataSet[Row]转化为DataSet[P],如果Row里面有复杂的类型出现的话。


case class Orders(id: String, user_id: String)

//这个case class要定义在我们的单例对象的外面

object a {

def main(args: Array[String]): Unit ={

import spark.implicits._

val orderInfo1 = spark.sql(

"""

|SELECT

|o.id,

|o.user_id

|FROM default.api_order o

|limit 100

""".stripMargin).as[Orders].map(myfunction)

}

}



方式三:

自定义一个schema,然后利用RowEncoder进行编码。这只是一个例子,里面的类型其实都可以通过spark的反射自动完成编码过程。


import spark.implicits._

val schema = StructType(StructType(Seq(StructField("id",StringType,true),StructField("user_id",StringType,true))))

val encoders = RowEncoder(schema)

val orderInfo1 = spark.sql(

"""

|SELECT

|o.id,

|o.user_id

|FROM default.api_order o

|limit 100

""".stripMargin).map(row => row)(encoders)


方法四:

直接利用scala的模式匹配的策略case Row来进行是可以通过的,原因是case Row()scala模式匹配的知识,这样可以知道集合Row里面拥有多少个基本的类型,则可以通过scala就可以完成对Row的自动编码,然后可以进行相应的处理。


import spark.implicits._

val orderInfo1 = spark.sql(

"""

|SELECT

|o.id,

|o.user_id

|FROM default.api_order o

|limit 100

""".stripMargin).map{case Row(id: String, user_id: String) => (id,user_id)}

这个得到的schema为:

orderInfo1: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]

如果换成这样:

val orderInfo1 = spark.sql(

"""

|SELECT

|o.id,

|o.user_id

|FROM default.api_order o

|limit 100

""".stripMargin).map{case Row(id: String, user_id: String) => List(id,user_id)}

得到的schema为:

orderInfo1: org.apache.spark.sql.Dataset[List[String]] = [value: array]

可以看出:spark是把元祖看成case class一种特殊形式拥有,schame的字段名称为_1,_2这样的特殊case clase



0