Flink Reduce怎么用
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这篇文章主要讲解了"Flink Reduce怎么用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Reduce怎么用"吧!Reduce算子:
千家信息网最后更新 2025年02月02日Flink Reduce怎么用
这篇文章主要讲解了"Flink Reduce怎么用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Reduce怎么用"吧!
Reduce算子:对数据流进行滚动聚合计算,并返回每次滚动聚合计算合并后的结果
示例环境
java.version: 1.8.xflink.version: 1.11.1
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
Reduce.java
import com.flink.examples.DataSource;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.KeyedStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.List;/** * @Description Reduce算子:对数据流进行滚动聚合计算,并返回每次滚动聚合计算合并后的结果 */public class Reduce { /** * 遍历集合,分区打印每一次滚动聚合的结果 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); List> tuple3List = DataSource.getTuple3ToList(); //注意:使用Integer进行分区时,会导致分区结果不对,转换成String类型输出key即可正确输出 KeyedStream , String> keyedStream = env.fromCollection(tuple3List).keyBy(new KeySelector , String>() { @Override public String getKey(Tuple3 tuple3) throws Exception { //f1为性别字段,以相同f1值(性别)进行分区 return String.valueOf(tuple3.f1); } }); SingleOutputStreamOperator > result = keyedStream.reduce(new ReduceFunction >() { @Override public Tuple3 reduce(Tuple3 t0, Tuple3 t1) throws Exception { int totalAge = t0.f2 + t1.f2; return new Tuple3<>("", t0.f1, totalAge); } }); result.print(); env.execute("flink Reduce job"); }}
打印结果
## 说明:为什么每一个分区的第一个数据对象每一个参数有值,是因为滚动聚合返回的是从第二数据对象向前叠加第一个数据对象,开始计算,所以第一个数据对象根本就不进入reduce方法;2> (张三,man,20)2> (,man,49)2> (,man,79)4> (李四,girl,24)4> (,girl,56)4> (,girl,74)
感谢各位的阅读,以上就是"Flink Reduce怎么用"的内容了,经过本文的学习后,相信大家对Flink Reduce怎么用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
结果
对象
学习
内容
性别
数据流
环境
示例
算子
输出
不对
相同
参数
字段
就是
思路
情况
数据源
文章
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
惠普服务器管理员密码忘记了
网络安全与青少年问卷
湖南不缺暖阳网络技术有限公司
软件开发管理的内容
渤海大学软件开发
服务器电源有没有过载保护
广州所有软件开发微信开发
浪潮服务器怎么关闭硬件时钟同步
mib数据库表间关系
宜昌网络安全审计
java服务器宣传
网络安全等级保护条例正式颁布
layui数据库修改传参
服务器被黑客攻击了视频
汽车业网络安全有新突破
数据库技术分析图素材
查出数据库的字段去除换行符
福建应用软件开发
国家网络安全宣传周班会策划
华为手机web服务器 app
无锡太湖学院网络安全密钥
软件开发高新技术标准
北碚区网络软件开发流程特点
金蝶加密服务器过期怎么办
无法验证服务器身份p108
微信软件开发客服
服务器被黑客攻击了视频
数据库中数据的主要结构
数据库 backup
ibm服务器 温度