Flink DataSet算子的作用是什么
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,这篇文章主要介绍"Flink DataSet算子的作用是什么",在日常操作中,相信很多人在Flink DataSet算子的作用是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对
千家信息网最后更新 2025年01月31日Flink DataSet算子的作用是什么
这篇文章主要介绍"Flink DataSet算子的作用是什么",在日常操作中,相信很多人在Flink DataSet算子的作用是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Flink DataSet算子的作用是什么"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
Flink为了能够处理有边界的数据集和无边界的数据集,提供了对应的DataSet API和DataStream API。我们可以开发对应的Java程序或者Scala程序来完成相应的功能。下面举例了一些DataSet API中的基本的算子。
下面我们通过具体的代码来为大家演示每个算子的作用。
1、Map、FlatMap与MapPartition
//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();ArrayListdata = new ArrayList ();data.add("I love Beijing");data.add("I love China");data.add("Beijing is the capital of China");DataSource text = env.fromCollection(data);DataSet > mapData = text.map(new MapFunction
>() { public List map(String data) throws Exception { String[] words = data.split(" "); //创建一个List List result = new ArrayList (); for(String w:words){ result.add(w); } return result; }});mapData.print();System.out.println("*****************************************");DataSet flatMapData = text.flatMap(new FlatMapFunction () { public void flatMap(String data, Collector collection) throws Exception { String[] words = data.split(" "); for(String w:words){ collection.collect(w); } }});flatMapData.print();System.out.println("*****************************************");/* new MapPartitionFunction 第一个String:表示分区中的数据元素类型 第二个String:表示处理后的数据元素类型*/DataSet mapPartitionData = text.mapPartition(new MapPartitionFunction () { public void mapPartition(Iterable values, Collector out) throws Exception { //针对分区进行操作的好处是:比如要进行数据库的操作,一个分区只需要创建一个Connection //values中保存了一个分区的数据 Iterator it = values.iterator(); while (it.hasNext()) { String next = it.next(); String[] split = next.split(" "); for (String word : split) { out.collect(word); } } //关闭链接 }});mapPartitionData.print();
2、Filter与Distinct
//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();ArrayListdata = new ArrayList ();data.add("I love Beijing");data.add("I love China");data.add("Beijing is the capital of China");DataSource text = env.fromCollection(data);DataSet flatMapData = text.flatMap(new FlatMapFunction () { public void flatMap(String data, Collector collection) throws Exception { String[] words = data.split(" "); for(String w:words){ collection.collect(w); } }});//去掉重复的单词flatMapData.distinct().print();System.out.println("*********************");//选出长度大于3的单词flatMapData.filter(new FilterFunction () { public boolean filter(String word) throws Exception { int length = word.length(); return length>3?true:false; }}).print();
3、Join操作
//获取运行的环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建第一张表:用户ID 姓名ArrayList> data1 = new ArrayList >();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(2,"Mike"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创建第二张表:用户ID 所在的城市ArrayList > data2 = new ArrayList >();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(3,"广州"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查询:用户ID 姓名 所在的程序DataSet > table1 = env.fromCollection(data1);DataSet > table2 = env.fromCollection(data2);table1.join(table2).where(0).equalTo(0)/*第一个Tuple2 :表示第一张表 * 第二个Tuple2 :表示第二张表 * Tuple3 :多表join连接查询后的返回结果 */ .with(new JoinFunction , Tuple2 , Tuple3 >() { public Tuple3 join(Tuple2 table1, Tuple2 table2) throws Exception { return new Tuple3 (table1.f0,table1.f1,table2.f1); } }).print();
4、笛卡尔积
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建第一张表:用户ID 姓名ArrayList> data1 = new ArrayList >();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(2,"Mike"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创建第二张表:用户ID 所在的城市ArrayList > data2 = new ArrayList >();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(3,"广州"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查询:用户ID 姓名 所在的程序DataSet > table1 = env.fromCollection(data1);DataSet > table2 = env.fromCollection(data2);//生成笛卡尔积table1.cross(table2).print();
5、First-N
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//这里的数据是:员工姓名、薪水、部门号DataSet> grade = env.fromElements(new Tuple3 ("Tom",1000,10), new Tuple3 ("Mary",1500,20), new Tuple3 ("Mike",1200,30), new Tuple3 ("Jerry",2000,10));//按照插入顺序取前三条记录grade.first(3).print();System.out.println("**********************");//先按照部门号排序,在按照薪水排序grade.sortPartition(2, Order.ASCENDING).sortPartition(1, Order.ASCENDING).print();System.out.println("**********************");//按照部门号分组,求每组的第一条记录grade.groupBy(2).first(1).print();
6、外链接操作
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建第一张表:用户ID 姓名ArrayList> data1 = new ArrayList >();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创建第二张表:用户ID 所在的城市ArrayList > data2 = new ArrayList >();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查询:用户ID 姓名 所在的程序DataSet > table1 = env.fromCollection(data1);DataSet > table2 = env.fromCollection(data2);//左外连接table1.leftOuterJoin(table2).where(0).equalTo(0) .with(new JoinFunction , Tuple2 , Tuple3 >() { public Tuple3 join(Tuple2 table1, Tuple2 table2) throws Exception { // 左外连接表示等号左边的信息会被包含 if(table2 == null){ return new Tuple3 (table1.f0,table1.f1,null); }else{ return new Tuple3 (table1.f0,table1.f1,table2.f1); } } }).print();System.out.println("***********************************");//右外连接table1.rightOuterJoin(table2).where(0).equalTo(0) .with(new JoinFunction , Tuple2 , Tuple3 >() { public Tuple3 join(Tuple2 table1, Tuple2 table2) throws Exception { //右外链接表示等号右边的表的信息会被包含 if(table1 == null){ return new Tuple3 (table2.f0,null,table2.f1); }else{ return new Tuple3 (table2.f0,table1.f1,table2.f1); } } }).print();System.out.println("***********************************");//全外连接table1.fullOuterJoin(table2).where(0).equalTo(0).with(new JoinFunction , Tuple2 , Tuple3 >() { public Tuple3 join(Tuple2 table1, Tuple2 table2) throws Exception { if(table1 == null){ return new Tuple3 (table2.f0,null,table2.f1); }else if(table2 == null){ return new Tuple3 (table1.f0,table1.f1,null); }else{ return new Tuple3 (table1.f0,table1.f1,table2.f1); } } }).print();
到此,关于"Flink DataSet算子的作用是什么"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
用户
姓名
数据
算子
所在
作用
程序
学习
查询
城市
环境
部门
链接
上海
北京
重庆
运行
信息
元素
单词
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
报名表模板软件开发
山东商城软件开发解决方案
数据库技术及应用第二章答案
维控科技互联网平台
前端创建的数据库在哪儿找
网络技术运营有限公司
软件开发技术基础教程
全文搜索引擎数据库
怎样租服务器最安全
中间件管理服务器
imfifs数据库怎么用
怎么替换服务器中文件
网络技术与维修
swot网络安全模式
网络安全宣传怎么上网
福州市网络安全等级备案
项目数据库与java
电脑显示无法连接后台数据库
软件开发公司的财务报表模板
网络安全黑板报内容初中
搭建本地服务器
软件开发类客服做什么
黄山网络安全证
云计算和服务器应用慕课测试答案
软件开发吃的是青春饭吗
网络安全的重要性对我们
王者服务器一直进不去怎么办
我的世界去哪租服务器
5g网络技术什么时候普及
数据库系统的FD和主键例题