千家信息网

spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式

发表于:2024-10-23 作者:千家信息网编辑
千家信息网最后更新 2024年10月23日,学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark我们在 http://7639240.blog.51cto.com/7629240/1966131 中已经知道了,一个sc
千家信息网最后更新 2024年10月23日spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式

学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark


我们在 http://7639240.blog.51cto.com/7629240/1966131 中已经知道了,一个scala函数其实就是java中的一个接口,对于java8 lambda而言,也是一样,一个lambda表达式就是java中的一个接口。接下来我们先看看spark中最简单的wordcount这个例子,分别用java8的非lambda以及lambda来实现:


一、非lambda实现的java spark wordcount程序:

public class WordCount {    public static void main(String[] args) {        SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");        JavaSparkContext sc = new JavaSparkContext(conf);        //JavaPairRDD inputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt",        //        TextInputFormat.class, LongWritable.class, Text.class);        JavaRDD inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt");        JavaRDD wordsRDD = inputRDD.flatMap(new FlatMapFunction() {            @Override            public Iterator call(String s) throws Exception {                return Arrays.asList(s.split(" ")).iterator();            }        });        JavaPairRDD keyValueWordsRDD                = wordsRDD.mapToPair(new PairFunction() {            @Override            public Tuple2 call(String s) throws Exception {                return new Tuple2(s, 1);            }        });        JavaPairRDD wordCountRDD =                keyValueWordsRDD.reduceByKey(new HashPartitioner(2),                        new Function2() {            @Override            public Integer call(Integer a, Integer b) throws Exception {                return a + b;            }        });        //如果输出文件存在的话需要删除掉        File outputFile = new File("/Users/tangweiqun/wordcount");        if (outputFile.exists()) {            File[] files = outputFile.listFiles();            for(File file: files) {                file.delete();            }            outputFile.delete();        }        wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount");        System.out.println(wordCountRDD.collect());    }}


二、java8 lambda实现的wordcount代码

public class WordCount {    public static void main(String[] args) {        SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");        JavaSparkContext sc = new JavaSparkContext(conf);        //JavaPairRDD inputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt",        //        TextInputFormat.class, LongWritable.class, Text.class);        JavaRDD inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt");        JavaRDD wordsRDD = inputRDD.flatMap(input -> Arrays.asList(input.split(" ")).iterator());        JavaPairRDD keyValueWordsRDD                = wordsRDD.mapToPair(word -> new Tuple2(word, 1));        JavaPairRDD wordCountRDD = keyValueWordsRDD.reduceByKey((a, b) -> a + b);        //如果输出文件存在的话需要删除掉        File outputFile = new File("/Users/tangweiqun/wordcount");        if (outputFile.exists()) {            File[] files = outputFile.listFiles();            for(File file: files) {                file.delete();            }            outputFile.delete();        }        wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount");        System.out.println(wordCountRDD.collect());    }}


从上面可以看出,lambda的实现更加简洁,也可以看出一个lambda函数表达式就是一个java接口。


我们在http://7639240.blog.51cto.com/7629240/1966958提到的combineByKey,如下的代码:

JavaPairRDD javaPairRDD =        sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2),                new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2); //当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数Function> createCombiner = new Function>() {    @Override    public Tuple2 call(Integer value) throws Exception {        return new Tuple2<>(value, 1);    }};//当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数Function2, Integer, Tuple2> mergeValue =        new Function2, Integer, Tuple2>() {            @Override            public Tuple2 call(Tuple2 acc, Integer value) throws Exception {                return new Tuple2<>(acc._1() + value, acc._2() + 1);            }        };//当需要对不同分区的数据进行聚合的时候应用这个函数Function2, Tuple2, Tuple2> mergeCombiners =        new Function2, Tuple2, Tuple2>() {            @Override            public Tuple2 call(Tuple2 acc1, Tuple2 acc2) throws Exception {                return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());            }        }; JavaPairRDD> combineByKeyRDD =        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);//结果:[(coffee,(12,3)), (panda,(3,1))]System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());

可以写成如下的lambda实现的combineByKey:

JavaPairRDD javaPairRDD =        sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2),                new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2);//当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数Function> createCombiner = value -> new Tuple2<>(value, 1);//当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数Function2, Integer, Tuple2> mergeValue = (acc, value) ->new Tuple2<>(acc._1() + value, acc._2() + 1);//当需要对不同分区的数据进行聚合的时候应用这个函数Function2, Tuple2, Tuple2> mergeCombiners = (acc1, acc2) -> new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());JavaPairRDD> combineByKeyRDD =        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);//结果:[(coffee,(12,3)), (panda,(3,1))]System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());


如果想深入的系统的理解spark RDD api可以参考: spark core RDD api原理详解

0