spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式
发表于:2024-10-23 作者:千家信息网编辑
千家信息网最后更新 2024年10月23日,学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark我们在 http://7639240.blog.51cto.com/7629240/1966131 中已经知道了,一个sc
千家信息网最后更新 2024年10月23日spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式
学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark
我们在 http://7639240.blog.51cto.com/7629240/1966131 中已经知道了,一个scala函数其实就是java中的一个接口,对于java8 lambda而言,也是一样,一个lambda表达式就是java中的一个接口。接下来我们先看看spark中最简单的wordcount这个例子,分别用java8的非lambda以及lambda来实现:
一、非lambda实现的java spark wordcount程序:
public class WordCount { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); //JavaPairRDDinputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt", // TextInputFormat.class, LongWritable.class, Text.class); JavaRDD inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt"); JavaRDD wordsRDD = inputRDD.flatMap(new FlatMapFunction () { @Override public Iterator call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); JavaPairRDD keyValueWordsRDD = wordsRDD.mapToPair(new PairFunction () { @Override public Tuple2 call(String s) throws Exception { return new Tuple2 (s, 1); } }); JavaPairRDD wordCountRDD = keyValueWordsRDD.reduceByKey(new HashPartitioner(2), new Function2 () { @Override public Integer call(Integer a, Integer b) throws Exception { return a + b; } }); //如果输出文件存在的话需要删除掉 File outputFile = new File("/Users/tangweiqun/wordcount"); if (outputFile.exists()) { File[] files = outputFile.listFiles(); for(File file: files) { file.delete(); } outputFile.delete(); } wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount"); System.out.println(wordCountRDD.collect()); }}
二、java8 lambda实现的wordcount代码
public class WordCount { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); //JavaPairRDDinputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt", // TextInputFormat.class, LongWritable.class, Text.class); JavaRDD inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt"); JavaRDD wordsRDD = inputRDD.flatMap(input -> Arrays.asList(input.split(" ")).iterator()); JavaPairRDD keyValueWordsRDD = wordsRDD.mapToPair(word -> new Tuple2 (word, 1)); JavaPairRDD wordCountRDD = keyValueWordsRDD.reduceByKey((a, b) -> a + b); //如果输出文件存在的话需要删除掉 File outputFile = new File("/Users/tangweiqun/wordcount"); if (outputFile.exists()) { File[] files = outputFile.listFiles(); for(File file: files) { file.delete(); } outputFile.delete(); } wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount"); System.out.println(wordCountRDD.collect()); }}
从上面可以看出,lambda的实现更加简洁,也可以看出一个lambda函数表达式就是一个java接口。
我们在http://7639240.blog.51cto.com/7629240/1966958提到的combineByKey,如下的代码:
JavaPairRDDjavaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2), new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2); //当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数Function > createCombiner = new Function >() { @Override public Tuple2 call(Integer value) throws Exception { return new Tuple2<>(value, 1); }};//当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数Function2 , Integer, Tuple2 > mergeValue = new Function2 , Integer, Tuple2 >() { @Override public Tuple2 call(Tuple2 acc, Integer value) throws Exception { return new Tuple2<>(acc._1() + value, acc._2() + 1); } };//当需要对不同分区的数据进行聚合的时候应用这个函数Function2 , Tuple2 , Tuple2 > mergeCombiners = new Function2 , Tuple2 , Tuple2 >() { @Override public Tuple2 call(Tuple2 acc1, Tuple2 acc2) throws Exception { return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2()); } }; JavaPairRDD > combineByKeyRDD = javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);//结果:[(coffee,(12,3)), (panda,(3,1))]System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
可以写成如下的lambda实现的combineByKey:
JavaPairRDDjavaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2), new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2);//当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数Function > createCombiner = value -> new Tuple2<>(value, 1);//当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数Function2 , Integer, Tuple2 > mergeValue = (acc, value) ->new Tuple2<>(acc._1() + value, acc._2() + 1);//当需要对不同分区的数据进行聚合的时候应用这个函数Function2 , Tuple2 , Tuple2 > mergeCombiners = (acc1, acc2) -> new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());JavaPairRDD > combineByKeyRDD = javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);//结果:[(coffee,(12,3)), (panda,(3,1))]System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
如果想深入的系统的理解spark RDD api可以参考: spark core RDD api原理详解
函数
应用
时候
就是
接口
表达式
不同
代码
数据
文件
结果
参考
输出
简洁
接下来
例子
原理
技术
程序
系统
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库的基础对象是
威锋网 互联网科技媒体
智慧社区软件开发投标文件
关于五年级网络安全的手抄报
服务器内存条必须要大小一致吗
银行数据库软件开发
用友如何恢复数据库
阿里云 dns服务器地址
女生学网络技术好不好
松江区银联数据库价格查询
伤害世界老是被踢出服务器
成都鲁齐软件开发工作室
网络安全设备市场研究报告
宿州oa管理软件开发定制
2019网络安全周走进校园
软件开发项目质保金规定
新三板区域数据库
网络技术负载均衡实验
贵州正规软件开发价格优惠
软件开发团队沟通机制
桌面软件开发软件工程师
腾讯云游戏服务器
cdc 数据库
破解全国中级经济师数据库
松江区银联数据库价格查询
小学网络技术培训a5作业
道道熊(天津)网络技术有限公司
c5235扫描后显示检查服务器
标准研究院网络安全
数据库未提交会怎么样