Flink Aggregate怎么用
发表于:2024-11-29 作者:千家信息网编辑
千家信息网最后更新 2024年11月29日,本篇内容主要讲解"Flink Aggregate怎么用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink Aggregate怎么用"吧!Aggreg
千家信息网最后更新 2024年11月29日Flink Aggregate怎么用
本篇内容主要讲解"Flink Aggregate怎么用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink Aggregate怎么用"吧!
Aggregate算子:提供基于事件窗口进行增量计算的函数。(对输入窗口每个数据流元素递增聚合计算,并将窗口状态与窗口内元素保持在累加器中)
示例环境
java.version: 1.8.xflink.version: 1.11.1
Aggregate.java
import com.flink.examples.DataSource;import org.apache.flink.api.common.accumulators.AverageAccumulator;import org.apache.flink.api.common.functions.AggregateFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.List;/** * @Description Aggregate算子:提供基于事件窗口进行增量计算的函数。(对输入窗口每个数据流元素递增聚合计算,并将窗口状态与窗口内元素保持在累加器中) */public class Aggregate { /** * 遍历集合,分别打印不同性别的总人数与平均值 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //Tuple3<姓名,性别(man男,girl女),年龄> List> tuple3List = DataSource.getTuple3ToList(); DataStream dataStream = env.fromCollection(tuple3List) .keyBy((KeySelector , String>) k -> k.f1) //按数量窗口滚动,每3个输入窗口数据流,计算一次 .countWindow(3) //只能基于Windowed窗口Stream进行调用 .aggregate(new AggregateFunction , MyAverageAccumulator, MyAverageAccumulator>() { /** * 创建新累加器,开始聚合计算 * @return */ @Override public MyAverageAccumulator createAccumulator() { return new MyAverageAccumulator(); } /** * 将窗口输入的数据流值添加到窗口累加器,并返回新的累加器值 * @param tuple3 * @param accumulator * @return */ @Override public MyAverageAccumulator add(Tuple3 tuple3, MyAverageAccumulator accumulator) { System.out.println("tuple3:" + tuple3.toString()); accumulator.setGender(tuple3.f1); //此accumulator保含个数统计和值累计两个属性,add方法内会计算窗口内总数与求和 accumulator.add(tuple3.f2); return accumulator; } /** * 获取累加器聚合结果 * @param accumulator * @return */ @Override public MyAverageAccumulator getResult(MyAverageAccumulator accumulator) { return accumulator; } /** * 合并两个累加器,返回合并后的累加器的状态 * @param a * @param b * @return */ @Override public MyAverageAccumulator merge(MyAverageAccumulator a, MyAverageAccumulator b) { a.merge(b); return a; } }); dataStream.print(); env.execute("flink Filter job"); } /** * 添加性别属性(此类用于显示不同性别的平均值) */ public static class MyAverageAccumulator extends AverageAccumulator{ private String gender; public String getGender() { return gender; } public void setGender(String gender) { this.gender = gender; } @Override public String toString() { //继承父类的this.getLocalValue()方法用于计算并返回平均值 return super.toString() + ", gender to " + gender; } }}
打印结果
tuple3:(张三,man,20)tuple3:(李四,girl,24)tuple3:(刘六,girl,32)tuple3:(王五,man,29)tuple3:(伍七,girl,18)tuple3:(吴八,man,30)4> AverageAccumulator 24.666666666666668 for 3 elements, gender to girl2> AverageAccumulator 26.333333333333332 for 3 elements, gender to man
到此,相信大家对"Flink Aggregate怎么用"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
累加器
元素
数据
数据流
输入
平均值
性别
方法
状态
不同
两个
事件
内容
函数
增量
属性
算子
结果
并将
学习
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
证券交易记录数据库
软件开发行业成本控制
更换主机备份数据库
最近网络安全专家
mfc线下数据库
区块链与城市网络安全
找不到网上数据库
广州旅游软件开发订制
明日之后服务器没有安全区
对讲机语音存储服务器
快递软件开发法律风险
小学网络安全访问日志记录表
服务器UEFI模式装系统
售后服务好的定制软件开发
安卓应用软件开发步骤
关于服务器安全方面
一个服务器能连接两个路由器吗
邮箱服务器地是什么意思
全国公民身份证信息数据库
mac 服务器远程管理软件
云南超频服务器技术参数
大学生信息网络安全意识
无锡的软件开发公司哪家好
口碑好的网络技术推广经验丰富
服务器指南
cfhd打着打着就连不上服务器
多伦多软件开发工资
专业性软件开发程序
智慧城市的网络安全
小型办公室服务器