千家信息网

如何实现基于Flink实时数据处理

发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,小编给大家分享一下如何实现基于Flink实时数据处理,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!基于Flink 1.11
千家信息网最后更新 2025年01月19日如何实现基于Flink实时数据处理

小编给大家分享一下如何实现基于Flink实时数据处理,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

基于Flink 1.11的网络流量实时解析,主要针对基于Pcap的原始网络流量数据进行TCP/UDP/ICMP的协议数据实时解析,并将解析数据装成数据帧Frame,以便进行实时网络流量分析。

为完成以上功能,需要了解Pcap数据解析、TCP/UDP层协议解析、Flink的序列化和反序列化、Flink自定义函数以及基于Stream sql的Flink实时数据分析。

1、Pcap数据解析

要进行基于Pcap格式的网络流量数据解析,就必须了解Pcap文件格式定义:

如上所示,标准Pcap数据由Pcap文件头、数据桢Frame头、数据桢Frame组成。

在Pcap文件头中:Magic :0x1A2B3C 4D,用于表示Pcap数据的开始;Major:用于标示Pcap数据主版本号;Minor:用于标示Pcap数据次版本号;ThisZone:本地标准时间;SigFigs: 时间戳精度;SnapLen:最大的存储长度;LinkType:链路类型。

在数据桢头中:Timestamp1:时间戳高位,精确到S;Timestamp2:时间戳低位,精确到ms;CapLen:当前数据桢长度;

Len:网络中实际数据桢的长度。

注意:目前LinkType链路类型,支持EN10MB、RAW、LOOP、LINUX_SLI;通过以上基本结构,在Pcap文件头中,我们获取最有用的信息即时LinkType,后面我们需要根据不同的LinkType类型,进行数据桢Frame的解析。

除此之外,根据数据桢头,可以获得数据桢的封装时间;

这里根据以太网数据桢类型为例:也就是Ipv4、Ipv6、ARP数据桢,如上图所示,该类型的数据桢数据部分的偏移是14。如果是Ipv4或者Ipv6的协议类型,可以解析获取Mac地址。接下来,其实就是解析TCP/IP层的协议。

2、TCP/UDP协议解析

(1)、TCP协议

// 获取TCP头大小tcpOrUdpHeaderSize = getTcpHeaderLength(packetData, ipStart + ipHeaderLen);packet.put(Packet.TCP_HEADER_LENGTH, tcpOrUdpHeaderSize);// Store the sequence and acknowledgement numbers --M// 获取TCP 请求序列号packet.put(Packet.TCP_SEQ, PcapReaderUtil.convertUnsignedInt(packetData, ipStart + ipHeaderLen + PROTOCOL_HEADER_TCP_SEQ_OFFSET));// 获取TCP 确认序列号packet.put(Packet.TCP_ACK, PcapReaderUtil.convertUnsignedInt(packetData, ipStart + ipHeaderLen + PROTOCOL_HEADER_TCP_ACK_OFFSET));// Flags stretch two bytes starting at the TCP header offsetint flags = PcapReaderUtil.convertShort(new byte[] { packetData[ipStart + ipHeaderLen + TCP_HEADER_DATA_OFFSET],packetData[ipStart + ipHeaderLen + TCP_HEADER_DATA_OFFSET + 1] })& 0x1FF; // Filter first 7 bits. First 4 are the data offset and the other 3 reserved for future use.packet.put(Packet.TCP_FLAG_NS, (flags & 0x100) == 0 ? false : true);packet.put(Packet.TCP_FLAG_CWR, (flags & 0x80) == 0 ? false : true);packet.put(Packet.TCP_FLAG_ECE, (flags & 0x40) == 0 ? false : true);packet.put(Packet.TCP_FLAG_URG, (flags & 0x20) == 0 ? false : true);packet.put(Packet.TCP_FLAG_ACK, (flags & 0x10) == 0 ? false : true);packet.put(Packet.TCP_FLAG_PSH, (flags & 0x8) == 0 ? false : true);packet.put(Packet.TCP_FLAG_RST, (flags & 0x4) == 0 ? false : true);packet.put(Packet.TCP_FLAG_SYN, (flags & 0x2) == 0 ? false : true);packet.put(Packet.TCP_FLAG_FIN, (flags & 0x1) == 0 ? false : true);

2、UDP协议


tcpOrUdpHeaderSize = UDP_HEADER_SIZE;if (ipProtocolHeaderVersion == 4) {int cksum = getUdpChecksum(packetData, ipStart, ipHeaderLen);if (cksum >= 0)packet.put(Packet.UDP_SUM, cksum);}int udpLen = getUdpLength(packetData, ipStart, ipHeaderLen);packet.put(Packet.UDP_LENGTH, udpLen);

3、Kafka的序列化和反序列化

基于分布式消息队列Kafka作为网络流量数据的中间临时缓存,通过FlinkKafkaConsumer进行网络流数据的解析,这里我们自定义了PcapResover的解析器,使用自定义的解序列化函数PcapDataDeSerializer。

Kafka Producer,负责转发已采集的网络流量,这里配置使用了Kafka内部的序列化类


props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());this.consumer = new FlinkKafkaConsumer<>(this.topic,(KafkaDeserializationSchema)new PcapDataDeSerializer(Object.class),props);public class PcapDataDeSerializer implements KafkaDeserializationSchema {private static final Logger log= LoggerFactory.getLogger(PcapDataDeSerializer.class);private static final long serialVersionUID = 1L;private Class clazz;public PcapDataDeSerializer(Class clazz) {this.clazz=clazz;}List packetList = new ArrayList<>();@Overridepublic boolean isEndOfStream(Object nextElement) {return false;}@Overridepublic Object deserialize(ConsumerRecord record) throws IOException {DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));DataInputStream dataInputStream=new DataInputStream((InputStream) in);PcapReader reader = new PcapReader(dataInputStream);for (Packet packet : reader) {packetList.add(packet);}log.info("finish deserialize pcap data ,"+record.key()+" , topic is "+record.topic()+", "+"partition is "+record.partition()+" , "+" offset is " +record.offset());return JSON.toJSON(packetList);}@Overridepublic TypeInformation getProducedType() {return TypeExtractor.getForClass(this.clazz);}}

PcapDataDeSerializer主要实现KafkaDeserializationSchema中的deserialze即可,在这个函数中,会解析网络流量,并解析的网络流量封装成Pcaket List对象中,进行返回。

KafkaConsumer的创建使用自定义解序列化函数,主要是为了根据1、2 部分对于Pcap网络流量格式的分析,解析网络流量,并封装成数据桢。

4、Flink自定义函数

基于以上创建的FlinkKafkaConsumer,可以配置Flink Stream DAG,DataStreamSouce ->flatMap->Map->Stream

DataStreamSource stream =executionEnvironment.addSource(this.consumer);log.info("start to build pcap dataStream DAG graph , transform packet into frame stream, " +"and default parallelism is 4 !");return stream.flatMap(new FrameFlatMap()).map(new FrameMapFunction()).setParallelism(4);

这里其实返回的是DataStream,也就是说,我们将原始网络流量解析,最后按照数据桢的方式输出数据流,以便与进行数据分析。接下来,为了基于Stream sql做一些数据分析,其实就可以将DataStream注册成临时表视图,然后使用类sql的语法进行实时分析了。

5、Flink实时分析示例

聚合统计10s的窗口内,目的mac地址的计数。当然这里sql的表达方式很多,而且表达能力足够强大。可以根据不同的业务诉求,进行不同的分析。

aggregationSql = "select dstMac,count(1) as c from " + KafkaProperties.FRAME_VIEW_NAME +" group by tumble(PROCTIME() ,interval '10' SECOND) " +", dstMac";

之后就是进行sink了,完成DAG 构建完成,Excute提交任务到集群。


Table result = streamTableEnvironment.sqlQuery(sql);DataStream resultData = streamTableEnvironment.toAppendStream(result, Row.class);resultData.print();

总结一下,基本流程如下图所示:

主要通过配置FlinkKafkaConsumer,实现PcapDataDesrializer负责对Pcap数据包中的Frame进行反序列化处理和解析,形式基于Frame的流数据,之后通过自定义FlatMapFunction、MapFunction函数对流数据进行处理和封装成为原始派生流DataStream。

以上是"如何实现基于Flink实时数据处理"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0