spark2.x由浅入深深到底系列六之RDD java api详解三
发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark本文详细介绍了spark key-value类型的rdd java api一、key-value类型的RDD的创建方式1
千家信息网最后更新 2025年02月05日spark2.x由浅入深深到底系列六之RDD java api详解三
学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark
本文详细介绍了spark key-value类型的rdd java api
一、key-value类型的RDD的创建方式
1、sparkContext.parallelizePairs
JavaPairRDDjavaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3)));//结果:[(test,3), (kkk,3)]System.out.println("javaPairRDD = " + javaPairRDD.collect());
2、keyBy的方式
public class User implements Serializable { private String userId; private Integer amount; public User(String userId, Integer amount) { this.userId = userId; this.amount = amount; } @Override public String toString() { return "User{" + "userId='" + userId + '\'' + ", amount=" + amount + '}'; }}JavaRDDuserJavaRDD = sc.parallelize(Arrays.asList(new User("u1", 20)));JavaPairRDD userJavaPairRDD = userJavaRDD.keyBy(new Function () { @Override public String call(User user) throws Exception { return user.getUserId(); }});//结果:[(u1,User{userId='u1', amount=20})]System.out.println("userJavaPairRDD = " + userJavaPairRDD.collect());
3、zip的方式
JavaRDDrdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));//两个rdd zip也是创建key-value类型RDD的一种方式JavaPairRDD zipPairRDD = rdd.zip(rdd);//结果:[(1,1), (1,1), (2,2), (3,3), (5,5), (8,8), (13,13)]System.out.println("zipPairRDD = " + zipPairRDD.collect());
4、groupBy的方式
JavaRDDrdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));Function isEven = new Function () { @Override public Boolean call(Integer x) throws Exception { return x % 2 == 0; }};//将偶数和奇数分组,生成key-value类型的RDDJavaPairRDD > oddsAndEvens = rdd.groupBy(isEven);//结果:[(false,[1, 1, 3, 5, 13]), (true,[2, 8])]System.out.println("oddsAndEvens = " + oddsAndEvens.collect());//结果:1System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());oddsAndEvens = rdd.groupBy(isEven, 2);//结果:[(false,[1, 1, 3, 5, 13]), (true,[2, 8])]System.out.println("oddsAndEvens = " + oddsAndEvens.collect());//结果:2System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());
二、combineByKey
JavaPairRDDjavaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2), new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2);//当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数Function > createCombiner = new Function >() { @Override public Tuple2 call(Integer value) throws Exception { return new Tuple2<>(value, 1); }};//当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数Function2 , Integer, Tuple2 > mergeValue = new Function2 , Integer, Tuple2 >() { @Override public Tuple2 call(Tuple2 acc, Integer value) throws Exception { return new Tuple2<>(acc._1() + value, acc._2() + 1); } };//当需要对不同分区的数据进行聚合的时候应用这个函数Function2 , Tuple2 , Tuple2 > mergeCombiners = new Function2 , Tuple2 , Tuple2 >() { @Override public Tuple2 call(Tuple2 acc1, Tuple2 acc2) throws Exception { return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2()); } };JavaPairRDD > combineByKeyRDD = javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);//结果:[(coffee,(12,3)), (panda,(3,1))]System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
combineByKey的数据流如下:
对于combineByKey的原理讲解详细见: spark core RDD api原理详解
三、aggregateByKey
JavaPairRDD> aggregateByKeyRDD = javaPairRDD.aggregateByKey(new Tuple2<>(0, 0), mergeValue, mergeCombiners);//结果:[(coffee,(12,3)), (panda,(3,1))]System.out.println("aggregateByKeyRDD = " + aggregateByKeyRDD.collect());//aggregateByKey是由combineByKey实现的,上面的aggregateByKey就是等于下面的combineByKeyRDDFunction > createCombinerAggregateByKey = new Function >() { @Override public Tuple2 call(Integer value) throws Exception { return mergeValue.call(new Tuple2<>(0, 0), value); } };//结果是: [(coffee,(12,3)), (panda,(3,1))]System.out.println(javaPairRDD.combineByKey(createCombinerAggregateByKey, mergeValue, mergeCombiners).collect());
四、reduceByKey
JavaPairRDDreduceByKeyRDD = javaPairRDD.reduceByKey(new Function2 () { @Override public Integer call(Integer value1, Integer value2) throws Exception { return value1 + value2; }});//结果:[(coffee,12), (panda,3)]System.out.println("reduceByKeyRDD = " + reduceByKeyRDD.collect());//reduceByKey底层也是combineByKey实现的,上面的reduceByKey等于下面的combineByKeyFunction createCombinerReduce = new Function () { @Override public Integer call(Integer integer) throws Exception { return integer; }};Function2 mergeValueReduce = new Function2 () { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer + integer2; } };//结果:[(coffee,12), (panda,3)]System.out.println(javaPairRDD.combineByKey(createCombinerReduce, mergeValueReduce, mergeValueReduce).collect());
五、foldByKey
JavaPairRDDfoldByKeyRDD = javaPairRDD.foldByKey(0, new Function2 () { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer + integer2; }});//结果:[(coffee,12), (panda,3)]System.out.println("foldByKeyRDD = " + foldByKeyRDD.collect());//foldByKey底层也是combineByKey实现的,上面的foldByKey等于下面的combineByKeyFunction2 mergeValueFold = new Function2 () { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer + integer2; } };Function createCombinerFold = new Function () { @Override public Integer call(Integer integer) throws Exception { return mergeValueFold.call(0, integer); }};//结果:[(coffee,12), (panda,3)]System.out.println(javaPairRDD.combineByKey(createCombinerFold, mergeValueFold, mergeValueFold).collect());
六、groupByKey
JavaPairRDD> groupByKeyRDD = javaPairRDD.groupByKey();//结果:[(coffee,[1, 2, 9]), (panda,[3])]System.out.println("groupByKeyRDD = " + groupByKeyRDD.collect());//groupByKey底层也是combineByKey实现的,上面的groupByKey等于下面的combineByKeyFunction > createCombinerGroup = new Function >() { @Override public List call(Integer integer) throws Exception { List list = new ArrayList<>(); list.add(integer); return list; }};Function2 , Integer, List
> mergeValueGroup = new Function2 , Integer, List
>() { @Override public List call(List integers, Integer integer) throws Exception { integers.add(integer); return integers; }};Function2 , List
, List > mergeCombinersGroup = new Function2 , List
, List >() { @Override public List call(List integers, List integers2) throws Exception { integers.addAll(integers2); return integers; } };//结果:[(coffee,[1, 2, 9]), (panda,[3])]System.out.println(javaPairRDD.combineByKey(createCombinerGroup, mergeValueGroup, mergeCombinersGroup).collect());
对于api原理性的东西很难用文档说明清楚,如果想更深入,更透彻的理解api的原理,可以参考: spark core RDD api原理详解
结果
面的
原理
方式
函数
类型
应用
底层
时候
数据
参考
不同
清楚
透彻
东西
两个
偶数
奇数
就是
数据流
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发的职责
linux服务器有什么用
网络安全在哪举办的
尚学堂北京软件开发
录像服务器管理软件价格
服务器2682v4
自动化无线网络技术
共享经济服务器租赁
南安洪濑网络安全监督部门
红米3谷歌服务器
科技风互联网工作汇报
网络安全黑板抄报
软件开发做账注意事项
数据库代码的特点
ipad御剑天涯服务器
我校开展网络安全演练
怎样查看数据库连接
中国企业网络安全投入平均值
Java软件开发 长春
rpu软件开发
rad 软件开发
学网络技术应该从哪先学起
软件开发社团缩写
项目部署到服务器以后的相对路径
好的软件开发设计公司
方舟生存游戏官方服务器解说
王者注销后服务器怎么还在
中国企业网络安全投入平均值
维护国家网络安全10句金句
网络安全产品 分类