Spark Streaming运行流程是怎样的
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,本篇内容介绍了"Spark Streaming运行流程是怎样的"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学
千家信息网最后更新 2025年02月02日Spark Streaming运行流程是怎样的
本篇内容介绍了"Spark Streaming运行流程是怎样的"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
通过下面的一个简单的例子来理解spark streaming
object OnlineForeachRDD2DB { def main(args: Array[String]){ /* * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息, * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置 * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如 * 只有1G的内存)的初学者 * */ val conf = new SparkConf() //创建SparkConf对象 conf.setAppName("OnlineForeachRDD") //设置应用程序的名称,在程序运行的监控界面可以看到名称// conf.setMaster("spark://Master:7077") //此时,程序在Spark集群 conf.setMaster("local[6]") //设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口 val ssc = new StreamingContext(conf, Seconds(5)) val lines = ssc.socketTextStream("Master", 9999) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => { // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => { val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")" val stmt = connection.createStatement(); stmt.executeUpdate(sql); }) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } } } /** * 在StreamingContext调用start方法的内部其实是会启动JobScheduler的Start方法,进行消息循环,在JobScheduler * 的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法: * 1,JobGenerator启动后会不断的根据batchDuration生成一个个的Job * 2,ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到 * 数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker * 内部会通过ReceivedBlockTracker来管理接受到的元数据信息 * 每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD * 的DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个 * 单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行),为什么使用线程池呢? * 1,作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙; * 2,有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持; * */ ssc.start() ssc.awaitTermination() }}
"Spark Streaming运行流程是怎样的"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
运行
线程
程序
生成
信息
数据
方法
集群
配置
流程
不断
内容
名称
对象
方式
是在
更多
知识
中通
作业
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
中山大学人网络安全
网络技术兴起意义
泉州餐厅微信点餐机软件开发
计算机服务器结构测试
工业网络技术班徽
长沙有游戏软件开发行业
网络安全管理局怎么样
网络安全的纲领性文件
为什么服务器建立不了角色
512事件 网络安全
武威软件开发生产销售
is平台服务器失败
数据库语言dowhile
宿城区网络技术保养
长沙软件开发集中在哪个区域
网络安全销售应聘简历
自己制作服务器不备案可以吗
探究计算机网络技术的应用与发展
网络安全的英语博文
服务器怎么设置云桌面
如何留住软件开发的核心人员
dm数据库导出至excel
东莞创富网络技术有限公司
微软数据库加载数据库文件
宁夏极致网络技术有限公司
全光网络技术分类
嘉峪关网络安全工程师招聘网
软件开发商为一个人
拨打电话时服务器错误
云厂商什么服务器好