千家信息网

第二章 Data Processing Using the DataStream API

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,第二章 Data Processing Using the DataStream API (中英对照)转载需标明出处 mythmoon@163.comReal-time analytics is cu
千家信息网最后更新 2025年01月23日第二章 Data Processing Using the DataStream API

第二章 Data Processing Using the DataStream API (中英对照)

转载需标明出处 mythmoon@163.com

Real-time analytics is currently an important issue. Many different domains need to process data in real time. So far there have been multiple technologies trying to provide this capability. Technologies such as Storm and Spark have been on the market for a long time now. Applications derived from the Internet of Things (IoT) need data to be stored, processed, and analyzed in real or near real time. In order to cater for such needs, Flink provides a streaming data processing API called DataStream API. 实时分析目前是一个重要问题。许多不同的域需要实时处理数据。到目前为止, 已经有多种技术试图提供这种能力。Storm和Spark等技术已经上市很长时间了。从物联网 (IoT) 派生的应用程序需要实时或近实时地存储、处理和分析数据。为了满足这些需求, Flink提供了一个名为DataStream API 的流式数据处理 API。

In this chapter, we are going to look at the details relating to DataStream API, covering the following topics: 在本章中, 我们将介绍与 datastream api 相关的详细信息, 其中包括以下主题:

l Execution environment 执行环境

l Data sources 数据源

l Transformations 转化

l Data sinks 数据汇聚

l Connectors 连接器

l Use case - sensor data analytics ­例--­数据传感器分析


Any Flink program works on a certain defined anatomy as follows: 任何Flink程序的工作原理在某一定义的分析, 如下所示:


We will be looking at each step and how we can use DataStream API with this anatomy. 我们将研究每个步骤, 以及如何在这个分析结构中使用DataStream API。

Execution environment

In order to start writing a Flink program, we first need to get an existing execution environment or create one. Depending upon what you are trying to do, Flink supports: 为了开始编写Flink程序, 我们首先需要获得一个现有的执行环境或创建一个执行环境。根据您要执行的操作, Flink支持:

l Getting an already existing Flink environment获取已存在的 Flink环境

l Creating a local environment创建本地环境

l Creating a remote environment创建远程环境

Typically, you only need to use getExecutionEnvironment(). This will do the right thing based on your context. If you are executing on a local environment in an IDE then it will start a local execution environment. Otherwise, if you are executing the JAR then the Flink cluster manager will execute the program in a distributed manner. 通常, 您只需要使用 "getExecutionEnvironment()" 。根据您的上下文, 这将执行正确的操作。如果在 IDE中的本地环境上执行, 则它将启动本地执行环境。否则, 如果您正在执行 JAR, 则 Flink群集管理器将以分布式方式执行程序。

If you want to create a local or remote environment on your own then you can also choose do so by using methods such as createLocalEnvironment() and createRemoteEnvironment (String host, int port, String, and .jar files).

如果要自己选择本地或远程环境, 还可以通过使用 createLocalEnvironment () 和createRemoteEnvironment (String host, int port, String, and .jar files). 等方法来设置。


Data sources 数据源

Sources are places where the Flink program expects to get its data from. This is a second step in the Flink program's anatomy. Flink supports a number of pre-implemented data source functions. It also supports writing custom data source functions so anything that is not supported can be programmed easily. First let's try to understand the built-in source functions. 源是Flink程序希望从中获取数据的地方。这是Flink程序解剖的第二步。Flink支持许多预先实现的数据源函数。它还支持编写自定义数据源函数, 以便可以轻松地对任何不受支持的内容进行编程。首先, 让我们尝试了解内置的源函数。

Socket-based基于套接字

DataStream API supports reading data from a socket. You just need to specify the host and port to read the data from and it will do the work: DataStream API支持从套接字读取数据。您只需指定要从中读取数据的主机和端口, 它就可以完成以下工作:

socketTextStream(hostName, port);

You can also choose to specify the delimiter: 您还可以选择指定分隔符:

socketTextStream(hostName,port,delimiter)

You can also specify the maximum number of times the API should try to fetch the data: 您还可以指定API应尝试获取数据的最大次数:

socketTextStream(hostName,port,delimiter, maxRetry)

File-based基于文件

You can also choose to stream data from a file source using file-based source functions in Flink. You can use readTextFile(String path) to stream data from a file specified in the path. By default it will read TextInputFormat and will read strings line by line. 您还可以选择使用Flink中基于文件的源函数从文件源流数据。可以使用readTextFile(String path)从路径中指定的文件流式传输数据。默认情况下, 它将读取TextInputFormat, 并将逐行读取字符串。

If the file format is other than text, you can specify the same using these functions: 如果文件格式不是文本, 则可以使用以下函数指定相同的格式:

readFile(FileInputFormat inputFormat, String path)

Flink also supports reading file streams as they are produced using the readFileStream()

function: Flink还支持读取使用readFileStream() 函数生成的文件流:

readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)


You just need to specify the file path, the polling interval in which the file path should be polled, and the watch type. Watch types consist of three types: 您只需指定文件路径、应轮询文件路径的轮询间隔以及监视类型。监视类型由三种类型组成:

FileMonitoringFunction.WatchType.ONLY_NEW_FILES is used when the system should process only new files FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED is used when the system should process only appended contents of files FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED is used when the system should re-process not only the appended contents of files but also the previous content in the file

If the file is not a text file, then we do have an option to use following function, which lets us define the file input format: 如果该文件不是文本文件, 那么我们确实有一个选项来使用以下函数, 这使我们可以定义文件输入格式:

readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

Internally, it divides the reading file task into two sub-tasks. One sub task only monitors the file path based on the WatchType given. The second sub-task does the actual file reading in parallel. The sub-task which monitors the file path is a non-parallel sub-task. Its job is to keep scanning the file path based on the polling interval and report files to be processed, split the files, and assign the splits to the respective downstream threads: 在内部, 它将读取文件任务分为两个子任务。一个子任务仅监视基于给定的监视类型的文件路径。第二个子任务并行执行实际的文件读取。监视文件路径的子任务是非并行子任务。它的工作是根据要处理的轮询间隔和报告文件继续扫描文件路径, 拆分文件, 并将拆分分配给各自的下游线程:


Transformations转化

Data transformations transform the data stream from one form into another. The input could be one or more data streams and the output could also be zero, or one or more data streams. Now let's try to understand each transformation one by one. 数据转换将数据流从一种形式转换为另一种形式。输入可以是一个或多个数据流, 输出也可以是零, 也可以是一个或多个数据流。现在, 让我们尝试一个接一个地理解每个转换。

Map映射

This is one of the simplest transformations, where the input is one data stream and the output is also one data stream. 这是最简单的转换之一, 其中输入是一个数据流, 输出也是一个数据流。

In Java:

inputStream.map(new MapFunction() {

@Override

publicInteger map(Integer value) throws Exception { return 5 * value;

}

});

In Scala:

inputStream.map { x => x * 5 }

FlatMap

FlatMap takes one record and outputs zero, one, or more than one record. 平面地图获取一条记录并输出零、一条或多条记录。

In Java:

inputStream.flatMap(new FlatMapFunction() {

@Override

public void flatMap(String value, Collector out) throws Exception {

for(String word: value.split(" ")){ out.collect(word);

}

}

});

In Scala:

inputStream.flatMap { str => str.split(" ") }


Filter

Filter functions evaluate the conditions and then, if they result as true, only emit the record. Filter functions can output zero records. 筛选函数计算条件, 然后, 如果它们的结果为 true, 则只发出记录。筛选功能可以输出零记录。

In Java:

inputStream.filter(new FilterFunction() {

@Override

public boolean filter(Integer value) throws Exception { return value != 1;

}

});

In Scala:

inputStream.filter { _ != 1 }

KeyBy

KeyBy logically partitions the stream-based on the key. Internally it uses hash functions to partition the stream. It returns KeyedDataStream. 在逻辑上对键上基于流的分区。在内部, 它使用哈希函数对流进行分区。它返回KeyedDataStream

In Java:

inputStream.keyBy("someKey");

In Scala:

inputStream.keyBy("someKey")

Reduce归约

Reduce rolls out the KeyedDataStream by reducing the last reduced value with the current value. The following code does the sum reduce of a KeyedDataStream. 通过归约当前值与当前值的关系来归约KeyedDataStream的滚动。下面的代码执行KeyedDataStream的总和归约

In Java:

keyedInputStream. reduce(new ReduceFunction() {

@Override

public Integer reduce(Integer value1, Integer value2) throws Exception {

return value1 + value2;

}


});

In Scala:

keyedInputStream. reduce { _ + _ }

Fold折叠

Fold rolls out the KeyedDataStream by combining the last folder stream with the current record. It emits a data stream back. 折叠通过将最后一个文件与当前记录组合来滚动KeyedDataStream。它将数据流发出回来。

In Java:

keyedInputStream keyedStream.fold("Start", new FoldFunction() {

@Override

public String fold(String current, Integer value) { return current + "=" + value;

}

});

In Scala:

keyedInputStream.fold("Start")((str, i) => { str + "=" + i })

The preceding given function when applied on a stream of (1,2,3,4,5) would emit a stream like this: Start=1=2=3=4=5

Aggregations聚合

DataStream API supports various aggregations such as min, max, sum, and so on. These functions can be applied on KeyedDataStream in order to get rolling aggregations. DataStream API支持各种聚合,如MIN、MAX、SUM等。这些函数应用于KeyedDataStream以获得滚动聚合。

In Java:

keyedInputStream.sum(0) keyedInputStream.sum("key") keyedInputStream.min(0) keyedInputStream.min("key") keyedInputStream.max(0) keyedInputStream.max("key") keyedInputStream.minBy(0) keyedInputStream.minBy("key") keyedInputStream.maxBy(0) keyedInputStream.maxBy("key")


In Scala:

keyedInputStream.sum(0) keyedInputStream.sum("key") keyedInputStream.min(0) keyedInputStream.min("key") keyedInputStream.max(0) keyedInputStream.max("key") keyedInputStream.minBy(0) keyedInputStream.minBy("key") keyedInputStream.maxBy(0) keyedInputStream.maxBy("key")

The difference between max and maxBy is that max returns the maximum value in a stream but maxBy returns a key that has a maximum value. The same applies to min and minBy. maxmaxBy之间的区别在于, 最大值返回流中的最大值, 但maxBy返回具有最大值的键。这同样适用于最小和最小的。

Window窗口

The window function allows the grouping of existing KeyedDataStreams by time or other conditions. The following transformation emits groups of records by a time window of 10 seconds. window函数允许按时间或其他条件对现有的KeyedDataStreams进行分组。以下转换以10秒的时间窗口发出记录组。

In Java:

inputStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10)));

In Scala:

inputStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10)))

Flink defines slices of data in order to process (potentially) infinite data streams. These slices are called windows. This slicing helps processing data in chunks by applying transformations. To do windowing on a stream, we need to assign a key on which the distribution can be made and a function which describes what transformations to perform on a windowed stream. Flink定义数据切片, 以便处理 (可能) 无限数据流。这些切片称为窗口。此切片有助于通过应用转换来处理块中的数据。要在流上进行窗口处理, 我们需要分配一个可以在其上进行分布的键和一个描述要在窗口流上执行的转换的函数。

To slice streams into windows, we can use pre-implemented Flink window assigners. We have options such as, tumbling windows, sliding windows, global and session windows. Flink also allows you to write custom window assigners by extending WindowAssginer class. Let's try to understand how these various assigners work.

要将流切片到窗口中, 我们可以使用预实现的 Flink窗口分配程序。我们有各种选择, 如翻滚窗口、滑动窗口、全局和会话窗口。Flink还允许您通过扩展WindowAssginer类编写自定义窗口分配程序。让我们试着了解这些不同的分配程序是如何工作的。


Global windows

Global windows are never-ending windows unless specified by a trigger. Generally in this case, each element is assigned to one single per-key global Window. If we don't specify any trigger, no computation will ever get triggered. 全局窗口是永无止境的窗口, 除非由触发器指定。通常在这种情况下, 每个元素都分配给一个per-key全局窗口。如果我们不指定任何触发器, 任何计算都不会被触发。

Tumbling windows

Tumbling windows are created based on certain times. They are fixed-length windows and non over lapping. Tumbling windows should be useful when you need to do computation of elements in specific time. For example, tumbling window of 10 minutes can be used to compute a group of events occurring in 10 minutes time. 翻滚窗口是根据特定时间创建的。它们是固定长度的窗口, 非重叠。在特定时间内计算元素时, 翻滚窗口应该是有用的。例如, 10分钟的翻滚窗口可用于计算10分钟内发生的一组事件。

Sliding windows

Sliding windows are like tumbling windows but they are overlapping. They are fixed- length windows overlapping the previous ones by a user given window slide parameter. This type of windowing is useful when you want to compute something out of a group of events occurring in a certain time frame. 滑动窗口就像翻滚的窗口, 但它们是重叠的。它们是固定长度的窗口, 由给定的窗口幻灯片参数与以前的窗口重叠。当您要从在特定时间范围内发生的一组事件中计算某些内容时, 这种类型的窗口非常有用。

Session windows

Session windows are useful when windows boundaries need to be decided upon the input data. Session windows allows flexibility in window start time and window size. We can also provide session gap configuration parameter which indicates how long to wait before considering the session in closed. 当需要根据输入数据确定窗口边界时, 会话窗口非常有用。会话窗口允许在窗口开始时间和窗口大小方面具有灵活性。我们还可以提供会话间隙配置参数, 该参数指示在结束时考虑会话之前需要等待多长时间。

WindowAll

The windowAll function allows the grouping of regular data streams. Generally this is a non-parallel data transformation as it runs on non-partitioned streams of data. windowAll函数允许对常规数据流进行分组。通常, 这是一个非并行数据转换, 因为它在非分区数据流上运行。

In Java:

inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));

In Scala:

inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))


Similar to regular data stream functions, we have window data stream functions as well. The only difference is they work on windowed data streams. So window reduce works like the Reduce function, Window fold works like the Fold function, and there are aggregations as well. 与常规数据流函数类似, 我们也有窗口数据流函数。唯一不同的是, 它们适用于窗口数据流。因此, 窗口减少的工作方式类似于 "归约函数, "窗口折叠" 的工作方式类似于 "折叠" 函数, 并且还有聚合。

Union

The Union function performs the union of two or more data streams together. This does the combining of data streams in parallel. If we combine one stream with itself then it outputs each record twice. Union函数将两个或多个数据流合并在一起。这将并行合并数据流。如果我们将一个流与自身组合在一起, 则它将两次输出每个记录。

In Java:

inputStream. union(inputStream1, inputStream2, ...);

In Scala:

inputStream. union(inputStream1, inputStream2, ...)

Window join

We can also join two data streams by some keys in a common window. The following example shows the joining of two streams in a Window of 5 seconds where the joining condition of the first attribute of the first stream is equal to the second attribute of the other stream. 我们还可以通过公共窗口中的某些键连接两个数据流。下面的示例演示在5秒的窗口中连接两个流, 其中第一个流的第一个属性的连接条件等于另一个流的第二个属性。

In Java:

inputStream. join(inputStream1)

.where(0).equalTo(1)

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

.apply (new JoinFunction () {...});

In Scala:

inputStream. join(inputStream1)

.where(0).equalTo(1)

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

.apply { ... }


Split

This function splits the stream into two or more streams based on the criteria. This can be used when you get a mixed stream and you may want to process each data separately. 此函数根据条件将流拆分为两个或多个流。当您获得混合流, 并且您可能希望分别处理每个数据时, 可以使用此方法。

In Java:

SplitStream split = inputStream.split(new OutputSelector() {

@Override

public Iterable select(Integer value) { List output = new ArrayList(); if (value % 2 == 0) {

output.add("even");

}

else {

output.add("odd");


}

});

In Scala:


}

return output;


val split = inputStream.split( (num: Int) =>

(num % 2) match {

case 0 => List("even") case 1 => List("odd")

}

)

Select

This function allows you to select a specific stream from the split stream. 此函数允许您从拆分流中选择特定的流。

In Java:

SplitStream split;

DataStream even = split.select("even"); DataStream odd = split.select("odd"); DataStream all = split.select("even","odd");


In Scala:

val even = split select "even" val odd = split select "odd"

val all = split.select("even","odd")

Project

The Project function allows you to select a sub-set of attributes from the event stream and only sends selected elements to the next processing stream. Project函数允许您从事件流中选择属性的子集, 并且仅将选定的元素发送到下一个处理流。

In Java:

DataStream> in = // [...] DataStream> out = in.project(3,2);

In Scala:

val in : DataStream[(Int,Double,String)] = // [...] val out = in.project(3,2)

The preceding function selects the attribute numbers 2 and 3 from the given records. The following is the sample input and output records: 前面的函数从给定的记录中选择属性编号2和3。以下是示例输入和输出记录:

(1,10.0, A, B )=> (B,A)

(2,20.0, C, D )=> (D,C)

Physical partitioning

Flink allows us to perform physical partitioning of the stream data. You have an option to provide custom partitioning. Let us have a look at the different types of partitioning. Flink允许我们对流数据执行物理分区。您可以选择提供自定义分区。让我们来看看不同类型的分区。

Custom partitioning

As mentioned earlier, you can provide custom implementation of a partitioner. 如前所述, 您可以提供分区程序的自定义实现。

In Java:

inputStream.partitionCustom(partitioner, "someKey"); inputStream.partitionCustom(partitioner, 0)


In Scala:

inputStream.partitionCustom(partitioner, "someKey") inputStream.partitionCustom(partitioner, 0)

While writing a custom partitioner you need make sure you implement an efficient hash function. 在编写自定义分区程序时, 您需要确保实现有效的哈希函数。

Random partitioning

Random partitioning randomly partitions data streams in an evenly manner. 随机分区以均匀的方式随机对数据流进行分区。

In Java:

inputStream.shuffle();

In Scala:

inputStream.shuffle()

Rebalancing partitioning

This type of partitioning helps distribute the data evenly. It uses a round robin method for distribution. This type of partitioning is good when data is skewed. 这种类型的分区有助于均匀地分配数据。它使用循环方法进行分发。当数据倾斜时, 这种类型的分区是好的。

In Java:

inputStream.rebalance();

In Scala:

inputStream.rebalance()

Rescaling

Rescaling is used to distribute the data across operations, perform transformations on sub- sets of data and combine them together. This rebalancing happens over a single node only, hence it does not require any data transfer across networks.

重新缩放用于跨操作分发数据, 对数据子集执行转换, 并将它们组合在一起。这种重新平衡只发生在单个节点上, 因此不需要任何跨网络的数据传输。


The following diagram shows the distribution:

In Java:

inputStream.rescale();

In Scala:

inputStream.rescale()

Broadcasting

Broadcasting distributes all records to each partition. This fans out each and every element to all partitions. 广播将所有记录分配给每个分区。这传播到所有分区的每个元素。

In Java:

inputStream.broadcast();

In Scala:

inputStream.broadcast()


Data sinks

After the data transformations are done, we need to save results into some place. The following are some options Flink provides us to save results: 数据转换完成后, 我们需要将结果保存到某个位置。以下是Flink为我们保存结果提供的一些选项:

writeAsText(): Writes records one line at a time as strings. 写入记录一次写入一行字符串。

writeAsCsV(): Writes tuples as comma separated value files. Row and fields delimiter can also be configured. 将元组写入逗号分隔的值文件。还可以配置行和字段分隔符。

print()/printErr(): Writes records to the standard output. You can also choose to write to the standard error. 将记录写入标准输出。您还可以选择写入标准错误。

writeUsingOutputFormat(): You can also choose to provide a custom output format. While defining the custom format you need to extend the OutputFormat which takes care of serialization and deserialization. 您还可以选择提供自定义输出格式。在定义自定义格式时, 您需要扩展OutputFormat, 以处理序列化和反序列化。

writeToSocket(): Flink supports writing data to a specific socket as well. It is required to define SerializationSchema for proper serialization and formatting. Flink也支持将数据写入特定的套接字。它需要定义SerializationSchema, 以便进行适当的序列化和格式化。

Event time and watermarks

Flink Streaming API takes inspiration from Google Data Flow model. It supports different concepts of time for its streaming API. In general, there three places where we can capture time in a streaming environment. They are as followsFlink Streaming API从 google 数据流模型中获得灵感。它支持streaming API的不同时间概念。一般来说, 有三个地方, 我们可以在流媒体环境中捕获时间。它们如下所示

Event time

The time at which event occurred on its producing device. For example in IoT project, the time at which sensor captures a reading. Generally these event times needs to embed in the record before they enter Flink. At the time processing, these timestamps are extracted and considering for windowing. Event time processing can be used for out of order events.

事件发生在其生产设备上的时间。例如, 在物联网项目中, 传感器捕获读数的时间。通常, 这些事件时间需要在进入 flink 之前嵌入到记录中。在处理时, 提取这些时间戳并考虑窗口。事件时间处理可用于无序事件。


Processing time

Processing time is the time of machine executing the stream of data processing. Processing time windowing considers only that timestamps where event is getting processed.

Processing time is simplest way of stream processing as it does not require any synchronization between processing machines and producing machines. In distributed asynchronous environment processing time does not provide determinism as it is dependent on the speed at which records flow in the system. 处理时间是机器执行数据处理流的时间。处理时间窗口只考虑处理事件的时间戳。处理时间是最简单的流处理方式, 因为它不需要处理计算机和生产机器之间的任何同步。在分布式异步环境中, 处理时间不提供确定性, 因为它依赖于记录在系统中的流动速度。

Ingestion time

This is time at which a particular event enters Flink. All time based operations refer to this timestamp. Ingestion time is more expensive operation than processing but it gives predictable results. Ingestion time programs cannot handle any out of order events as it assigs timestamp only after the event is entered the Flink system. 此时是特定事件进入Flink的时间。所有基于时间的操作都引用此时间戳。与处理相比, 摄入时间更昂贵, 但它能提供可预测的结果。摄入时间程序不能处理任何无序事件, 因为它只在事件进入 Flink系统后才进行时间戳。

Here is an example which shows how to set event time and watermarks. In case of ingestion time and processing time, we just need to the time characteristics and watermark generation is taken care automatically. Following is a code snippet for the same. 下面是一个示例, 演示如何设置事件时间和水印。在摄入时间和处理时间的情况下, 只需要对时间特征和水印生成进行自动处理。下面是相同的代码段。

In Java:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

//or env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

In Scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

//or env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)


In case of event time stream programs, we need to specify the way to assign watermarks and timestamps. There are two ways of assigning watermarks and timestamps: 在事件时间流程序的情况下, 我们需要指定分配水印和时间戳的方法。有两种方法可以分配水印和时间戳:

Directly from data source attribute Using a timestamp assigner

To work with event time streams, we need to assign the time characteristic as follows 要处理事件时间流, 我们需要按如下方式分配时间特征

In Java:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime;

In Scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

It is always best to store event time while storing the record in source. Flink also supports some pre-defined timestamp extractors and watermark generators. Refer to https://ci.ap ache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractor s.html.


Connectors

Apache Flink supports various connectors that allow data readIwrites across various technologies. Let's learn more about this. Apache Flink 支持允许跨各种技术进行数据读写的各种连接器。让我们了解更多关于这一点。

Kafka connector

Kafka is a publish-subscribe, distributed, message queuing system that allows users to publish messages to a certain topic; this is then distributed to the subscribers of the topic. Flink provides options to define a Kafka consumer as a data source in Flink Streaming. In order to use the Flink Kafka connector, we need to use a specific JAR file.

Kafka是一个发布-订阅, 分布式, 消息排队系统, 允许用户发布消息到某一主题;然后将其分发给该主题的订阅者。Flink提供了将卡夫卡使用者定义为 Flink Streaming中的数据源的选项。为了使用 Flink Kafka连接器, 我们需要使用特定的 JAR文件。


The following diagram shows how the Flink Kafka connector works: 下图显示了 Flink Kafka连接器的工作原理:

We need to use the following Maven dependency to use the connector. I have been using Kafka version 0.9 so I will be adding the following dependency in pom.xml: 我们需要使用以下 maven 依赖项来使用连接器。我一直在使用Kafka版本 0.9, 所以我将在 pom. xml 中添加以下依赖项:

org.apache.flink

flink-connector-kafka-0.9_2.11/artifactId>

1.1.4

Now let's try to understand how to use the Kafka consumer as the Kafka source现在让我们尝试了解如何使用Kafka消费者作为Kafka源:

In Java:

Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test");

DataStream input = env.addSource(new FlinkKafkaConsumer09("mytopic", new SimpleStringSchema(), properties));

In Scala:

val properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092");

// only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test");

stream = env

.addSource(new FlinkKafkaConsumer09[String]("mytopic", new


SimpleStringSchema(), properties))

.print

In the preceding code, we first set the properties of the Kafka host and the zookeeper host and port. Next we need to specify the topic name, in this case mytopic. So if any messages get published to the mytopic topic, they will be processed by the Flink streams. 在前面的代码中, 我们首先设置了Kafka主机和zookeeper主机和端口的属性。接下来, 在这种情况下我们需要指定主题名称mytopic。因此, 如果任何消息被发布到主题mytopic, 它们将由 Flink流处理。

If you get data in a different format, then you can also specify your custom schema for deserialization. By default, Flink supports string and JSON deserializers. 如果以不同的格式获取数据, 则还可以指定用于反序列化的自定义架构。默认情况下, Flink支持字符串和JSON反序列化器。

In order to enable fault tolerance, we need to enable checkpointing in Flink. Flink is keen on taking snapshots of the state in a periodic manner. In the case of failure, it will restore to the last checkpoint and then restart the processing. 为了启用容错能力, 我们需要在Flink中启用检查点。Flink热衷于定期(周期)状态的快照。在失败的情况下, 它将还原到最后一个检查点, 然后重新启动处理。

We can also define the Kafka producer as a sink. This will write the data to a Kafka topic. The following is a way to write data to a Kafka topic: 我们也可以将Kafka生产定义为槽(通道)。这将把数据写到Kafka主题。以下是一种将数据写入Kafka主题的方法:

In Scala:

stream.addSink(new FlinkKafkaProducer09("localhost:9092", "mytopic", new SimpleStringSchema()));

In Java:

stream.addSink(new FlinkKafkaProducer09[String]("localhost:9092", "mytopic", new SimpleStringSchema()))


Use case - sensor data analytics

Now that we have looked at various aspects of DataStream API, let's try to use these concepts to solve a real world use case. Consider a machine which has sensor installed on it and we wish to collect data from these sensors and calculate average temperature per sensor every five minutes.

现在我们已经研究了 DataStream API的各个方面, 让我们尝试使用这些概念来解决一个真实的世界用例。考虑一台安装了传感器的机器, 我们希望从这些传感器收集数据, 每五分钟计算一次每个传感器的平均温度。


Following would be the architecture:

In this scenario, we assume that sensors are sending information to Kafka topic called temr with information as (timestamp, temperature, sensor-ID). Now we need to write code to read data from Kafka topics and processing it using Flink transformation. 在这种情况下, 我们假设传感器向名为temr的Kafka主题发送信息, 其中包含 (时间戳、温度、sensor-ID) 的信息。现在, 我们需要编写代码来读取Kafka主题中的数据, 并使用 Kafka转换对其进行处理。

Here important thing to consider is as we already have timestamp values coming from sensor, we can use Event Time computations for time factors. This means we would be able to take care of events even if they reach out of order. 这里需要考虑的重要事项是, 由于我们已经有了来自传感器的时间戳值, 我们可以使用事件时间计算来计算时间因素。这意味着, 即使事件处于正常状态, 我们也能处理好这些事件。

We start with simple streaming execution environment which will be reading data from Kafka. Since we have timestamps in events, we will be writing a custom timestamp and watermark extractor to read the timestamp values and do window processing based on that. Here is code snippet for the same. 我们从简单的流执行环境开始, 这将是阅读Kafka的数据。由于事件中有时间戳, 因此我们将编写自定义时间戳和水标记提取器, 以读取时间戳值并在此基础上进行窗口处理。下面是相同的代码段。

// set up the streaming execution environment final StreamExecutionEnvironment env =

StreamExecutionEnvironment.getExecutionEnvironment();

// env.enableCheckpointing(5000); nv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092");

properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test");

FlinkKafkaConsumer09 myConsumer = new FlinkKafkaConsumer09<>("temp", new SimpleStringSchema(),

properties); myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());


Here we assume that we receive events in Kafka topics as strings and in the format:

Timestamp,Temperature,Sensor-Id

The following an example code to extract timestamp from record:

public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks {

private static final long serialVersionUID = 1L;

@Override

public long extractTimestamp(String arg0, long arg1) { if (null != arg0 && arg0.contains(",")) {

String parts[] = arg0.split(","); return Long.parseLong(parts[0]);

}

return 0;

}

@Override

public Watermark checkAndGetNextWatermark(String arg0, long arg1) { if (null != arg0 && arg0.contains(",")) {

String parts[] = arg0.split(",");

return new Watermark(Long.parseLong(parts[0]));

}

return null;

}

}

Now we simply created keyed data stream and perform average calculation on temperature values as shown in the following code snippet:

DataStream> keyedStream = env.addSource(myConsumer).flatMap(new Splitter()).keyBy(0)

.timeWindow(Time.seconds(300))

.apply(new WindowFunction, Tuple2, Tuple, TimeWindow>() {

@Override

public void apply(Tuple key, TimeWindow window, Iterable> input, Collector> out) throws Exception {

double sum = 0L; int count = 0;

for (Tuple2 record : input) { sum += record.f1;

count++;

}

Tuple2 result = input.iterator().next(); result.f1 = (sum/count);



}

});


out.collect(result);


When execute the preceding given code, and if proper sensor events are published on Kafka topics then we will get the average temperature per sensor every five minutes. 当执行前面给出的代码时, 如果在Kafka主题上发布了正确的传感器事件, 那么我们将每五分钟获得每个传感器的平均温度。

The complete code is available on GitHub at https://github.com/deshpandetanmay/mast ering-flink/tree/master/chapter02/flink-streaming.

Summary总结

In this chapter, we started with Flink's most powerful API: DataStream API. We looked at how data sources, transformations, and sinks work together. Then we looked at various technology connectors such as ElasticSearch, Cassandra, Kafka, RabbitMQ, and so on.

At the end, we also tried to apply our learning to solve a real-world sensor data analytics use case.

In the next chapter, we are going to learn about another very important API from Flink's ecosystem point of view the DataSet API.

在本章中, 我们从Flink最强大的API: DataStream API开始。我们研究了数据源、转换和接收器是如何协同工作的。然后, 我们看了各种技术连接器, ElasticSearch, Cassandra, Kafka, RabbitMQ, 等等。 最后, 我们还尝试应用我们的学习来解决一个真实的传感器数据分析用例。 在下一章中, 我们将从 Flink的生态系统角度了解另一个非常重要的 内容 DataSet API.



0