千家信息网

Apache Flink Task执行之数据流如何处理

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,这篇文章主要介绍Apache Flink Task执行之数据流如何处理,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!获取流数据用户提交的代码最终被封装成了org.apache.
千家信息网最后更新 2025年01月24日Apache Flink Task执行之数据流如何处理

这篇文章主要介绍Apache Flink Task执行之数据流如何处理,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

获取流数据

用户提交的代码最终被封装成了org.apache.flink.runtime.taskmanager.Task,Task是一个Runnable因此核心代码就在run方法,run方法调用了doRun方法,在doRun中调用了invokable.invoke(),Task的整个处理流程其实就在这里面。org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable是一个抽象类,它的子类是不同类型的Task,这里我们主要关注流处理任务相关的org.apache.flink.streaming.runtime.tasks.StreamTask,StreamTask的invoke方法执行了runMailboxLoop()方法。

runMailboxLoop()方法就是执行org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor的runMailboxLoop方法。MailboxProcessor是一种线程模型,runMailboxLoop就是在while轮询中不断执行任务和默认动作,其中默认动作就是StreamTask的processInput方法,该方法调用了StreamInputProcessor的inputProcessor方法,在这个方法中获取并处理了流数据。StreamInputProcessor的子类StreamOneInputProcessor和StreamTwoInputProcessor分别用来处理有1个和2个入度的Task(StreamMultipleInputProcessor先不管)。StreamOneInputProcessor中有1个StreamTaskInput用来获取数据,1个DataOutput用来收集从StreamTaskInput获取的数据;同理,StreamTwoInputProcessor有2个StreamTaskInput和2个DataOutput。StreamTaskInput的子类StreamTaskNetworkInput用来从网络中获取流数据,通过调用他它的emitNext不仅处理流数据还处理了checkpoint barrier,本篇文章只关注数据流的处理流程。StreamTaskNetworkInput从反序列化器中获取到完整流数据后把数据交给DataOutput。DataOutput也有处理1个入度和2个入度的子类,它们都持有OperatorChain中第一个operator的引用,称为headOperator,DataOutput从StreamTaskInput那里获取到数据后会交给headOperator来处理。到此为止,流数据被获取并传入了OperatorChain。 这里总结一下:StreamTask的processInput方法在MailboxProcessor中被反复调用,在processInput方法中StreamTask使用StreamInputProcessor来获取并处理流数据。StreamInputProcessor中的StreamTaskInput用来获取数据,获取的数据交给DataOutput,DataOutput将数据传入OperatorChain的第一个operator。其中StreamTask,StreamInputProcessor和DataOutput都有处理1个入度和2个入度的子类。

数据流过OperatorChain

OperatorChain的第一个operator获取数据后,数据是怎样在OperatorChain中流动的呢?首先说说OperatorChain,StreamOperatorWrapper是chain的每个节点,每个节点都有指向下一个或上一个节点的引用,因此OperatorChain是一个双向链表。但是数据的流动并不依靠这个链式结构。上文我们提到DataOutput将数据交给了headOperator,OperatorChain的第一个节点都是StreamOperator的子类,我们编写的filer算子,map算子等最终都会被封装成StreamOperator,例如子类StreamFlatMap就是执行flatMap方法,StreamFilter就是执行fliter方法等。这些方法执行的时候用org.apache.flink.streaming.api.operators.Output对处理后的结果进行收集。例如StreamFilter当FilterFunction返回true时收集数据,而StreamFlatMap将Output传入flatMap方法中由用户代码进行收集数据。收集的数据是怎样向OperatorChain的下一个节点传递的呢?原来Output中持有OneInputStreamOperator变量指向了chain中下一个节点的算子,调用Output的collect方法会调用下一个算子的processElement,数据就这样在整个OperatorChain中传递了。

发向下游Task

当数据传到OperatorChain的最后一个算子时数据是怎样发向下个Task的呢?最后一个算子拥有的Output实现类是org.apache.flink.streaming.runtime.io.RecordWriterOutput。RecordWriterOutput的collect方法会调用的org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit方法用来发送数据,该方法会将序列化器中的数据复制到BufferBuilder中。BufferBuilder维护了一个内存片段MemorySegment并且可以创建相应的消费者。RecordWriter有2个实现类ChannelSelectorRecordWriter和BroadcastRecordWriter。Task向下游节点的多个并行度发送数据,每个并行度都对应一个channel。ChannelSelectorRecordWriter为每个chanel都保存一个BufferBuilder并分别添加BufferConsumer:

BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel);//按channel获取BufferBuilderaddBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);//按channel添加BufferConsumerbufferBuilders[targetChannel] = bufferBuilder;

BroadcastRecordWriter只有一个BufferBuilder,使用同一个BufferBuilder给所有的channel添加BufferConsumer:

try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {    for (int channel = 0; channel < numberOfChannels; channel++) {        addBufferConsumer(bufferConsumer.copy(), channel);//所有channel用同一个BufferBuilder达到广播的目的    }}

RecordWriter#requestNewBufferBuilder方法会获取BufferBuilder,如果获取失败会导致Task执行线程阻塞造成反压。

public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {    BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);//尝试获取,获取不到返回null    if (builder == null) {        long start = System.currentTimeMillis();        builder = targetPartition.getBufferBuilder(targetChannel);//阻塞获取,导致反压        idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);    }    return builder;}

BufferBuilder最终来自LocalBufferPool,LocalBufferPool有几个重要的属性:

//taskmanager的网络缓存池,MemorySegment从这里获取private final NetworkBufferPool networkBufferPool;//已经获取的MemorySegment被组织成一个队列private final ArrayDeque availableMemorySegments = new ArrayDeque();//当前localBufferPool的大小private int currentPoolSize;//已经获取的MemorySegmentprivate int numberOfRequestedMemorySegments;//每个channel能同时获取的最大BufferBuilder数private final int maxBuffersPerChannel;//subpartition就是channel,数组存储了每个channel同时使用的BufferBuilder数private final int[] subpartitionBuffersCount;

BufferBuilder由requestMemorySegment方法和requestMemorySegmentBlocking方法获取,requestMemorySegmentBlocking方法也是调用requestMemorySegment方法并在没有获取到MemorySegment时通过AvailableFuture的get方法来阻塞直到获取成功为止,AvailableFuture是一个用CompletableFuture表示的状态位,这里用到了CompletableFuture的get方法会阻塞直到complete的特性,没有完成的future表示unavailable,完成了的表示available。requestMemorySegment方法中如果已经获取的MemorySegment(numberOfRequestedMemorySegments)大于了localBufferPool的大小(currentPoolSize)需要将多余的MemorySegment先归还给networkBufferPool。之后获取MemorySegment,如果获取不到就设置AvailableFuture为不可用,否则记录channel使用的MemorySegment数量,如果大于maxBuffersPerChannel,也设置AvailableFuture为不可用。

@Nullableprivate MemorySegment requestMemorySegment(int targetChannel) throws IOException {    MemorySegment segment = null;    synchronized (availableMemorySegments) {        returnExcessMemorySegments();//将多余的segment归还给networkBufferPool        if (availableMemorySegments.isEmpty()) {            segment = requestMemorySegmentFromGlobal();//全局获取        }        // segment may have been released by buffer pool owner        if (segment == null) {            segment = availableMemorySegments.poll();//局部获取        }        if (segment == null) {            availabilityHelper.resetUnavailable();//获取不到设置为不可用        }        //记录channel正在使用segment数,如果超了设置为不可用        if (segment != null && targetChannel != UNKNOWN_CHANNEL) {            if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {                unavailableSubpartitionsCount++;                availabilityHelper.resetUnavailable();            }        }    }    return segment;}

反压的采集

上面说的AvailableFuture设置为不可用其实和反压有关,Task的isBackPressured方法返回了该Task是否产生了反压。

public boolean isBackPressured() {    if (invokable == null || consumableNotifyingPartitionWriters.length == 0 || !isRunning()) {        return false;    }    //获取所有的AvailableFuture,如果有没完成了则有反压    final CompletableFuture[] outputFutures = new CompletableFuture[consumableNotifyingPartitionWriters.length];    for (int i = 0; i < outputFutures.length; ++i) {        outputFutures[i] = consumableNotifyingPartitionWriters[i].getAvailableFuture();    }    return !CompletableFuture.allOf(outputFutures).isDone();}

以上是"Apache Flink Task执行之数据流如何处理"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!

数据 方法 处理 子类 节点 就是 算子 阻塞 数据流 代码 任务 内容 动作 同时 大小 序列 指向 流程 用户 篇文章 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 流媒体管理服务器 近期网络安全事件 各种数据库经典电子书分享 家庭用什么网络安全 动态读取数据库生成树结构 厦门域网网络技术有限公司更名 方舟服务器管理权限 泰拉瑞亚进服务器联机卡顿 云主机没有数据库权限 负责网络安全监督管理的部门 怎样可以修改数据库后直接查询 软件开发项目的个人报告 丰赢互联网科技有限公司 理想连线网络技术股份怎么样 网络安全银行高管 网络技术支持岗位是做什么的 网络安全教育会议讲话 委托软件开发成本核算 数据库如何复制数据表 确保我市网络安全 数据库 课程设计 题目 初级软件开发是什么 某软件开发公司因业务发展需要 多举措筑牢网络安全 关于网络安全信息安全 计算机网络技术的发展标志性技术 不同数据库的表如何关联查询 在网络安全和认证 华为账号服务器无响应 明日之后跨服匹配服务器
0