千家信息网

Beam 超实用examples之Pi值计算

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,Beam Pi值计算Beam刚刚开源不是很久,快2个月了。目前的版本是0.5.0版本。官方的源码中提供了4个examples.无奈这四个案例都只是WordCount的四种不同的实现。作为一个从Spar
千家信息网最后更新 2025年01月23日Beam 超实用examples之Pi值计算Beam Pi值计算





Beam刚刚开源不是很久,快2个月了。目前的版本是0.5.0版本。官方的源码中提供了4个examples.无奈这四个案例都只是WordCount的四种不同的实现。作为一个从Spark进入大数据殿堂的笔者来说,用过n多次的SparkPi的我,怎么能忍受竟然没有Pi实现的example呢。假如有了这个案例,可以非常方便的无论在开发工具中还是在集群中进行测试。于是便有了下文。笔者的文笔和技术有限。不足之处,还望朋友多多提建议。Let us come on 。

我们先来讲讲Pi的实现原理。我们是用概率统计的方法来实现的。先来想象一下,以一个单位为半径画圆,再画一个圆的外切正方形。假设一个杯子的底部就被这个正方形和正方形内切圆全部填满。做n次试验,往杯子中扔石头,落在圆内的次数除以总次数是不是Pi*r*r/2r*2r也就是Pi/4.

因此,Pi就是4倍的此概率。


以下是代码的试验。仅供参考。


可以复制代码

package org.tongfang.beam.examples;


import java.util.ArrayList;

import java.util.List;



import org.apache.beam.runners.spark.SparkRunner;

import org.apache.beam.sdk.Pipeline;

import org.apache.beam.sdk.options.PipelineOptions;

import org.apache.beam.sdk.options.PipelineOptionsFactory;

import org.apache.beam.sdk.transforms.*;


public class BeamPi {


public static void main(String[] args) {

// Beam Pi的自定义实现方式

//第一步骤:创建options,

//通过该对象可以选择使用哪个计算框架来计算,并且设置应用的名称

PipelineOptions options = PipelineOptionsFactory.create();

//设置job(应用)名称

options.setJobName("Beam Pi");

//设置runner为Spark

options.setRunner(SparkRunner.class);

//创建管道 p

Pipeline p = Pipeline.create(options);

//100000000次的随机试验的次数,如果资源,

//足够的大可以进行更多次的试验,用大数据的理论来说,

//理论上可以进行无数次的试验(只要不断的横向扩展计算的资源)。

List list = new ArrayList();

for(int i = 0;i<1000000;i++){

list.add(i);

}


//相当于Spark从内存中读取数据,并通过map迭代访问每一个元素,

//这里迭代1000000的访问每个依次增大的数字,

//没迭代一次,做一次试验,当点落到圆内,计数增加1,否则不计数

//也就是什么也不做

//然后再近些Count计数,最后计数结果除以试验次数,就是概率。

//从数学角度来看,PI的值就是4倍这个概率。从而计算出PI的值。

p.apply(Create.of(list)).apply(ParDo.of(new DoFn() {

double x = 0;

double y = 0;

private static final long serialVersionUID = 1L;

@ProcessElement

public void processElement(ProcessContext c) {

x = Math.random() * 2 - 1;;

y = Math.random() * 2 - 1;;

if((x*x+y*y)<1){

c.output(1);

}else{

}

}

})).apply( Count.globally()).

apply(MapElements.via(new SimpleFunction() {


/**

*

*/

private static final long serialVersionUID = 1L;


public Void apply(Long input) {

Float res = (float) (4.0*(float)input / 1000000f);

System.out.println(input);

System.out.println("PI : "+res);


return null;

}

}));

//这是运行计算的关键,如果这个代码不写,

//整个代码都是懒加载,并非真正计算。

p.run().waitUntilFinish();

}


}



0