Spark Streaming运行流程是怎样的
发表于:2024-11-18 作者:千家信息网编辑
千家信息网最后更新 2024年11月18日,本篇内容介绍了"Spark Streaming运行流程是怎样的"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学
千家信息网最后更新 2024年11月18日Spark Streaming运行流程是怎样的
本篇内容介绍了"Spark Streaming运行流程是怎样的"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
通过下面的一个简单的例子来理解spark streaming
object OnlineForeachRDD2DB { def main(args: Array[String]){ /* * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息, * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置 * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如 * 只有1G的内存)的初学者 * */ val conf = new SparkConf() //创建SparkConf对象 conf.setAppName("OnlineForeachRDD") //设置应用程序的名称,在程序运行的监控界面可以看到名称// conf.setMaster("spark://Master:7077") //此时,程序在Spark集群 conf.setMaster("local[6]") //设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口 val ssc = new StreamingContext(conf, Seconds(5)) val lines = ssc.socketTextStream("Master", 9999) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => { // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => { val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")" val stmt = connection.createStatement(); stmt.executeUpdate(sql); }) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } } } /** * 在StreamingContext调用start方法的内部其实是会启动JobScheduler的Start方法,进行消息循环,在JobScheduler * 的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法: * 1,JobGenerator启动后会不断的根据batchDuration生成一个个的Job * 2,ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到 * 数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker * 内部会通过ReceivedBlockTracker来管理接受到的元数据信息 * 每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD * 的DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个 * 单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行),为什么使用线程池呢? * 1,作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙; * 2,有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持; * */ ssc.start() ssc.awaitTermination() }}
"Spark Streaming运行流程是怎样的"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
运行
线程
程序
生成
信息
数据
方法
集群
配置
流程
不断
内容
名称
对象
方式
是在
更多
知识
中通
作业
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
根据网络安全法规定哪些单位
穿越服务器
微信个人数据库设计
数据库技术在经济管理的应用
数据库安全性设计怎么写
北京英贝多嵌入式网络技术
aix登陆数据库
s2d数据库
数据库的技术发展经历了
锦州网络安全办
主控室云服务器管理设备
网络技术公司副总面试题
dnf外网架设接收服务器失败
您试图访问的网络安全存在问题
分析股票专业软件开发
吴江区运营网络技术优势
数据库的用户和角色的设置
数据库远程同步软件
手机app使用的是什么数据库
软科数据库
资料共享服务器主板
数据库工程师都干啥
三级计算机网络技术试题
软件开发学什么职位知乎
dnf外网架设接收服务器失败
数据库放在本地还是云数据库好
平板电脑服务器连接失败
数据库标签老密是什么意思
大智慧收费版最新收费全推服务器
大专学网络技术专业自考英语