千家信息网

生产常用Spark累加器剖析之四

发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,生产常用Spark累加器剖析之四现象描述val acc = sc.accumulator(0, "Error Accumulator")val data = sc.parallelize(1 to 1
千家信息网最后更新 2025年01月22日生产常用Spark累加器剖析之四

生产常用Spark累加器剖析之四

现象描述

val acc = sc.accumulator(0, "Error Accumulator")val data = sc.parallelize(1 to 10)val newData = data.map(x => {  if (x % 2 == 0) { accum += 1}})newData.countacc.valuenewData.foreach(println)acc.value

上述现象,会造成acc.value的最终值变为10

原因分析

Spark中的一系列transform操作都会构造成一长串的任务链,此时就需要通过一个action操作来触发(lazy的特性),accumulator也是如此。

  • 因此在一个action操作之后,调用value方法查看,是没有任何变化
  • 第一次action操作之后,调用value方法查看,变成了5
  • 第二次action操作之后,调用value方法查看,变成了10

原因就在于第二次action操作的时候,又执行了一次累加器的操作,同个累加器,在原有的基础上又加了5,从而变成了10

解决方案

通过上述的现象描述,我们可以很快知道解决的方法:只进行一次action操作。基于此,我们只要切断任务之间的依赖关系就可以了,即使用cache、persist。这样操作之后,那么后续的累加器操作就不会受前面的transform操作影响了

相关案例

  • 需求

    使用Accumulators统计emp表中NULL出现的次数以及正常数据的条数 & 打印正常数据的信息

  • 数据

    7369  SMITH   CLERK   7902    1980-12-17  800.00      207499  ALLEN   SALESMAN    7698    1981-2-20   1600.00 300.00  307521  WARD    SALESMAN    7698    1981-2-22   1250.00 500.00  307566  JONES   MANAGER 7839    1981-4-2    2975.00     207654  MARTIN  SALESMAN    7698    1981-9-28   1250.00 1400.00 307698  BLAKE   MANAGER 7839    1981-5-1    2850.00     307782  CLARK   MANAGER 7839    1981-6-9    2450.00     107788  SCOTT   ANALYST 7566    1987-4-19   3000.00     207839  KING    PRESIDENT       1981-11-17  5000.00     107844  TURNER  SALESMAN    7698    1981-9-8    1500.00 0.00    307876  ADAMS   CLERK   7788    1987-5-23   1100.00     207900  JAMES   CLERK   7698    1981-12-3   950.00      307902  FORD    ANALYST 7566    1981-12-3   3000.00     207934  MILLER  CLERK   7782    1982-1-23   1300.00     10
  • 遇到的坑 & 解决方法

    现象描述 & 原因分析:

    我们都知道,spark中的一系列transform操作会构成一串长的任务链,此时就需要通过一个action操作来触发; accumulator也是一样的,只有当action操作执行时,才会触发accumulator的执行; 因此在一个action操作之前,我们调用accumulator的value方法是无法查看其数值的,肯定是没有任何变化的; 所以在对normalData进行foreach操作之后,即action操作之后,我们会发现累加器的数值就变成了11; 之后,我们对normalData再进行一次count操作之后,即又一次的action操作之后,其实这时候,又去执行了一次前面的transform操作; 因此累加器的值又增加了11,变成了22

    解决办法:

    经过上面的分析,我们可以知道,使用累加器的时候,我们只有使用一次action操作才能够保证结果的准确性 因此,我们面对这种情况,是有办法的,做法就是切断它们相互之间的依赖关系即可 因此对normalData使用cache方法,当RDD第一次被计算出来时,就会被直接缓存起来 再调用时,相同的计算操作就不会再重新计算一遍

    import org.apache.spark.{SparkConf, SparkContext}/*** 使用Spark Accumulators完成Job的数据量处理* 统计emp表中NULL出现的次数以及正常数据的条数 & 打印正常数据的信息*/object AccumulatorsApp {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[2]").setAppName("AccumulatorsApp")val sc = new SparkContext(conf)val lines = sc.textFile("E:/emp.txt")// long类型的累加器值val nullNum = sc.longAccumulator("NullNumber")val normalData = lines.filter(line => {  var flag = trueval splitLines = line.split("\t")  for (splitLine <- splitLines){    if ("".equals(splitLine)){      flag = false      nullNum.add(1)    }  }  flag})// 使用cache方法,将RDD的第一次计算结果进行缓存;防止后面RDD进行重复计算,导致累加器的值不准确normalData.cache()// 打印每一条正常数据normalData.foreach(println)// 打印正常数据的条数println("NORMAL DATA NUMBER: " + normalData.count())// 打印emp表中NULL出现的次数println("NULL: " + nullNum.value)sc.stop()}}
0