
Spark 2.x from the Basics to the Internals, Part 6: RDD Java API in Detail (3)


Before studying any Spark topic, make sure you first have a correct understanding of Spark itself; see: 正确理解spark (Understanding Spark Correctly).


This article gives a detailed walkthrough of the Java API for key-value RDDs in Spark.


1. Ways to create a key-value RDD

1.1 sparkContext.parallelizePairs

// The snippets in this article assume a JavaSparkContext sc and imports from
// java.util, java.io, scala.Tuple2, org.apache.spark.api.java and org.apache.spark.api.java.function
JavaPairRDD<String, Integer> javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2<>("test", 3), new Tuple2<>("kkk", 3)));
// Result: [(test,3), (kkk,3)]
System.out.println("javaPairRDD = " + javaPairRDD.collect());
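For completeness: an ordinary JavaRDD can also be turned into a pair RDD with mapToPair. A minimal sketch, assuming the same JavaSparkContext sc; the names lines and pairs are illustrative only:

JavaRDD<String> lines = sc.parallelize(Arrays.asList("test 3", "kkk 3"));
JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String line) throws Exception {
        // Split "key value" and parse the value into an Integer
        String[] parts = line.split(" ");
        return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
    }
});
// Result: [(test,3), (kkk,3)]
System.out.println("pairs = " + pairs.collect());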

1.2 keyBy

public class User implements Serializable {
    private String userId;
    private Integer amount;

    public User(String userId, Integer amount) {
        this.userId = userId;
        this.amount = amount;
    }

    public String getUserId() {
        return userId;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId='" + userId + '\'' +
                ", amount=" + amount +
                '}';
    }
}

JavaRDD<User> userJavaRDD = sc.parallelize(Arrays.asList(new User("u1", 20)));
JavaPairRDD<String, User> userJavaPairRDD = userJavaRDD.keyBy(new Function<User, String>() {
    @Override
    public String call(User user) throws Exception {
        return user.getUserId();
    }
});
// Result: [(u1,User{userId='u1', amount=20})]
System.out.println("userJavaPairRDD = " + userJavaPairRDD.collect());
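keyBy(f) is just shorthand for pairing each element with f(element); a sketch of the equivalent mapToPair call against the same userJavaRDD (viaMapToPair is an illustrative name):

JavaPairRDD<String, User> viaMapToPair = userJavaRDD.mapToPair(new PairFunction<User, String, User>() {
    @Override
    public Tuple2<String, User> call(User user) throws Exception {
        // The same pairing keyBy performs: (f(element), element)
        return new Tuple2<>(user.getUserId(), user);
    }
});
// Result: [(u1,User{userId='u1', amount=20})]
System.out.println("viaMapToPair = " + viaMapToPair.collect());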

1.3 zip

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
// Zipping two RDDs is another way to create a key-value RDD
JavaPairRDD<Integer, Integer> zipPairRDD = rdd.zip(rdd);
// Result: [(1,1), (1,1), (2,2), (3,3), (5,5), (8,8), (13,13)]
System.out.println("zipPairRDD = " + zipPairRDD.collect());
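Note that zip requires both RDDs to have the same number of partitions and the same number of elements in each partition. A minimal sketch that satisfies this by zipping the RDD with a mapped copy of itself (map preserves partitioning); doubled and zipped are illustrative names:

JavaRDD<Integer> doubled = rdd.map(new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer x) throws Exception {
        return x * 2;
    }
});
JavaPairRDD<Integer, Integer> zipped = rdd.zip(doubled);
// Result: [(1,2), (1,2), (2,4), (3,6), (5,10), (8,16), (13,26)]
System.out.println("zipped = " + zipped.collect());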

1.4 groupBy

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
    @Override
    public Boolean call(Integer x) throws Exception {
        return x % 2 == 0;
    }
};
// Group the evens and the odds, producing a key-value RDD
JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isEven);
// Result: [(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
// Result: 1
System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());

oddsAndEvens = rdd.groupBy(isEven, 2);
// Result: [(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
// Result: 2
System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());
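groupBy(f) is equivalent to keying every element by f and then grouping by key; a sketch showing the same result via keyBy plus groupByKey (viaKeyBy is an illustrative name):

JavaPairRDD<Boolean, Iterable<Integer>> viaKeyBy = rdd.keyBy(isEven).groupByKey();
// Result: [(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
System.out.println("viaKeyBy = " + viaKeyBy.collect());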

2. combineByKey

JavaPairRDD<String, Integer> javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2<>("coffee", 1), new Tuple2<>("coffee", 2),
                new Tuple2<>("panda", 3), new Tuple2<>("coffee", 9)), 2);

// Applied to a key's value the first time that key is seen in a partition
Function<Integer, Tuple2<Integer, Integer>> createCombiner = new Function<Integer, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer value) throws Exception {
        return new Tuple2<>(value, 1);
    }
};

// Applied to a key's value when that key has already gone through createCombiner in the same partition
Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>> mergeValue =
        new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer value) throws Exception {
                return new Tuple2<>(acc._1() + value, acc._2() + 1);
            }
        };

// Applied when combiners from different partitions need to be merged
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiners =
        new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
                return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
            }
        };

JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD =
        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
// Result: [(coffee,(12,3)), (panda,(3,1))]
System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
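The (sum, count) combiner above is the classic building block for a per-key average; a short follow-up on the combineByKeyRDD from the example (avgRDD is an illustrative name):

JavaPairRDD<String, Double> avgRDD = combineByKeyRDD.mapValues(new Function<Tuple2<Integer, Integer>, Double>() {
    @Override
    public Double call(Tuple2<Integer, Integer> sumCount) throws Exception {
        // sum divided by count gives the per-key average
        return sumCount._1() / (double) sumCount._2();
    }
});
// Result: [(coffee,4.0), (panda,3.0)]
System.out.println("avgRDD = " + avgRDD.collect());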

The data flow of combineByKey is as follows:
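(In place of the original diagram, here is a step-by-step trace of the example above; it assumes parallelizePairs splits the four tuples evenly across the two partitions.)

Partition 0 holds (coffee,1), (coffee,2); partition 1 holds (panda,3), (coffee,9).

Partition 0:
  (coffee,1): first "coffee" in this partition -> createCombiner(1) = (1,1)
  (coffee,2): "coffee" already has a combiner  -> mergeValue((1,1), 2) = (3,2)
Partition 1:
  (panda,3):  first "panda" in this partition  -> createCombiner(3) = (3,1)
  (coffee,9): first "coffee" in this partition -> createCombiner(9) = (9,1)
After the shuffle, combiners for the same key are merged across partitions:
  coffee: mergeCombiners((3,2), (9,1)) = (12,3)
  panda:  (3,1)
Final result: [(coffee,(12,3)), (panda,(3,1))]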

For a detailed explanation of the principles behind combineByKey, see: spark core RDD api原理详解 (Spark Core RDD API Principles Explained).

3. aggregateByKey

JavaPairRDD<String, Tuple2<Integer, Integer>> aggregateByKeyRDD =
        javaPairRDD.aggregateByKey(new Tuple2<>(0, 0), mergeValue, mergeCombiners);
// Result: [(coffee,(12,3)), (panda,(3,1))]
System.out.println("aggregateByKeyRDD = " + aggregateByKeyRDD.collect());

// aggregateByKey is implemented on top of combineByKey: the aggregateByKey above
// is equivalent to the combineByKey below
Function<Integer, Tuple2<Integer, Integer>> createCombinerAggregateByKey =
        new Function<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer value) throws Exception {
                // The zero value seeds the combiner for the first value of each key in a partition
                return mergeValue.call(new Tuple2<>(0, 0), value);
            }
        };
// Result: [(coffee,(12,3)), (panda,(3,1))]
System.out.println(javaPairRDD.combineByKey(createCombinerAggregateByKey, mergeValue, mergeCombiners).collect());
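One caveat falls out of this equivalence: the zero value seeds createCombiner once per key per partition, not once per key globally, so it should be an identity element for mergeValue. A sketch of what a non-identity zero does, assuming the even two-partition split described above (skewed is an illustrative name):

JavaPairRDD<String, Tuple2<Integer, Integer>> skewed =
        javaPairRDD.aggregateByKey(new Tuple2<>(100, 0), mergeValue, mergeCombiners);
// "coffee" occurs in both partitions, so the 100 is added twice; "panda" only once.
// Result (under the even split): [(coffee,(212,3)), (panda,(103,1))]
System.out.println("skewed = " + skewed.collect());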

4. reduceByKey

JavaPairRDD<String, Integer> reduceByKeyRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
// Result: [(coffee,12), (panda,3)]
System.out.println("reduceByKeyRDD = " + reduceByKeyRDD.collect());

// reduceByKey is also implemented on top of combineByKey: the reduceByKey above
// is equivalent to the combineByKey below
Function<Integer, Integer> createCombinerReduce = new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) throws Exception {
        return integer;
    }
};
Function2<Integer, Integer, Integer> mergeValueReduce =
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        };
// Result: [(coffee,12), (panda,3)]
System.out.println(javaPairRDD.combineByKey(createCombinerReduce, mergeValueReduce, mergeValueReduce).collect());
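Since Function2 has a single abstract method, the same reduceByKey can be written as a one-line Java 8 lambda; a minimal equivalent sketch (viaLambda is an illustrative name):

JavaPairRDD<String, Integer> viaLambda = javaPairRDD.reduceByKey((v1, v2) -> v1 + v2);
// Result: [(coffee,12), (panda,3)]
System.out.println("viaLambda = " + viaLambda.collect());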

5. foldByKey

JavaPairRDD<String, Integer> foldByKeyRDD = javaPairRDD.foldByKey(0, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
});
// Result: [(coffee,12), (panda,3)]
System.out.println("foldByKeyRDD = " + foldByKeyRDD.collect());

// foldByKey is also implemented on top of combineByKey: the foldByKey above
// is equivalent to the combineByKey below
Function2<Integer, Integer, Integer> mergeValueFold =
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        };
Function<Integer, Integer> createCombinerFold = new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) throws Exception {
        // The zero value seeds the combiner, exactly as in aggregateByKey
        return mergeValueFold.call(0, integer);
    }
};
// Result: [(coffee,12), (panda,3)]
System.out.println(javaPairRDD.combineByKey(createCombinerFold, mergeValueFold, mergeValueFold).collect());
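As the equivalence shows, foldByKey(0, f) and reduceByKey(f) coincide whenever the zero value is an identity for f; like aggregateByKey's zero, it is applied once per key per partition, so a non-identity zero changes the result. A lambda sketch of the identity case (viaFold and viaReduce are illustrative names):

JavaPairRDD<String, Integer> viaFold = javaPairRDD.foldByKey(0, (v1, v2) -> v1 + v2);
JavaPairRDD<String, Integer> viaReduce = javaPairRDD.reduceByKey((v1, v2) -> v1 + v2);
// Both print: [(coffee,12), (panda,3)]
System.out.println("viaFold = " + viaFold.collect());
System.out.println("viaReduce = " + viaReduce.collect());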

6. groupByKey

JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = javaPairRDD.groupByKey();
// Result: [(coffee,[1, 2, 9]), (panda,[3])]
System.out.println("groupByKeyRDD = " + groupByKeyRDD.collect());

// groupByKey is also implemented on top of combineByKey: the groupByKey above
// is equivalent to the combineByKey below
Function<Integer, List<Integer>> createCombinerGroup = new Function<Integer, List<Integer>>() {
    @Override
    public List<Integer> call(Integer integer) throws Exception {
        List<Integer> list = new ArrayList<>();
        list.add(integer);
        return list;
    }
};
Function2<List<Integer>, Integer, List<Integer>> mergeValueGroup =
        new Function2<List<Integer>, Integer, List<Integer>>() {
            @Override
            public List<Integer> call(List<Integer> integers, Integer integer) throws Exception {
                integers.add(integer);
                return integers;
            }
        };
Function2<List<Integer>, List<Integer>, List<Integer>> mergeCombinersGroup =
        new Function2<List<Integer>, List<Integer>, List<Integer>>() {
            @Override
            public List<Integer> call(List<Integer> integers, List<Integer> integers2) throws Exception {
                integers.addAll(integers2);
                return integers;
            }
        };
// Result: [(coffee,[1, 2, 9]), (panda,[3])]
System.out.println(javaPairRDD.combineByKey(createCombinerGroup, mergeValueGroup, mergeCombinersGroup).collect());
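A practical note: groupByKey disables map-side combining, so every value crosses the shuffle; when the grouped values are only aggregated afterwards, reduceByKey or combineByKey do the same work more cheaply. A sketch that reproduces the reduceByKey result from the groupByKey output (sumViaGroup is an illustrative name):

JavaPairRDD<String, Integer> sumViaGroup = groupByKeyRDD.mapValues(new Function<Iterable<Integer>, Integer>() {
    @Override
    public Integer call(Iterable<Integer> values) throws Exception {
        // Sum the grouped values; reduceByKey would have combined them before the shuffle
        int sum = 0;
        for (Integer v : values) {
            sum += v;
        }
        return sum;
    }
});
// Result: [(coffee,12), (panda,3)] -- same as reduceByKey, but every value was shuffled
System.out.println("sumViaGroup = " + sumViaGroup.collect());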


The principles behind these APIs are hard to convey fully in a document; for a deeper and more thorough understanding of how they work, see: spark core RDD api原理详解 (Spark Core RDD API Principles Explained).
