千家信息网

Spark 调优之RDD持久化级别及kryo序列化性能测试

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,我们上篇文章中讲了,RDD的持久化是spark优化中必须掌握的,并且,在内存不足的情况下,我们可以将持久化类型选择为MEMORY_ONLY_SER,减少内存的占用,持久化更多的partition,并且
千家信息网最后更新 2025年01月23日Spark 调优之RDD持久化级别及kryo序列化性能测试

我们上篇文章中讲了,RDD的持久化是spark优化中必须掌握的,并且,在内存不足的情况下,我们可以将持久化类型选择为MEMORY_ONLY_SER,减少内存的占用,持久化更多的partition,并且不同的序列化方法也会影响序列化性能。
下面,我们就来测试下,持久化级别和序列化方法的选择对RDD持久化大小的影响。
我选择了一个170.9MB的日志文件,传到了百度网盘
提取码:ffae
测试环境是windows,
IDEA参数配置

MEMORY_ONLY

代码为

case class CleanedLog(cdn:String,region:String,level:String,date:String,ip:String, domain:String, url:String, traffic:String)  object KyroTest {    def main(args: Array[String]) {      val inputPath=new Path(args(0))      val outputPath=new Path(args(1))      val fsConf=new Configuration()      val fs= FileSystem.get(fsConf)      if (fs.exists(outputPath)) {        fs.delete(outputPath,true)        val path=args(1).toString        println(s"已删除已存在的路径$path")      }      val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")      //conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")      //conf.set("spark.kryo.registrationRequired", "true")      val sc = new SparkContext(conf)      val logs = sc.textFile(args(0))      //logs.filter(_.split("\t").length==8).take(10).foreach(println(_))      val logsCache=logsCahe(logs)      //序列化的方式将rdd存到内存      saveAtLocal(logsCache,args(1))      Thread.sleep(100000)    }    def logsCahe(logs:RDD[String]): RDD[CleanedLog] ={      logs.filter(_.split("\t").length==8).map(x=>{        val fields=x.split("\t")        CleanedLog(fields(0),fields(1),fields(2),fields(3),fields(4),fields(5),fields(6),fields(7))      }).cache()    }    def saveAtLocal(logsCache: RDD[CleanedLog], outputPath: String) = {      logsCache.map(x=>{        x.cdn+"\t"+x.region+"\t"+x.level+"\t"+x.date+"\t"+x.ip+"\t"+x.domain+"\t"+x.url+"\t"+x.traffic      }).repartition(1).saveAsTextFile(outputPath)    }  }

代码逻辑就是输入是什么内容,输就是什么内容,在中间我将输入的文本RDD进行了memory_only持久化,我们就看这个持久化内存占多少


显然,input size大小是170.9 MB,但是持久化之后是908.5 MB,显然占据内存空间增大了好几倍,如果在生产上,内存资源不足的情况下,这种方式显然缓存不了不少partition
时间耗费14s

MEMORY_ONLY_SER 未使用kryo序列化

def logsCahe(logs:RDD[String]): RDD[CleanedLog] ={      logs.filter(_.split("\t").length==8).map(x=>{        val fields=x.split("\t")        CleanedLog(fields(0),fields(1),fields(2),fields(3),fields(4),fields(5),fields(6),fields(7))      }).persist(StorageLevel.MEMORY_ONLY_SER)

代码仅更改了persist(StorageLevel.MEMORY_ONLY_SER)


显然,input size大小是170.9 MB,但是持久化之后是有204.9MB,所以序列化对于节约内存空间是很有帮助的。

时间耗费11s

MEMORY_ONLY_SER 使用kryo序列化未注册

 val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

与上一代码相比,为SparkConf设置了开启kryo序列化,不是默认的java序列化了,但是没有进行具体的类注册!


显然,input size大小是170.9 MB,但是持久化之后是有230.8MB,使用未注册的kryo序列化竟然比使用java序列化还臃肿!原因是:每一个对象实例的序列化结果都会包含一份完整的类名,造成了大量的空间浪费!

时间是9s,比java序列化快了一些。

MEMORY_ONLY_SER 使用kryo序列化并注册

val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")      conf.registerKryoClasses(Array(classOf[CleanedLog], classOf[String]))

添加了String类和自定义样例类的kryo注册


显然,input size大小是170.9 MB,使用注册的kryo序列化之后,只有175.7MB,时间也才9秒,很舒服!

所以在目前为止,使用kryo序列化并注册是性能最好得了!!!

如果CPU还是那么悠闲的话,我们还有另外一个进一步优化点!

注册kryo序列化并开启RDD压缩

注意:RDD压缩只能存在于序列化的情况下

val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")      conf.registerKryoClasses(Array(classOf[CleanedLog], classOf[String]))      conf.set("spark.rdd.compress","true")



持久化的大小仅有45.6MB!!!

spark.rdd.compress

这个参数决定了RDD Cache的过程中,RDD数据在序列化之后是否进一步进行压缩再储存到内存或磁盘上。当然是为了进一步减小Cache数据的尺寸,对于Cache在磁盘上而言,绝对大小大概没有太大关系,主要是考虑Disk的IO带宽。而对于Cache在内存中,那主要就是考虑尺寸的影响,是否能够Cache更多的数据,是否能减小Cache数据对GC造成的压力等。

这两者,前者通常不会是主要问题,尤其是在RDD Cache本身的目的就是追求速度,减少重算步骤,用IO换CPU的情况下。而后者,GC问题当然是需要考量的,数据量小,占用空间少,GC的问题大概会减轻,但是是否真的需要走到RDD Cache压缩这一步,或许用其它方式来解决可能更加有效。

所以这个值默认是关闭的,但是如果在磁盘IO的确成为问题或者GC问题真的没有其它更好的解决办法的时候,可以考虑启用RDD压缩。

对比表格

类型输入大小持久化大小时间
MEMORY_ONLY170.9 MB908.5 MB14s
MEMORY_ONLY_SER170.9 MB204.9MB11s
kyro序列化未注册170.9 MB230.8MB9s
kyro序列化注册170.9 MB175.7MB9s
注册kryo序列化并开启RDD压缩170.9 MB45.6MB9s
0