flink中如何实现有状态stateful的计算
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,小编给大家分享一下flink中如何实现有状态stateful的计算,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!impor
千家信息网最后更新 2025年01月23日flink中如何实现有状态stateful的计算
小编给大家分享一下flink中如何实现有状态stateful的计算,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
import org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.api.common.state.ValueStateimport org.apache.flink.util.Collectorimport org.apache.flink.configuration.Configurationimport org.apache.flink.api.common.state.ValueStateDescriptorimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment class CountWindowAverage extends RichFlatMapFunction[(Long, Double), (Long, Double)] { private var sum: ValueState[(Long, Double)] = _ override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = { // access the state valueval tmpCurrentSum = sum.value // If it hasn't been used before, it will be null val currentSum = if (tmpCurrentSum != null) { tmpCurrentSum } else { (0L, 0d) } // update the count val newSum = (currentSum._1 + 1, currentSum._2 + input._2) // update the state sum.update(newSum) // if the count reaches 2, emit the average and clear the state if (newSum._1 >= 2) { out.collect((input._1, newSum._2 / newSum._1)) //将状态清除 //sum.clear() } } override def open(parameters: Configuration): Unit = { sum = getRuntimeContext.getState( new ValueStateDescriptor[(Long, Double)]("average", classOf[(Long, Double)]) ) }}import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._object ECountWindowAverage { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection(List( (1L, 3d), (1L, 5d), (1L, 7d), (1L, 4d), (1L, 2d) )).keyBy(_._1) .flatMap(new CountWindowAverage()) .print() /*.keyBy(_._1) .flatMap(new CountWindowAverage()) .print()*/ // the printed output will be (1,4) and (1,5) env.execute("ExampleManagedState") }}
以上是"flink中如何实现有状态stateful的计算"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
状态
篇文章
内容
不怎么
大部分
更多
知识
行业
资讯
资讯频道
频道
参考
学习
帮助
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全审查多久才能结束
梦想城镇服务器怎么进
供电网络安全宣传周
魔兽阿古斯服务器还有部落么
ibm服务器 说明书
网络安全配置教程
校园网络安全漏洞有哪些
海湾战争网络技术
大众互联网科技
知乎 华为软件开发云
无线网络安全模式怎么解除
双奥期间网络安全保障情况总结
健全网络安全标准体系
山东办公oa软件开发
虚拟网络技术的应用ppt
网络服务器能用个人电脑吗
数据库出现在什么年代
数据库当前日期
泓洆互联网科技
cs go躲猫猫服务器
机房服务器系统漏洞防护
dtl 数据库
庆余年手游更新维护进不去服务器
服务器如何加ssd
中山嵌入式软件开发流程
mac连接公司服务器
南皮县网络安全和信息化
农业计算机网络技术就业方向
网络服务器能用个人电脑吗
水务行业数据库审计技术原理