Flink中Watermarks怎么用
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章将为大家详细讲解有关Flink中Watermarks怎么用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Watermarks水印:为输入的数据流的设置一个时
千家信息网最后更新 2025年02月04日Flink中Watermarks怎么用
这篇文章将为大家详细讲解有关Flink中Watermarks怎么用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
Watermarks水印:为输入的数据流的设置一个时间事件(时间戳),对窗口内的数据输入流无序与延迟提供解决方案
示例环境
java.version: 1.8.xflink.version: 1.11.1
TimestampsAndWatermarks.java
import com.flink.examples.DataSource;import org.apache.commons.lang3.time.DateFormatUtils;import org.apache.flink.api.common.eventtime.*;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.functions.windowing.WindowFunction;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;import java.util.Date;import java.util.Iterator;import java.util.List;/** * @Description Watermarks水印:为输入的数据流的设置一个时间事件(时间戳),对窗口内的数据输入流无序与延迟提供解决方案 */public class TimestampsAndWatermarks { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html */ /** * 遍历集合,分别打印不同性别的信息,对于执行超时,自动触发定时器 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* TimeCharacteristic有三种时间类型: ProcessingTime:以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间; IngestionTime:以数据进入flink streaming data flow的时间为准; EventTime:以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段;需要实现assignTimestampsAndWatermarks方法,并设置时间水位线; */ //使用event time,需要指定事件的时间戳 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); //设置自动生成水印的时间周期,避免数据流量大的情况下,频繁添加水印导致计算性能降低。 env.getConfig().setAutoWatermarkInterval(1000L); List> tuple3List = DataSource.getTuple3ToList(); DataStream > inStream = env.addSource(new MyRichSourceFunction()); DataStream > dataStream = inStream //为一个水位线,这个Watermarks在不断的变化,一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发window计算的。 //Duration.ofSeconds(2),到数据流到达flink后,再水位线中设置延迟时间,也就是在所有数据流的最大的事件时间比window窗口结束时间大或相等时,再延迟多久触发window窗口结束;// .assignTimestampsAndWatermarks(// WatermarkStrategy. >forBoundedOutOfOrderness(Duration.ofSeconds(2))// .withTimestampAssigner((element, timestamp) -> {// long times = System.currentTimeMillis() ;// System.out.println(element.f1 + ","+ element.f0 + "的水位线为:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));// return times;// })// ) .assignTimestampsAndWatermarks(new MyWatermarkStrategy() .withTimestampAssigner(new SerializableTimestampAssigner >() { @Override public long extractTimestamp(Tuple3 element, long timestamp) { long times = System.currentTimeMillis(); System.out.println(element.f1 + "," + element.f0 + "的水位线为:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss")); return times; } })) //分区窗口 .keyBy((KeySelector , String>) k -> k.f1) //触发3s滚动窗口 .window(TumblingEventTimeWindows.of(Time.seconds(3))) //执行窗口数据,对keyBy数据流批量处理 .apply(new WindowFunction , Tuple2 , String, TimeWindow>(){ @Override public void apply(String s, TimeWindow window, Iterable > input, Collector > out) throws Exception { long times = System.currentTimeMillis() ; System.out.println(); System.out.println("窗口处理时间:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss")); Iterator > iterator = input.iterator(); int total = 0; int size = 0; String sex = ""; while (iterator.hasNext()){ Tuple3 tuple3 = iterator.next(); total += tuple3.f2; size ++; sex = tuple3.f1; } out.collect(new Tuple2<>(sex, total / size)); } }); dataStream.print(); env.execute("flink Filter job"); } /** * 定期水印生成器 */ public static class MyWatermarkStrategy implements WatermarkStrategy >{ @Override public WatermarkGenerator > createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator >() { //设置固定的延迟量3.5 seconds private final long maxOutOfOrderness = 3500; private long currentMaxTimestamp; /** * 事件处理 * @param event 数据流对象 * @param eventTimestamp 事件水位线时间 * @param output 输出 */ @Override public void onEvent(Tuple3 event, long eventTimestamp, WatermarkOutput output) { currentMaxTimestamp = Math.max(System.currentTimeMillis(), eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { // 拿上一个水印时间 - 延迟量 = 等于给的窗口最终数据最后时间(如果在窗口到期内,未发生新的水印事件,则按window正常结束时间计算,当在最后水印时间-延迟量的时间范围内,有新的数据流进入,则会重新触发窗口内对全部数据流计算) output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1)); } }; } } /** * 模拟数据持续输出 */ public static class MyRichSourceFunction extends RichSourceFunction > { @Override public void run(SourceContext > ctx) throws Exception { List > tuple3List = DataSource.getTuple3ToList(); int j = 0; for (int i=0;i<100;i++){ if (i%6 == 0){ j=0; } ctx.collect(tuple3List.get(j)); //1秒钟输出一个 Thread.sleep(1 * 1000); j ++; } } @Override public void cancel() { try{ super.close(); }catch (Exception e){ e.printStackTrace(); } } }}
打印结果
man,张三的水位线为:2020-12-27 10:28:20girl,李四的水位线为:2020-12-27 10:28:21man,王五的水位线为:2020-12-27 10:28:22girl,刘六的水位线为:2020-12-27 10:28:23girl,伍七的水位线为:2020-12-27 10:28:24窗口处理时间:2020-12-27 10:28:25(man,20)man,吴八的水位线为:2020-12-27 10:28:25man,张三的水位线为:2020-12-27 10:28:26girl,李四的水位线为:2020-12-27 10:28:27窗口处理时间:2020-12-27 10:28:28(girl,28)窗口处理时间:2020-12-27 10:28:28(man,29)
关于"Flink中Watermarks怎么用"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
时间
数据
水位
水位线
数据流
水印
事件
处理
延迟
输入
篇文章
输出
字段
方案
更多
解决方案
张三
李四
生成
不同
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
精灵起源服务器客服端怎么进去
云服务器ecs哪个品牌好
软件开发经营范围包括
鲲鹏服务器授权供应商
土地利用规划数据库更新技术标准
771 主板服务器
java数据库连接如何实现
实时数据库对象有哪五种类型数据
美国网络安全战略中的网络战略
网络安全企业文化标语挂墙
网络安全主题讲座主持词
ciw网络安全证书样本
中科院网络安全教学视频第二讲
网络安全特选
网络安全叶檀
连云港软件开发公司
服务器空间怎么用
商业智能数据库问题
电力信息网络安全演练
软件开发如何避免人员流失
广东省公安厅网络安全局
得力集团软件开发是干啥的
互通网络技术有限公司
云栖大会数据库
深圳网络技术研究院
中信银行总行软件开发待遇
数据库 列的属性值
虚拟电脑能开多少个服务器
客户管理服务器
方舟手机版本怎么开服务器