Beam 超实用examples之Pi值计算
Beam刚刚开源不是很久,快2个月了。目前的版本是0.5.0版本。官方的源码中提供了4个examples.无奈这四个案例都只是WordCount的四种不同的实现。作为一个从Spark进入大数据殿堂的笔者来说,用过n多次的SparkPi的我,怎么能忍受竟然没有Pi实现的example呢。假如有了这个案例,可以非常方便的无论在开发工具中还是在集群中进行测试。于是便有了下文。笔者的文笔和技术有限。不足之处,还望朋友多多提建议。Let us come on 。
我们先来讲讲Pi的实现原理。我们是用概率统计的方法来实现的。先来想象一下,以一个单位为半径画圆,再画一个圆的外切正方形。假设一个杯子的底部就被这个正方形和正方形内切圆全部填满。做n次试验,往杯子中扔石头,落在圆内的次数除以总次数是不是Pi*r*r/2r*2r也就是Pi/4.
因此,Pi就是4倍的此概率。
以下是代码的试验。仅供参考。
可以复制代码
package org.tongfang.beam.examples;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
public class BeamPi {
public static void main(String[] args) {
// Beam Pi的自定义实现方式
//第一步骤:创建options,
//通过该对象可以选择使用哪个计算框架来计算,并且设置应用的名称
PipelineOptions options = PipelineOptionsFactory.create();
//设置job(应用)名称
options.setJobName("Beam Pi");
//设置runner为Spark
options.setRunner(SparkRunner.class);
//创建管道 p
Pipeline p = Pipeline.create(options);
//100000000次的随机试验的次数,如果资源,
//足够的大可以进行更多次的试验,用大数据的理论来说,
//理论上可以进行无数次的试验(只要不断的横向扩展计算的资源)。
List
for(int i = 0;i<1000000;i++){
list.add(i);
}
//相当于Spark从内存中读取数据,并通过map迭代访问每一个元素,
//这里迭代1000000的访问每个依次增大的数字,
//没迭代一次,做一次试验,当点落到圆内,计数增加1,否则不计数
//也就是什么也不做
//然后再近些Count计数,最后计数结果除以试验次数,就是概率。
//从数学角度来看,PI的值就是4倍这个概率。从而计算出PI的值。
p.apply(Create.of(list)).apply(ParDo.of(new DoFn
double x = 0;
double y = 0;
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
x = Math.random() * 2 - 1;;
y = Math.random() * 2 - 1;;
if((x*x+y*y)<1){
c.output(1);
}else{
}
}
})).apply( Count.
apply(MapElements.via(new SimpleFunction
/**
*
*/
private static final long serialVersionUID = 1L;
public Void apply(Long input) {
Float res = (float) (4.0*(float)input / 1000000f);
System.out.println(input);
System.out.println("PI : "+res);
return null;
}
}));
//这是运行计算的关键,如果这个代码不写,
//整个代码都是懒加载,并非真正计算。
p.run().waitUntilFinish();
}
}