Flink Process怎么用
发表于:2024-09-21 作者:千家信息网编辑
千家信息网最后更新 2024年09月21日,这篇文章主要介绍"Flink Process怎么用",在日常操作中,相信很多人在Flink Process怎么用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Flink
千家信息网最后更新 2024年09月21日Flink Process怎么用
这篇文章主要介绍"Flink Process怎么用",在日常操作中,相信很多人在Flink Process怎么用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Flink Process怎么用"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
process算子:处理每个keyBy(分区)输入到窗口的批量数据流(为KeyedStream类型数据流)
示例环境
java.version: 1.8.xflink.version: 1.11.1
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
Process.java
import com.flink.examples.DataSource;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;import org.apache.flink.util.Collector;import java.util.Iterator;import java.util.List;/** * @Description process算子:处理每个keyBy(分区)输入到窗口的批量数据流(为KeyedStream类型数据流) */public class Process { /** * 遍历集合,分别打印不同性别的总人数与年龄之和 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); List> tuple3List = DataSource.getTuple3ToList(); DataStream dataStream = env.fromCollection(tuple3List) .keyBy((KeySelector , String>) k -> k.f1) //按数量窗口滚动,每3个输入数据流,计算一次 .countWindow(3) //处理每keyBy后的窗口数据流,process方法通常应用于KeyedStream类型的数据流处理 .process(new ProcessWindowFunction , String, String, GlobalWindow>() { /** * 处理窗口数据集合 * @param s 从keyBy里返回的key值 * @param context 窗口的上下文 * @param input 从窗口获取的所有分区数据流 * @param out 输出数据流对象 * @throws Exception */ @Override public void process(String s, Context context, Iterable > input, Collector out) throws Exception { Iterator > iterator = input.iterator(); int total = 0; int i = 0; while (iterator.hasNext()){ Tuple3 tuple3 = iterator.next(); total += tuple3.f2; i ++ ; } out.collect(s + "共:"+i+"人,平均年龄:" + total/i); } }); dataStream.print(); env.execute("flink Process job"); }}
打印结果
4> girl共:3人,平均年龄:242> man共:3人,平均年龄:26
到此,关于"Flink Process怎么用"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
数据
数据流
处理
年龄
学习
类型
输入
方法
更多
环境
示例
算子
帮助
不同
实用
接下来
上下
上下文
之和
人数
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
太原市互联网信息科技有限公司
网络安全优秀评选通知
核安全局堆芯软件开发导则
linux服务器流量分析
安果互联网科技有限公司湖北
数据库树表格显示
软件开发 增值税普通发票
读数据库应用技术有感
上海系统软件开发公司
苹果公司网站服务器在哪里
教育系统网络安全自查表
网络安全刷数据
营口游戏软件开发
软件开发企业成本归集案例
全国网络安全大赛第四场
华为手机软件开发自学hms
dell服务器设置
网络安全四个事关
砺锋软件开发
山东常用软件开发厂家价格
cs1.6如何添加服务器
数据库unique去重
网络安全吗阅读题
虹口区专业软件开发质量
打印机无线服务器远程安装
群控软件开发教程
海南省网络安全宣传周活动
网络安全用户能做什么
北邮时间服务器
深圳工控软件开发有用吗