flink中的聚合算子是什么
发表于:2025-01-29 作者:千家信息网编辑
千家信息网最后更新 2025年01月29日,这篇文章主要讲解了"flink中的聚合算子是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"flink中的聚合算子是什么"吧!前言flink中的一个
千家信息网最后更新 2025年01月29日flink中的聚合算子是什么
这篇文章主要讲解了"flink中的聚合算子是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"flink中的聚合算子是什么"吧!
前言
flink中的一个接口org.apache.flink.api.common.functions.AggregateFunction,这个类可以接在window流之后,做窗口内的统计计算。
注意:除了这个接口AggregateFunction,flink中还有一个抽象类AggregateFunction:org.apache.flink.table.functions.AggregateFunction,大家不要把这个弄混淆了,接口AggregateFunction我们可以理解为flink中的一个算子,和MapFunction、FlatMapFunction等是同级别的,而抽象类AggregateFunction是用于用户自定义聚合函数的,和max、min之类的函数是同级的。
原理解析
比如我们想实现一个类似sql的功能:
select TUMBLE_START(proctime,INTERVAL '2' SECOND) as starttime,user,count(*) from logs group by user,TUMBLE(proctime,INTERVAL '2' SECOND)
这个sql就是来统计一下每两秒钟的滑动窗口内每个人出现的次数,今天我们就以这个简单的sql的功能为例讲解一下flink的aggregate算子,其实就是我们用程序来实现这个sql的功能。
首先看一下聚合函数的接口:
@PublicEvolving
public interface AggregateFunction extends Function, Serializable {
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
ACC merge(ACC a, ACC b);
OUT getResult(ACC accumulator);
}
这个接口AggregateFunction里面有4个方法,我们分别来讲解一下。
AggregateFunction这个类是一个泛型类,这里面有三个参数,IN, ACC, OUT。IN就是聚合函数的输入类型,ACC是存储中间结果的类型,OUT是聚合函数的输出类型。 createAccumulator
这个方法首先要创建一个累加器,要进行一些初始化的工作,比如我们要进行count计数操作,就要给累加器一个初始值。add
add方法就是我们要做聚合的时候的核心逻辑,比如我们做count累加,其实就是来一个数,然后就加一。
类似上面的sql的逻辑,我们在写业务逻辑的时候,可以这么想,进入这方法数的数据都是属于某一个用户的,系统在调用这个方法之前会先进行hash分组,然后不同的用户会重复调用这个方法。所以这个函数的入参是IN类型,返回值是ACC类型merge
因为flink是一个分布式计算框架,可能计算是分布在很多节点上同时进行的,比如上述的add操作,可能同一个用户在不同的节点上分别调用了add方法在本地节点对本地的数据进行了聚合操作,但是我们要的是整个结果,整个时候,我们就需要把每个用户各个节点上的聚合结果merge一下,整个merge方法就是做这个工作的,所以它的入参和出参的类型都是中间结果类型ACC。getResult
这个方法就是将每个用户最后聚合的结果经过处理之后,按照OUT的类型返回,返回的结果也就是聚合函数的输出结果了。
实例讲解
自定义source
首先我们自定义source生成用户的信息
public static class MySource implements SourceFunction>{
private volatile boolean isRunning = true;
String userids[] = {
"4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
"72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
"aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
"3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
"e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
};
@Override
public void run(SourceContext> ctx) throws Exception{
while (isRunning){
Thread.sleep(10);
String userid = userids[(int) (Math.random() * (userids.length - 1))];
ctx.collect(Tuple2.of(userid, System.currentTimeMillis()));
}
}
@Override
public void cancel(){
isRunning = false;
}
}
自定义聚合函数
public static class CountAggregate
implements AggregateFunction,Integer,Integer>{
@Override
public Integer createAccumulator(){
return 0;
}
@Override
public Integer add(Tuple2 value, Integer accumulator){
return ++accumulator;
}
@Override
public Integer getResult(Integer accumulator){
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b){
return a + b;
}
}
自定义结果输出函数
/**
* 这个是为了将聚合结果输出
*/
public static class WindowResult
implements WindowFunction,Tuple,TimeWindow>{
@Override
public void apply(
Tuple key,
TimeWindow window,
Iterable input,
Collector> out) throws Exception{
String k = ((Tuple1) key).f0;
long windowStart = window.getStart();
int result = input.iterator().next();
out.collect(Tuple3.of(k, new Date(windowStart), result));
}
}
主流程
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream> dataStream = env.addSource(new MySource());
dataStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
.aggregate(new CountAggregate(), new WindowResult()
).print();
env.execute();
感谢各位的阅读,以上就是"flink中的聚合算子是什么"的内容了,经过本文的学习后,相信大家对flink中的聚合算子是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
函数
结果
就是
方法
类型
用户
算子
接口
节点
输出
功能
时候
逻辑
学习
不同
内容
同级
数据
累加器
自定
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全的义务和权利
农村网络安全普及教育报告
巨神峰的服务器在哪个城市
湖北专业软件开发哪家正规
池州企业软件开发要多少钱
九游灌篮高手服务器
软件开发合同终止协议模板
战地1 管理踢出服务器
t3出行软件开发怎么样
关于初中网络安全的作文
数据库课件
济南刚刚好网络技术公司
镇江市网络安全与信息化领导小组
国家网络安全教育周演讲稿
软件开发 硬件指什么
局域网查看服务器ip
华古诗词数据库
数据库 case
服务器桌面怎么关闭
重庆长寿配送生鲜软件开发
北京软件开发好的培训
云服务器运维
金河田机箱的服务器
禅游科技中移互联网
青岛阿里巴巴网络技术公司
网络安全证文350左右
网络安全感基础
滨江区学习软件开发
听网络安全讲座作文
数据库自动备份工具源码