Flink Aggregate怎么用
发表于:2024-11-22 作者:千家信息网编辑
千家信息网最后更新 2024年11月22日,本篇内容主要讲解"Flink Aggregate怎么用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink Aggregate怎么用"吧!Aggreg
千家信息网最后更新 2024年11月22日Flink Aggregate怎么用
本篇内容主要讲解"Flink Aggregate怎么用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink Aggregate怎么用"吧!
Aggregate算子:提供基于事件窗口进行增量计算的函数。(对输入窗口每个数据流元素递增聚合计算,并将窗口状态与窗口内元素保持在累加器中)
示例环境
java.version: 1.8.xflink.version: 1.11.1
Aggregate.java
import com.flink.examples.DataSource;import org.apache.flink.api.common.accumulators.AverageAccumulator;import org.apache.flink.api.common.functions.AggregateFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.List;/** * @Description Aggregate算子:提供基于事件窗口进行增量计算的函数。(对输入窗口每个数据流元素递增聚合计算,并将窗口状态与窗口内元素保持在累加器中) */public class Aggregate { /** * 遍历集合,分别打印不同性别的总人数与平均值 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //Tuple3<姓名,性别(man男,girl女),年龄> List> tuple3List = DataSource.getTuple3ToList(); DataStream dataStream = env.fromCollection(tuple3List) .keyBy((KeySelector , String>) k -> k.f1) //按数量窗口滚动,每3个输入窗口数据流,计算一次 .countWindow(3) //只能基于Windowed窗口Stream进行调用 .aggregate(new AggregateFunction , MyAverageAccumulator, MyAverageAccumulator>() { /** * 创建新累加器,开始聚合计算 * @return */ @Override public MyAverageAccumulator createAccumulator() { return new MyAverageAccumulator(); } /** * 将窗口输入的数据流值添加到窗口累加器,并返回新的累加器值 * @param tuple3 * @param accumulator * @return */ @Override public MyAverageAccumulator add(Tuple3 tuple3, MyAverageAccumulator accumulator) { System.out.println("tuple3:" + tuple3.toString()); accumulator.setGender(tuple3.f1); //此accumulator保含个数统计和值累计两个属性,add方法内会计算窗口内总数与求和 accumulator.add(tuple3.f2); return accumulator; } /** * 获取累加器聚合结果 * @param accumulator * @return */ @Override public MyAverageAccumulator getResult(MyAverageAccumulator accumulator) { return accumulator; } /** * 合并两个累加器,返回合并后的累加器的状态 * @param a * @param b * @return */ @Override public MyAverageAccumulator merge(MyAverageAccumulator a, MyAverageAccumulator b) { a.merge(b); return a; } }); dataStream.print(); env.execute("flink Filter job"); } /** * 添加性别属性(此类用于显示不同性别的平均值) */ public static class MyAverageAccumulator extends AverageAccumulator{ private String gender; public String getGender() { return gender; } public void setGender(String gender) { this.gender = gender; } @Override public String toString() { //继承父类的this.getLocalValue()方法用于计算并返回平均值 return super.toString() + ", gender to " + gender; } }}
打印结果
tuple3:(张三,man,20)tuple3:(李四,girl,24)tuple3:(刘六,girl,32)tuple3:(王五,man,29)tuple3:(伍七,girl,18)tuple3:(吴八,man,30)4> AverageAccumulator 24.666666666666668 for 3 elements, gender to girl2> AverageAccumulator 26.333333333333332 for 3 elements, gender to man
到此,相信大家对"Flink Aggregate怎么用"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
累加器
元素
数据
数据流
输入
平均值
性别
方法
状态
不同
两个
事件
内容
函数
增量
属性
算子
结果
并将
学习
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
惠普服务器恢复bios出厂设置
广东什么是网络技术分类服务标准
数据库备份软件排名
社交电商软件开发中心
化学品理化性质数据库
网络安全管理建设性方案
通信网络安全办法
授予网络安全示范学校
安装金蝶服务器要选择客户端吗
济南宣昂网络技术有限公司
网络安全保卫大队考核办法
vba 数据库连接
数据库pass是什么意思
四川邺兴互联网科技有限公司
网络安全bug悬赏任务平台
广西电网专业网络安全技术服务
简介猫王互联网科技有限公司
6月15日网络安全
医保码显示数据库重复
虹口区营销软件开发定制价格
数据库分组技术
软件开发过程的迭代
山东云空间装饰设计云服务器
王思聪服务器还需要主机吗
汤旺县公安局网络安全大检查
研究所的软件开发岗怎么样
爱知网网络安全
大型数据库技术实验课程
西继迅达服务器怎么操作
长光卫星软件开发怎么样