千家信息网

flink中如何实现有状态stateful的计算

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,小编给大家分享一下flink中如何实现有状态stateful的计算,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!impor
千家信息网最后更新 2025年01月23日flink中如何实现有状态stateful的计算

小编给大家分享一下flink中如何实现有状态stateful的计算,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

import org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.api.common.state.ValueStateimport org.apache.flink.util.Collectorimport org.apache.flink.configuration.Configurationimport org.apache.flink.api.common.state.ValueStateDescriptorimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment class CountWindowAverage extends RichFlatMapFunction[(Long, Double), (Long, Double)] {  private var sum: ValueState[(Long, Double)] = _  override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = {    // access the state valueval tmpCurrentSum = sum.value    // If it hasn't been used before, it will be null    val currentSum = if (tmpCurrentSum != null) {      tmpCurrentSum    } else {      (0L, 0d)    }    // update the count    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)    // update the state    sum.update(newSum)    // if the count reaches 2, emit the average and clear the state    if (newSum._1 >= 2) {      out.collect((input._1, newSum._2 / newSum._1))      //将状态清除      //sum.clear()    }  }  override def open(parameters: Configuration): Unit = {    sum = getRuntimeContext.getState(      new ValueStateDescriptor[(Long, Double)]("average", classOf[(Long, Double)])    )  }}import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._object ECountWindowAverage {   def main(args: Array[String]): Unit = {  val env = StreamExecutionEnvironment.getExecutionEnvironment   env.fromCollection(List(      (1L, 3d),      (1L, 5d),      (1L, 7d),      (1L, 4d),      (1L, 2d)    )).keyBy(_._1)      .flatMap(new CountWindowAverage())      .print()      /*.keyBy(_._1)    .flatMap(new CountWindowAverage())    .print()*/    // the printed output will be (1,4) and (1,5)  env.execute("ExampleManagedState")  }}

以上是"flink中如何实现有状态stateful的计算"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0