spark streaming窗口聚合操作后怎么管理offset
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这篇文章主要介绍"spark streaming窗口聚合操作后怎么管理offset",在日常操作中,相信很多人在spark streaming窗口聚合操作后怎么管理offset问题上存在疑惑,小编查阅
千家信息网最后更新 2025年02月02日spark streaming窗口聚合操作后怎么管理offset
这篇文章主要介绍"spark streaming窗口聚合操作后怎么管理offset",在日常操作中,相信很多人在spark streaming窗口聚合操作后怎么管理offset问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"spark streaming窗口聚合操作后怎么管理offset"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
对于spark streaming来说窗口操作之后,是无法管理offset的,因为offset的存储于HasOffsetRanges。只有kafkaRDD继承了他,所以假如我们对KafkaRDD进行了转化之后就无法再获取offset了。
还有窗口之后的offset的管理,也是很麻烦的,主要原因就是窗口操作会包含若干批次的RDD数据,那么提交offset我们只需要提交最近的那个批次的kafkaRDD的offset即可。如何获取呢?
对于spark 来说代码执行位置分为driver和executor,我们希望再driver端获取到offset,在处理完结果提交offset,或者直接与结果一起管理offset。
说到driver端执行,其实我们只需要使用transform获取到offset信息,然后在输出操作foreachrdd里面使用提交即可。
package bigdata.spark.SparkStreaming.kafka010
import java.util.Properties
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, TaskContext}
import scala.collection.JavaConverters._
import scala.collection.mutable
object kafka010NamedRDD {
def main(args: Array[String]) {
// 创建一个批处理时间是2s的context 要增加环境变量
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("/opt/checkpoint")
// 使用broker和topic创建DirectStream
val topicsSet = "test".split(",").toSet
val kafkaParams = Map[String, Object]("bootstrap.servers" -> "mt-mdh.local:9093",
"key.deserializer"->classOf[StringDeserializer],
"value.deserializer"-> classOf[StringDeserializer],
"group.id"->"test4",
"auto.offset.reset" -> "latest",
"enable.auto.commit"->(false: java.lang.Boolean))
// 没有接口提供 offset
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams,getLastOffsets(kafkaParams ,topicsSet)))//
var A:mutable.HashMap[String,Array[OffsetRange]] = new mutable.HashMap()
val trans = messages.transform(r =>{
val offsetRanges = r.asInstanceOf[HasOffsetRanges].offsetRanges
A += ("rdd1"->offsetRanges)
r
}).countByWindow(Seconds(10), Seconds(5))
trans.foreachRDD(rdd=>{
if(!rdd.isEmpty()){
val offsetRanges = A.get("rdd1").get//.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
println(rdd.count())
println(offsetRanges)
// 手动提交offset ,前提是禁止自动提交
messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
// A.-("rdd1")
})
// 启动流
ssc.start()
ssc.awaitTermination()
}
def getLastOffsets(kafkaParams : Map[String, Object],topics:Set[String]): Map[TopicPartition, Long] ={
val props = new Properties()
props.putAll(kafkaParams.asJava)
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(topics.asJavaCollection)
paranoidPoll(consumer)
val map = consumer.assignment().asScala.map { tp =>
println(tp+"---" +consumer.position(tp))
tp -> (consumer.position(tp))
}.toMap
println(map)
consumer.close()
map
}
def paranoidPoll(c: Consumer[String, String]): Unit = {
val msgs = c.poll(0)
if (!msgs.isEmpty) {
// position should be minimum offset per topicpartition
msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
val tp = new TopicPartition(m.topic, m.partition)
val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
acc + (tp -> off)
}.foreach { case (tp, off) =>
c.seek(tp, off)
}
}
}
}
到此,关于"spark streaming窗口聚合操作后怎么管理offset"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
管理
学习
批次
更多
结果
帮助
实用
接下来
代码
位置
信息
前提
原因
变量
只有
就是
手动
接口
数据
文章
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全攻击量化工具
湖南学电脑软件开发培训哪家好
海康威视ntp时间同步服务器
mcgs实时数据库变量找不到
虚拟网络技术主要是
新一线城市互联网科技
互联网基础科技股
fireeye网络安全问题
camhi服务器
软件开发流程和团队
饥荒联机版专属服务器怎么换角色
win7网络安全模式蓝屏
优秀物业管理软件开发
民生银行 软件开发
苏州蓝宇软件用的什么数据库
信息网络安全分级
长宁区发展网络技术咨询怎么样
昆明网络安全学习
拼多多网店挂阿里云服务器
2018国家网络安全活动
时序数据库 中标
软件开发公司管理模式
油乐网络技术有限公司怎么样
数据库三个数比大小思路
如何设置用户拒绝接入服务器
电脑软件开发毕业去向
戴尔服务器清除外部配置
网络安全随时预防
勇士岛服务器
固始县公安局网络安全大队