千家信息网

Spark 累加器实验

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,以下代码用 Pyspark + IPython 完成统计日志空行的数量:读取日志,创建RDD:myrdd = sc.textFile("access.log")不使用累加器:In [6
千家信息网最后更新 2025年01月24日Spark 累加器实验

以下代码用 Pyspark + IPython 完成


统计日志空行的数量:

读取日志,创建RDD:

myrdd = sc.textFile("access.log")


  1. 不使用累加器:

In [68]: s = 0In [69]: def f(x):    ...:     global s    ...:     if len(x) == 0:    ...:         s += 1    ...:In [70]: myrdd.foreach(f)In [71]: print (s)

得出结果为:

0

原因是python 的变量,即使是全局变量不能应用在各个计算进程(线程)中同步数据,所以需要分布式计算框架的变量来同步数据,Spark 中采用累加器来解决:


  1. 使用累加器

In [64]: s = sc.accumulator(0)In [65]: def f(x):    ...:     global s    ...:     if len(x) == 0:    ...:         s += 1    ...:In [66]: myrdd.foreach(f)In [67]: print (s)

得出正确结果:

14



0