千家信息网

第五章 Flink Complex Event Processing 复杂事件处理

发表于:2024-11-27 作者:千家信息网编辑
千家信息网最后更新 2024年11月27日,转载需标明出处 mythmoon@163.comComplex Event Processing 复杂事件处理In the previous chapter, we talked about the
千家信息网最后更新 2024年11月27日第五章 Flink Complex Event Processing 复杂事件处理

转载需标明出处 mythmoon@163.com

Complex Event Processing 复杂事件处理

In the previous chapter, we talked about the Table API provided by Apache Flink and how we can use it to process relational data structures. This chapter onwards, we will start learning more about the libraries provided by Apache Flink and how we can use them for specific use cases. To start with, let's try to understand a library called Comrlex Event Processing (CEP). CEP is a very interesting but complex topic that has its value in various industries. Wherever there is a stream of events expected, naturally people want to perform complex event processing in all such use cases. Let's try to understand what CEP is all about. 在前一章中, 我们讨论了Apache Flink 提供的表 api, 以及如何使用它来处理关系数据结构。本章之后, 我们将开始了解有关 apacheflink 提供的库的更多信息, 以及如何将它们用于特定的用例。首先, 让我们尝试了解一个名为 Comrlex Event Processing (CEP) 的库。cep 是一个非常有趣但复杂的话题, 在各个行业都有其价值。只要有预期的事件流, 人们自然希望在所有此类用例中执行复杂的事件处理。让我们试着了解 cep 的意义。

What is complex event processing? 什么是复杂事件处理?

CEP analyzes streams of disparate events occurring with high frequency and low latency. These days, streaming events can be found in various industries, for example: cep 分析以高频和低延迟发生的不同事件流。如今, 流媒体事件可以在不同的行业中找到, 例如:

In the oil and gas domain, sensor data comes from various drilling tools or from upstream oil pipeline equipment 在石油和天然气领域, 传感器数据来自各种钻井工具或上游石油管道设备

In the security domain, activity data, malware information, and usage pattern data come from various end points 在安全域中, 活动数据、恶意软件信息和使用模式数据来自不同的端点

In the wearable domain, data comes from various wrist bands with information about your heart beat rate, your activity, and so on 在可穿戴领域, 数据来自不同的腕带, 其中包含有关您的心跳率、活动等信息

In the banking domain, data comes from credit card usage, banking activities, and so on 在银行领域, 数据来自信用卡使用、银行活动等


It is very important to analyze variation patterns to get notified in real time about any change in the regular assembly. CEP can understand patterns across the streams of events, sub-events, and their sequences. CEP helps to identify meaningful patterns and complex relationships among unrelated events, and sends notifications in real and near real time to prevent damage: 分析变体模式以实时获得有关常规程序集中任何更改的通知是非常重要的。cep 可以了解跨事件流、子事件及其序列的模式。cep 有助于识别不相关事件之间有意义的模式和复杂关系, 并实时和近实时发送通知, 以防止损坏:

The preceding diagram shows how the CEP flow works. Even though the flow looks simple, CEP has various abilities such as: 上图显示了 cep 流的工作原理。尽管流看起来很简单, cep 也有各种能力, 例如:

The ability to produce results as soon as the input event stream is available在输入事件流可用时生成结果的能力

The ability to provide computations such as aggregation over time and timeout between two events of interest提供计算 (如随时间的聚合和两个感兴趣的事件之间的超时) 的能力

The ability to provide real-timeInear real-time alerts and notifications on detection of complex event patterns能够提供实时输入实时警报和通知, 用于检测复杂事件模式

The ability to connect and correlate heterogeneous sources and analyze patterns in them连接异构源并将其关联并分析其中模式的能力

The ability to achieve high-throughput, low-latency processing实现高吞吐量、低延迟处理的能力

There are various solutions available on the market. With big data technology advancements, we have multiple options like Apache Spark, Apache Samza, Apache Beam, among others, but none of them have a dedicated library to fit all solutions. Now let us try to understand what we can achieve with Flink's CEP library.

市场上有各种各样的解决方案。随着大数据技术的进步, 我们有多种选择, apache spark, apache samza, apache beam , 但没有一个专用的库, 以适应所有的解决方案。现在, 让我们尝试了解 flink cep 库可以实现什么。


Flink CEP

Apache Flink provides the Flink CEP library, which provides APIs to perform complex event processing. The library consists of the following core components: apache flink 提供 flink cep 库, 该库提供用于执行复杂事件处理的 api。该库由以下核心组件组成:

Event stream

Pattern definition 模式定义

Pattern detection 模式检测

Alert generation 警告生成

Flink CEP works on Flink's streaming API called DataStream. A programmer needs to define the pattern to be detected from the stream of events and then Flink's CEP engine detects the pattern and takes the appropriate action, such as generating alerts. flink cep 适用于 flink 名为 datastream 的流媒体 api。程序员需要定义要从事件流中检测到的模式, 然后 flink 的 cep 引擎检测到该模式并采取适当的操作, 例如生成警报。

In order to get started, we need to add the following Maven dependency:

org.apache.flink

flink-streaming-java_2.11

1.1.4

org.apache.flink

flink-streaming-scala_2.11

1.1.4


org.apache.flink

flink-connector-kafka-0.9_2.11

1.1.4

Next we need to do following things for using Kafka.

First we need to define a custom Kafka deserializer. This will read bytes from a Kafka topic and convert it into TemperatureEvent. The following is the code to do this.

EventDeserializationSchema.java:

package com.demo.chapter05;

import java.io.IOException;

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor;

import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class EventDeserializationSchema implements DeserializationSchema {

public TypeInformation getProducedType() { return TypeExtractor.getForClass(TemperatureEvent.class);

}

public TemperatureEvent deserialize(byte[] arg0) throws IOException { String str = new String(arg0, StandardCharsets.UTF_8);

String[] parts = str.split("=");

return new TemperatureEvent(parts[0], Double.parseDouble(parts[1]));

}

public boolean isEndOfStream(TemperatureEvent arg0) { return false;

}

}

Next we create topics in Kafka called temperature:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication- factor 1 --partitions 1 --topic temperature


Now we move to Java code which would listen to these events in Flink streams: 现在, 我们转到 java 代码, 它将侦听 flink 流中的这些事件:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test");

DataStream inputEventStream = env.addSource(

new FlinkKafkaConsumer09("temperature", new EventDeserializationSchema(), properties));

Next we will define the pattern to check if the temperature is greater than 26.0 degrees Celsius within 10 seconds:

Pattern warningPattern = Pattern. begin("first").subtype(TemperatureEvent.class).where(new FilterFunction() {

private static final long serialVersionUID = 1L;

public boolean filter(TemperatureEvent value) { if (value.getTemperature() >= 26.0) {

return true;

}

return false;

}

}).within(Time.seconds(10));

Next match this pattern with the stream of events and select the event. We will also add up the alert messages into results stream as shown here:

DataStream patternStream = CEP.pattern(inputEventStream, warningPattern)

.select(new PatternSelectFunction() { private static final long serialVersionUID = 1L;

public Alert select(Map event) throws Exception {

return new Alert("Temperature Rise Detected:" + event.get("first").getTemperature()

+ " on machine name:" + event.get("first").getMachineName());

}

});


In order to know what the alerts were generated, we will print the results:

patternStream.print();

And we execute the stream:

env.execute("CEP on Temperature Sensor");

Now we are all set to execute the application. As and when we get messages in Kafka topics, the CEP will keep on executing.

The actual execution will looks like the following. Here is how we can provide sample input:

xyz=21.0 xyz=30.0 LogShaft=29.3 Boiler=23.1 Boiler=24.2 Boiler=27.0 Boiler=29.0

Here is how the sample output will look like:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1010488393]

10/09/2016

18:15:55

Job execution switched to status RUNNING.

10/09/2016

18:15:55

Source: Custom Source(1/4) switched to SCHEDULED

10/09/2016

18:15:55

Source: Custom Source(1/4) switched to DEPLOYING

10/09/2016

18:15:55

Source: Custom Source(2/4) switched to SCHEDULED

10/09/2016

18:15:55

Source: Custom Source(2/4) switched to DEPLOYING

10/09/2016

18:15:55

Source: Custom Source(3/4) switched to SCHEDULED

10/09/2016

18:15:55

Source: Custom Source(3/4) switched to DEPLOYING

10/09/2016

18:15:55

Source: Custom Source(4/4) switched to SCHEDULED

10/09/2016

18:15:55

Source: Custom Source(4/4) switched to DEPLOYING

10/09/2016

18:15:55

CEPPatternOperator(1/1) switched to SCHEDULED

10/09/2016

18:15:55

CEPPatternOperator(1/1) switched to DEPLOYING

10/09/2016

18:15:55

Map -> Sink: Unnamed(1/4) switched to SCHEDULED

10/09/2016

18:15:55

Map -> Sink: Unnamed(1/4) switched to DEPLOYING

10/09/2016

18:15:55

Map -> Sink: Unnamed(2/4) switched to SCHEDULED

10/09/2016

18:15:55

Map -> Sink: Unnamed(2/4) switched to DEPLOYING

10/09/2016

18:15:55

Map -> Sink: Unnamed(3/4) switched to SCHEDULED

10/09/2016

18:15:55

Map -> Sink: Unnamed(3/4) switched to DEPLOYING

10/09/2016

18:15:55

Map -> Sink: Unnamed(4/4) switched to SCHEDULED

10/09/2016

18:15:55

Map -> Sink: Unnamed(4/4) switched to DEPLOYING

10/09/2016

18:15:55

Source: Custom Source(2/4) switched to RUNNING

10/09/2016

18:15:55

Source: Custom Source(3/4) switched to RUNNING

10/09/2016

18:15:55

Map -> Sink: Unnamed(1/4) switched to RUNNING

10/09/2016

18:15:55

Map -> Sink: Unnamed(2/4) switched to RUNNING


10/09/2016

18:15:55

Map -> Sink: Unnamed(3/4) switched to RUNNING

10/09/2016

18:15:55

Source: Custom Source(4/4) switched to RUNNING

10/09/2016

18:15:55

Source: Custom Source(1/4) switched to RUNNING

10/09/2016

18:15:55

CEPPatternOperator(1/1) switched to RUNNING

10/09/2016

18:15:55

Map -> Sink: Unnamed(4/4) switched to RUNNING

1> Alert [message=Temperature Rise Detected:30.0 on machine name:xyz]

2> Alert [message=Temperature Rise Detected:29.3 on machine name:LogShaft] 3> Alert [message=Temperature Rise Detected:27.0 on machine name:Boiler] 4> Alert [message=Temperature Rise Detected:29.0 on machine name:Boiler]

We can also configure a mail client and use some external web hook to send e-mail or messenger notifications.

Summary 小结

In this chapter, we learnt about CEP. We discussed the challenges involved and how we can use the Flink CEP library to solve CEP problems. We also learnt about Pattern API and the various operators we can use to define the pattern. In the final section, we tried to connect the dots and see one complete use case. With some changes, this setup can be used as it is present in various other domains as well. 在本章中, 我们了解了 cep。我们讨论了所涉及的挑战以及如何使用 flink cep 库来解决 cep 问题。我们还了解了模式 api 和各种运算符, 我们可以使用来定义模式。在最后一节中, 我们尝试连接点, 并看到一个完整的用例。通过一些更改, 此设置也可以在其他各种域中使用。

In the next chapter, we will see how to use Flink's built-in Machine Learning library to solve complex problems.


0