千家信息网

Flink CEP事件处理

发表于:2024-10-21 作者:千家信息网编辑
千家信息网最后更新 2024年10月21日,这篇文章将为大家详细讲解有关Flink CEP事件处理,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。什么是CEP复杂事件处理,允许在无界数据流中检测出特定事件模型单
千家信息网最后更新 2024年10月21日Flink CEP事件处理

这篇文章将为大家详细讲解有关Flink CEP事件处理,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

什么是CEP

复杂事件处理,允许在无界数据流中检测出特定事件模型

单个模式

单个模式指一个模式,可以是一个单例也可以是循环模式。
模式都是单例的,可以通过量词转换成循环模式。每个模式可以有一个或多个条件来决定接受哪些事件。

量词
  1. pattern.oneOrMore():期望给定的事件出现一次或多次

  2. pattern.times(#oftimes):期望一个给定事件出现特定次数的模式

  3. pattern.times(#fromTimes, #toTimes):期望一个给定事件出现次数在一个最小值与最大值中间

  4. pattern.greedy():贪心算法,尽可能多匹配,还不能让模式组贪心

  5. pattern.optional():变为可选

示例:// 期望出现4次start.times(4);// 期望出现0或者4次start.times(4).optional();// 期望出现2、3或者4次start.times(2, 4);// 期望出现2、3或者4次,并且尽可能的重复次数多start.times(2, 4).greedy();// 期望出现0、2、3或者4次start.times(2, 4).optional();// 期望出现0、2、3或者4次,并且尽可能的重复次数多start.times(2, 4).optional().greedy();// 期望出现1到多次start.oneOrMore();// 期望出现1到多次,并且尽可能的重复次数多start.oneOrMore().greedy();// 期望出现0到多次start.oneOrMore().optional();// 期望出现0到多次,并且尽可能的重复次数多start.oneOrMore().optional().greedy();// 期望出现2到多次start.timesOrMore(2);// 期望出现2到多次,并且尽可能的重复次数多start.timesOrMore(2).greedy();// 期望出现0、2或多次start.timesOrMore(2).optional();// 期望出现0、2或多次,并且尽可能的重复次数多start.timesOrMore(2).optional().greedy();
条件

判断事件属性的条件可以是以下方法

  1. pattern.where()

  2. pattern.or()

  3. pattern.until()
    这些方法入参可以是IterativeCondition或SimpleCondition

pattern.subtype方法限制接受事件类型是初始事件的子类型。

  1. 迭代条件IterativeCondition

  2. 简单条件SimpleCondition

  3. 组合条件.where().or()等

  4. 停止条件.until()

组合模式

FlinkCEP支持事件之间如下形式的连续策略

  1. 严格连续:期望所有匹配事件严格的一个接一个出现,中间没有任何不匹配事件

  2. 松散连续:忽略匹配的事件之间的不匹配的事件

  3. 不确定的松散连续:更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配

1. next() 指定严格连续2. followedBy() 指定松散连续3. followedByAny() 不确定松散连续4. notNext() 如果不想后面直接连着一个特定事件5. notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方。ps: 模式序列不能以notFollowedBy()结尾    一个NOT模式前面不能是可选的模式

定义模式一个有效时间约束:pattern.within()方法指定有效时间内发生。
模式序列只能有一个时间限制,如果限制多个时间在不同的模式上,会使用最小的时间限制。

循环模式默认是松散连续,如果合用严格连续,需使用consecutive()方法明确指定。如果想使用不确定松散连续,可以使用allowCombinations()方法
==示例:consecutive==

Pattern.begin("start").where(new SimpleCondition() {  @Override  public boolean filter(Event value) throws Exception {    return value.getName().equals("c");  }}).followedBy("middle").where(new SimpleCondition() {  @Override  public boolean filter(Event value) throws Exception {    return value.getName().equals("a");  }}).oneOrMore().consecutive().followedBy("end1").where(new SimpleCondition() {  @Override  public boolean filter(Event value) throws Exception {    return value.getName().equals("b");  }});输入:C D A1 A2 A3 D A4 B,会产生下面的输出:如果施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B}不施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}

==示例:allowCombinations==

Pattern.begin("start").where(new SimpleCondition() {  @Override  public boolean filter(Event value) throws Exception {    return value.getName().equals("c");  }}).followedBy("middle").where(new SimpleCondition() {  @Override  public boolean filter(Event value) throws Exception {    return value.getName().equals("a");  }}).oneOrMore().allowCombinations().followedBy("end1").where(new SimpleCondition() {  @Override  public boolean filter(Event value) throws Exception {    return value.getName().equals("b");  }});输入:C D A1 A2 A3 D A4 B,会产生如下的输出:如果使用不确定松散连续: {C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B}如果不使用:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
模式组

定义一个模式序列作为begin,followedBy,followedByAny和next条件

关于"Flink CEP事件处理"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

0