千家信息网

(版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

发表于:2024-09-23 作者:千家信息网编辑
千家信息网最后更新 2024年09月23日,本期内容:1、updateStateByKey解密2、mapWithState解密背景:整个Spark Streaming是按照Batch Duractions划分Job的。但是很多时候我们需要算过去
千家信息网最后更新 2024年09月23日(版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

本期内容:

1、updateStateByKey解密

2、mapWithState解密

背景:
整个Spark Streaming是按照Batch Duractions划分Job的。但是很多时候我们需要算过去的一天甚至一周的数据,这个时候不可避免的要进行状态管理,而Spark Streaming每个Batch Duractions都会产生一个Job,Job里面都是RDD,

所以此时面临的问题就是怎么对状态进行维护?这个时候就需要借助updateStateByKey和mapWithState方法完成核心的步骤。


1、简单看下updateStateByKey源码:

在DStream中updateStateByKey和mapWithState是通过隐式转换来完成,本身没有这样的方法。

implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):  PairDStreamFunctions[K, V] = {new PairDStreamFunctions[K, V](stream)}
[: ClassTag](    updateFunc: ([]Option[]) => Option[]  ): DStream[()] = ssc.withScope {  updateStateByKey(updateFuncdefaultPartitioner())}

最终会通过StateDStream中的computeUsingPreviousRDD和compute来完成这样的功能,简单的流程图如下:

2、简单看下mapWithState源码

mapWithState是返回MapWithStateDStream对象,维护和更新历史状态都是基于Key,使用一个function对key-value形式的数据进行状态维护

[: ClassTag: ClassTag](    spec: StateSpec[]  ): MapWithStateDStream[] = {MapWithStateDStreamImpl[](    selfspec.asInstanceOf[StateSpecImpl[]]  )}

通过InternalMapWithStateDStream类中的compute来完成,简单的流程图如下:

0