千家信息网

Flink的窗口机制介绍

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,本篇内容介绍了"Flink的窗口机制介绍"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!读懂windo
千家信息网最后更新 2025年01月24日Flink的窗口机制介绍

本篇内容介绍了"Flink的窗口机制介绍"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

读懂window区别:

读懂window内部的源码实现关系

  • Window Assigner:用来决定某个元素被分配到哪个/哪些窗口中去。

  • Trigger:触发器。决定了一个窗口何时能够被计算或清除,每个窗口都会拥有一个自己的Trigger。


  • Evictor:可以译为"驱逐者"。在Trigger触发之后,在窗口被处理之前,Evictor(如果有Evictor的话)会用来剔除窗口中不需要的元素,相当于一个filter。

读懂WindowAssignern内部实现机制,它主要是实现数据的分发,分发到不同的window中,我简单举例一个,我设置window的开始和结束时间,然后触发器发现我的window达到了结束时间,这个window就会触发。

一张图读懂trigger,evictor,emit的执行顺序

假设有一个滑动计数窗口,每2个元素计算一次最近4个元素的总和,那么窗口工作示意图如下所示:

测试验证代码:

import java.utilimport org.apache.flink.api.common.ExecutionConfigimport org.apache.flink.streaming.api.{TimeCharacteristic, environment}import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.assigners.WindowAssignerimport org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, Trigger}import org.apache.flink.streaming.api.windowing.windows.TimeWindowobject FlinkWindowTest {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)    val input = env.socketTextStream("localhost", 9001)    val inputMap = input.flatMap(f => {      f.split("\\W+")    }).map(line =>(line ,1))      .keyBy(0).window(new WindowAssigner[Object,TimeWindow] {      override def isEventTime = false      override def getDefaultTrigger(env: environment.StreamExecutionEnvironment) = {        ProcessingTimeTrigger.create()      }      override def assignWindows(element: Object, timestamp: Long, context: WindowAssigner.WindowAssignerContext) = {        val windows = new util.ArrayList[TimeWindow](7)        //每隔1分钟统计历史5分钟的数据        val size =1000L * 60 * 5        val slide = 1000L * 60        val lastStart = timestamp - timestamp % slide        var start = lastStart        while ( {          start > timestamp - size        })        {          start -= slide          windows.add(new TimeWindow(start, start + size))        }        //每隔1分钟统计历史1分钟的数据        val size1 =1000L * 60        val lastStart1 = timestamp - timestamp % slide        println(timestamp % slide)        var start1 = lastStart1        while ( {          start1 > timestamp - size1        })        {          windows.add(new TimeWindow(start1, start1 + size1))          start1 -= slide        }        windows      }      override def getWindowSerializer(executionConfig: ExecutionConfig) = new TimeWindow.Serializer    }).sum(1)    .print()    env.execute()  }}

总结

WindowAssigner主要是把数据分发到不同的window窗口中去,然后每个window自己内部设置触发器,当数据还没有触发之前整个数据是存储在flink的state中,也就是状态存储。当window触发(Trigger的返回结果可以是)之后,Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。

"Flink的窗口机制介绍"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0