千家信息网

spark大数据架构初学入门基础详解

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,Spark是什么a) 是一种通用的大数据计算框架b) Spark Core 离线计算Spark SQL 交互式查询Spark Streaming 实时流式计算Spark MLlib 机器学习Spark
千家信息网最后更新 2025年01月23日spark大数据架构初学入门基础详解

Spark是什么


a) 是一种通用的大数据计算框架


b) Spark Core 离线计算


Spark SQL 交互式查询


Spark Streaming 实时流式计算


Spark MLlib 机器学习


Spark GraphX 图计算


c) 特点:


i. 一站式:一个技术堆栈解决大数据领域的计算问题


ii. 基于内存


d) Spark2009年诞生于伯克利大学的AMPLab实验室


2010年正式开源了Spark项目


2013年Spark成为Apache下的项目


2014年飞速发展,成为Apache的顶级项目


2015年在国内兴起,代替mr,hive,storm等


作者:辛湜(shi)


e) Spark和Hive:


Spark优点:


i. 速度快


ii. Spark SQL支持大量不同的数据源


f) Spark 和Storm


i. 计算模型不一样


ii. Spark吞吐量大


g) 特点:快,易用,通用,兼容性


h) spark运行模式


i. local(本地)


ii. standalone(集群)


iii. on yarn(由 yarn作为资源调度Spark负责任务调度和计算)


iv. on mesos(由mesos作为资源调度S)


v. on cloud()


i) 配置步骤


=======================on yarn====================


【说明】


1. spark任务运行在yarn上,由yarn来进行资源调度和管理,spark只负责任务的调度 和计算


2. 不需要配置和启动spark集群


3. 只需要在提交任务的节点上安装并配置spark on yarn 模式


4. 必须找一台节点安装spark


5. 步骤:


i. 安装配置JDK


ii. vi spark-env.sh


1. export JAVA_HOME=/opt/modules/jdk1.7_6.0


2. export HADOOP_CONF_DIR = /opt/modules/hadoop/etc/hadoop


iii. 测试spark on yarn 模式是否安装成功


iv. 网络测试:http://hadoop-yarn1.beicai.com:8088


=====================sdandalone模式==============


【说明】


1. spark运行在spark 集群上,由spark进行资源调度管理,同时还负责任务的调度和 计算


2. 需要配置和启动spark集群


3. 步骤:


i. 安装配置JDK


ii. 上传并解压Spark


iii. 建立软连接 ln -s spark spark 或者修改名称


iv. 配置环境变量


v. 安装配置Spark,修改spark配置文件(spark-env.sh, slaves)


1. vi spark-env.sh


a) export JAVA_HOME=/opt/modules/jdk(jdk位置)


b) export SPARK_MASTER_IP=hadoop-yarn1.beicai.com


c) export SPARK_MASTER_PORT=7077


2. vi slaves(用于指定在哪些节点上启动worker)


a) hadoop-yarn2.beicai.com


hadoop-yarn3.beicai.com


vi. 将spark发送给其他主机


vii. 启动


/opt/modules/spark/bin/start-all.sh


vii. 查看SparkUI界面:http://hadoop-yarn1.beicai.com:8080


4.


j)


一、Spark原理


1、Spark的运行原理


i、分布式


Ii、主要基于内存(少数情况基于磁盘)


Iii、迭代式计算


2、Spark 计算模式 VS MapReduce 计算模式对比



Mr这种计算模型比较固定,只有两种阶段,map阶段和reduce阶段,两个阶段结束 后,任务就结束了,这意味着我们的操作很有限,只能在map阶段和reduce阶段, 也同时意味着可能需要多个mr任务才能处理完这个job



Spark 是迭代式计算,一个阶段结束后,后面可以有多个阶段,直至任务计算完 成,也就意味着我们可以做很多的操作,这就是Spark计算模型比mr 强大的地方



三、什么是Spark RDD?



1、什么是RDD?


弹性的,分布式的,数据集



(RDD在逻辑上可以看出来是代表一个HDFS上的文件,他分为多个分区,散落 在Spark的多个节点上)


3、RDD----弹性


当RDD的某个分区的数据保存到某个节点上,当这个节点的内存有限,保存不了这个 分区的全部数据时,Spark就会有选择性的将部分数据保存到硬盘上,例如:当worker 的内存只能保存20w条数据时,但是RDD的这个分区有30w条数据,这时候Spark就 会将多余的10w条数据,保存到硬盘上去。Spark的这种有选择性的在内存和硬盘之间的权衡机制就是RDD的弹性特点所在



4、Spark的容错性


RDD最重要的特性就是,提供了容错性,可以自动的从失败的节点上恢复过来,即如 果某个节点上的RDD partition(数据),因为节点的故障丢了,那么RDD会自动的通过 自己的数据来源重新计算该partition,这一切对使用者来说是透明的



2、Spark的开发类型



(1)、核心开发:离线批处理 / 演示性的交互式数据处理



(2)、SQL查询:底层都是RDD和计算操作



(3)、底层都是RDD和计算操作



(4)、机器学习



(5)、图计算



3、Spark 核心开发(Spark-core == Spark-RDD)步骤



(1)、创建初始的RDD



(2)、对初始的RDD进行转换操作形成新的RDD,然后对新的RDD再进行操作,直 至操作计算完成



(3)、将最后的RDD的数据保存到某种介质中(hive、hdfs,MySQL、hbase...)



五、Spark原理


Driver,Master,Worker,Executor,Task各个节点之间的联系




Spark中的各节点的作用:


1、driver的作用:


(1)、 向master进行任务的注册


(2)、构建运行任务的基本环境


(3)、接受该任务的executor的反向注册


(4)、向属于该任务的executor分配任务



2、什么是driver?


我们编写的程序打成jar包后,然后找一台能够连接spark集群的节点做任务的driver,具体的表现为SparkSubmit



3、Master的作用:


(1)、监控集群;


(2)、动态感知worker的上下线;


(3)、接受driver端注册请求;


(4)、任务资源的调度



4、Worker的作用:


(1)、定时向master汇报状态;


(2)、接受master资源调度命令,进行资源的调度


(3)、启动任务的容器Executor



5、Executor的作用:


(1)、保存计算的RDD分区数据;


(2)、向Driver反向注册;


(3)、接受Driver端发送来的任务Task,作用在RDD上进行执行




Spark 编程的流程:



1、我们编写的程序打包成jar包,然后调用Spark-Submit 脚本做任务的提交



2、启动driver做任务的初始化



3、Driver会将任务极其参数(core,memory,driver相关的参数)进行封装成ApplicationDescript通过taskSchedulerImpl 提交给Master



4、Master接受到driver端注册任务请求时,会将请求参数进行解析,并封装成APP,然后进行持久化,并且加入到其任务队列中的waitingAPPs



5、当轮到咱们提交的任务运行时,master会调用schedule()这个方法,做任务资源调度



6、Master将调度好的资源封装成launchExecutor,发送给指定的worker



7、Worker接收到发送来的launchExecutor时,会将其解析并封装成ExecutorRunner,然后调用start方法,启动Executor



8、Executor启动后,会向任务的Driver进行反向注册



9、当属于这个任务的所有executor启动成功并反向注册完之后,driver会结束SparkContext对象的初始化



10、当sc 初始化成功后,意味着运行任务的基本环境已经准备好了,driver会继续运行我们编写好的代码



11、开始注册初始的RDD,并且不断的进行转换操作,当触发了一个action算子时,意味着触发了一个job,此时driver就会将RDD之间的依赖关系划分成一个一个的stage,并将stage封装成taskset,然后将taskset中的每个task进行序列化,封装成launchtask,发送给指定的executor执行



12、Executor接受到driver发送过来的任务task,会对task进行反序列化,然后将对应的算子(flatmap,map,reduceByKey。。。。)作用在RDD分区上




六、RDD详解



1、什么是RDD?


RDD(Resilient Disttibuted Dataset)叫做弹性的分布式的数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可并行计算的集合



2、RDD的特点:


自动容错


位置感知性调度


伸缩性



3、RDD的属性:


(1)、一组分片(partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度,用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值,默认值就是程序所分配到的CPU Core的数目


(2)、一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现computer函数以达到这个目的。Computer函数会对迭代器进行复合,不需要保存每次计算的结果。


(3)、RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。


(4)、一个partition,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于hashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了partition RDD Shuffle输出时的分片数量。


(5)、一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFD文件来说。这个列表保存的就是每个Partition所在的快的位置。按照"移动数据不如移动计算"的理念。Spark在进行任务调度的时候,会尽可能的将计算任务分配到所要处理数据块的存储位置。




4、RDD的创建:


进行Spark核心编程时,首先要做的事就是创建一个初始的RDD。Spark Core提供了三种创建RDD的方式:


(1)、使用程序中的集合创建RDD (调用parallelize()方法)


(2)、使用本地文件创建RDD (调用textFile()方法)


(3)、使用HDFD文件创建RDD (调用textFile()方法)



七、算子



1、什么是算子?


是RDD中定义的作用在每一个RDD分片上的函数,可以对RDD中的数据进行转换 和操作



2、RDD算子的分类


(1)、Transformation算子,这类算子变换不触发提交作业(特点就是lazy特性)


返回的是一个RDD


(2)、Action算子,这类算子会触发SparkContext提交作业(触发一个spark job的运行,从而触发这个action之前所有的transformation的执行)


返回的是一个spark对象



3、常用的Transformation算子


八、RDD分区排序



I、分区


两种实现方式:coalesce 和 repartition(底层调用coalesce)



coalesce(numPartitons,isShuffle)


第一个参数是重分区后的数量,第二个参数是是否进行shuffle


如果原来有N个分区,重分区后有M个分区


如果 M > N ,必须将第二参数设置为true(也就是进行shuffle),等价于 repartition(numPartitons) 如果是false将不起作用


如果M < N


100-->10 重分区后的分区数比原来的小的多,那么久需要使用shuffle,也即是设置为true


100-->90 重分区后的分区数和原来的差不多的,那么就不需要使用shuffle,也就是设置为false



II、排序


sortBy(x => x) 这个算子中带有隐式转换参数



x 能够排序(比较大小),那么这个类就必须有比较大小的功能,也就是实现了compareTo 或者compare



实现二次排序有两种方法:


1、继承Comparable 接口 或者 Ordered


2、隐式转换:可以定义隐式转换函数(Ordered)或者隐式转换值(Ordering)



九、自定义分区



自定义分区


要求:按照key将对应的value输出到指定的分区中


解释:自定义一个自定义分区类,继承partitioner,实现他的两个方法


1、numPartitions


2、getPartition


具体的功能根据项目的要求自定义实现,然后调用partitionBy方法,new出自定义的类,传入参数即可


九、RDD持久化原理



1、持久化场景:对于一个rdd会被多次引用到,并且这个rdd计算过程复杂,计算时间特变耗时



2、如何进行持久化,调用rdd.persist方法或cache方法,cache方法底层就是调用persist方法



******************persist(StorageLevel.MEMORY_ONLY)*******************


如果对RDD做持久化,默认持久化级别是storageLevel.MEMORY_ONLY ,也就是持久化到内存中去,这种持久化级别是效率最快的,但是由于是纯Java 对象,保存到内存中,那么内存可能保存的数量就会较少


***************persist(StorageLevel.MEMORY_ONLY_SER)****************


如果当我们集群资源有限时,那么我们可以采用MEMORY_ONLY_SER,也就是将Java对象进行序列化之后持久到内存中去,这种持久化的好处是能够持久化更多的数据到内存中,但是由于在持久化时需要序列化,取出来之后又需要反序列化这一过程,这个过程会消耗CPU计算资源,性能相对于MEMORY_ONLY 这种持久化级别来说稍微弱点,但是还是比较高效的



3、如何选择RDD持久化策略?


Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍,下面是一些通用的持久化级别的选择建议:


1)、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略,因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作


2)、如果MEMORY_ONLY策略,无法存储所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快的,只是要消耗CPU进行反序列化


3)、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了


4、能不使用DISK相关的策略,就不要使用,有的时候,从磁盘读取数据,还不如重新计算一次




十一、共享变量



1、共享变量分为两种:广播变量 和 累加器



广播变量(broadcast)



2、日常所遇问题


因为每个task都需要拷贝这样的一个副本到executor去执行,那么我们可以想象一下,如果有1000 个task在某个worker上执行,而这个副本有100M,那么意味着我们需要拷贝100G的数据都到某个worker上执行,这样的话会大大消耗我们的网络流量,同时会加大executor的内存消耗,从而增加了我们spark作业的运行时间,大大降低了spark作业的运行效率,增加了作业失败的概率



3、如何解决以上问题,也就是说什么时候使用广播变量?


当RDD引用到了一个外部变量并且这个外部变量数据量不小,同时这个RDD对应的task数量特别多,那么此时使用广播共享变量再合适不过了


我们可以将这种大的外部变量做成广播变量,外部变量做成广播变量的时候,那么每个executor的内存中只会有一个外部变量,而这个副本针对所有的task都是共享的,这样的话就减少了网络流量消耗,降低了executor的内存消耗,提高了spark作业运行效率和缩短了运行时间,同时降低了作业失败的概率




4、广播变量的使用流程:


1)、某个executor的第一个task先执行,首先会从自己的blockManager中查找外部变量,如果没有就从邻居的executor的blockManager的内存中获取这个外部变量,如果还是获取不到,就从driver端获取,拷贝这个外部变量到本地的executor的blockManager


2)、当这个executor的其他task执行时,就不需要从外面获取这个外部变量的副本,直接从本地的blockManager中获取即可




5、如何获取广播变量的值?


可以直接调用广播变量的value() 这个方法即可



【注意】广播变量是只读的,不可写




累加器(Accumulator)



Spark提供的Accumulator ,主要用于多个节点对一个变量进行共享性的操作,Accumulator只是提供了累加的功能。但是却给我们提供了多个task对一个变量并行操作的功能,但是task只能对Accumulator进行累加操作


【注意】task只能对Accumulator进行类加操作,只有Driver程序可以读取Accumulator的值


RDD分区和容错机制讲解

1、RDD 的Lineage血统


RDD只支持粗粒度转换,即在大量记录上执行的单个操作,将创建RDD的一系列Lineage(血统)记录下来。以便恢复丢失的分区



2、RDD的依赖关系


RDD和它的父RDD的关系有两种不同的类型:


1)、窄依赖(一对一,多对一)


形象的比喻:独生子女


2)、宽依赖(多对多)


形象的比喻:超生



注释:划分stage的依据就是宽依赖,也就是RDD之间是否有shuffle,shuffle过程就是一个宽依赖过程,shuffle之前的tasks就属于一个stage,shuffle之后的也属于一个stage,shuffle之前和之后的操作都是窄依赖


【注意】shuffle过程分为:shuffle Write过程 和 shuffle read过程



4、DAG的生成(有向无环图)和任务的划分


DAG(Directed Acyclic Graph)叫做有向无环图(有方向无循环的图)



5、一个wordCount过程会产生多少个RDD?


至少会产生五个RDD,


第一个,从HDFS中加载后得到一个RDD(即使用sc.textFile()算子),即HadoopRDD


在sc.textFile()过程中还会产生一个RDD(调用map算子),产生一个MapPartitionRDD


第二个,使用flatMap算子,得到一个MapPartitionRDD


第三个,使用map算子,得到一个MapPartitionRDD


第四个,使用reduceByKey算子,也就是在经过了shuffle过程后又会得到一个shuffledRDD


第五个,使用saveAsTextFile算子,再产生一个MapPartitionRDD



spark程序提交流程讲解


Spark任务简介:


Spark-submit--->SparkSubmit-->main-->submit-->doRunMain-->RunMain-->通过反射创建我们编写的主类的实例对象,调用main方法-->开始执行我们编写的代码-->初始化SparkContext对象-->创建初始的RDD-->触发action算子-->提交job-->worker执行任务-->任务结束



Spark任务详解:


1)、将我们编写的程序打成jar包



2)、调用spark-submit脚本提交任务到集群上运行



3)、运行sparkSubmit的main方法,在这个方法中通过反射的方式创建我们编写的主类的实例对象,然后调用main方法,开始执行我们的代码(注意,我们的spark程序中的driver就运行在sparkSubmit进程中)



4)、当代码运行到创建SparkContext对象时,那就开始初始化SparkContext对象了



5)、在初始化SparkContext对象的时候,会创建两个特别重要的对象,分别是:DAGScheduler


和TaskScheduler



【DAGScheduler的作用】将RDD的依赖切分成一个一个的stage,然后将stage作为taskSet提交给DriverActor



6)、在构建taskScheduler的同时,会创建两个非常重要的对象,分别是DriverActor和ClientActor



【clientActor的作用】向master注册用户提交的任务


【DriverActor的作用】接受executor的反向注册,将任务提交给executor



7)、当clientActor启动后,会将用户提交的任务和相关的参数封装到ApplicationDescription对象中,然后提交给master进行任务的注册



8)、当master接受到clientActor提交的任务请求时,会将请求参数进行解析,并封装成Application,然后将其持久化,然后将其加入到任务队列waitingApps中



9)、当轮到我们提交的任务运行时,就开始调用schedule(),进行任务资源的调度



10)、master将调度好的资源封装到launchExecutor中发送给指定的worker



11)、worker接受到Maseter发送来的launchExecutor时,会将其解压并封装到ExecutorRunner中,然后调用这个对象的start(), 启动Executor



12)、Executor启动后会向DriverActor进行反向注册



13)、driverActor会发送注册成功的消息给Executor



14)、Executor接受到DriverActor注册成功的消息后会创建一个线程池,用于执行DriverActor发送过来的task任务



15)、当属于这个任务的所有的Executor启动并反向注册成功后,就意味着运行这个任务的环境已经准备好了,driver会结束SparkContext对象的初始化,也就意味着new SparkContext这句代码运行完成



16)、当初始化sc成功后,driver端就会继续运行我们编写的代码,然后开始创建初始的RDD,然后进行一系列转换操作,当遇到一个action算子时,也就意味着触发了一个job



17)、driver会将这个job提交给DAGScheduler



18)、DAGScheduler将接受到的job,从最后一个算子向前推导,将DAG依据宽依赖划分成一个一个的stage,然后将stage封装成taskSet,并将taskSet中的task提交给DriverActor



19)、DriverActor接受到DAGScheduler发送过来的task,会拿到一个序列化器,对task进行序列化,然后将序列化好的task封装到launchTask中,然后将launchTask发送给指定的Executor



20)、Executor接受到了DriverActor发送过来的launchTask时,会拿到一个反序列化器,对launchTask进行反序列化,封装到TaskRunner中,然后从Executor这个线程池中获取一个线程,将反序列化好的任务中的算子作用在RDD对应的分区上



【注意】


Spark的任务分为为两种:


a、shuffleMapTask:shuffle之前的任务


b、resultTask:shuffle之后的任务



Spark任务的本质:


将RDD的依赖关系切分成一个一个的stage,然后将stage作为TaskSet分批次的发送到Executor上执行




十三、Checkpoint



1、使用checkpoint的场景:


某个RDD会被多次引用,计算特别复杂,计算特别耗时


担心中间某些关键的,在后面会反复几次使用的RDD,可能会因为节点的故障,导致持久化数据的丢失



2、如何对RDD进行checkpoint?


1)、设置还原点目录,设置checkpoint目录


2)、调用RDD的checkpoint的方法对该RDD进行checkpoint



3、checkpoint的原理


1)、RDD调用了checkpoint方法之后,就接受RDDCheckpointData对象的管理


2)、RDDCheckpointData对象会负责将调用了checkpoint的RDD 的状态设置为MarkedForCheckpoint


3)、当这个RDD所在的job运行结束后,会调用最后一个RDD的doCheckpoint,根据其血统向上查找,查找到被标注为MarkedForCheckpoint状态的RDD,将其状态改变为checkpointingInProgress


4)、启动一个单独的job,将血统中标记为checkpointingInProgress的RDD进行checkpoint,也就是将RDD的数据写入到checkpoint的目录中去


5)、当某个节点发生故障,导致包括持久化的数据全部丢失,此时会从还原点目录还原RDD的每个分区的数据,这样就不需要从头开始计算一次



4、checkpoint需要注意的地方


因为RDD在做checkpoint的时候,会单独启动一个job对需要进行checkpoint的RDD进行重新计算,这样就会增加spark作业运行时间,所以spark强烈建议在做checkpoint之前,应该对需要进行checkpoint的RDD进行持久化(即调用 .cache)



5、checkpoint 和持久化的区别


1)、是否改变血统:


持久化(.cache):不会改变RDD的依赖关系,也就是不会改变其血统


Checkpoint:会改变RDD的血统,做了checkpoint的RDD会清除其所有的依赖关系,并将其父RDD强制设置为checkpointRDD,并且将RDD的状态更改为checkpointed



2)、RDD的数据的可靠性:


持久化:只是将RDD的数据持久化到内存或磁盘中,但是如果节点发生故障,那么持久化的数据还是会丢失


Checkpoint:checkpoint的数据保存在第三方高可靠的分布式的文件系统中,机试节点发生故障,数据也不会丢失,所以checkpoint比持久化可靠性更高




6、后续


我们实现了checkpoint 之后,在某个task 又调用了该RDD的iterator() 方法时,就实现了高容错机制,即使RDD的持久化数据丢失,或者压根儿就没有持久化,但是还是可以通过readCheckpointOrComputer() 方法,优先从父RDD-----checkpointRDD中读取,HDFS(外部文件系统)的数据










第二部分 spark-sql



一、Spark-SQL前世今生



1、Spark SQL的特点


1)、支持多种数据源:Hive、RDD、Parquet、JSON、JDBC等。


2)、多种性能优化技术:in-memory columnar storage、byte-code generation、cost model动态评估等。


3)、组件扩展性:对于SQL的语法解析器、分析器以及优化器,用户都可以自己重新开发,并且动态扩展



2、Spark SQL的性能优化技术简介


1)、内存列存储(in-memory columnar storage)


2)、字节码生成技术(byte-code generation)


3)、Scala代码编写的优化




3、Spark SQL and DataFrame


Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。同时Spark SQL还可以作为分布式的SQL查询引擎。Spark SQL最重要的功能之一,就是从Hive中查询数据。



DataFrame,可以理解为是,以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了很多的优化。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD。




二、Spark-sql的使用



1、RDD转换为DataFrame(两种)


1)、使用反射的方式来推断包含了特定数据类型的RDD的元数据


2)、通过编程接口来创建DataFrame



2、UDF自定义函数和UDAF自定义聚合函数


UDF,其实更多的是针对单行输入,返回一个输出


UDAF,则可以针对多行输入,进行聚合计算,返回一个输出,功能更加强大



3、Spark-SQL工作原理


SqlParse ---------> 解析器



Analyser ---------> 分析器



Optimizer ---------> 优化器



SparkPlan ---------> 物理计划



流程:



1)、自己编写的SQL语句


大家要知道,只要在数据库类型的技术里面,比如:最传统的MySQL,Oracle等,包括现在大数据领域的数据仓库,比如hive,他的基本的SQL执行的模型,都是类似的,首先都要生成一条SQL语句的执行计划



2)、通过SqlParser(解析器)生成未解析的逻辑计划(unresolved LogicalPlan)


3)、通过Analyzer(分析器)生成解析后的逻辑计划(resolved LogicalPlan)


4)、通过Optimizer(优化器)生成优化后的逻辑计划(optimized LogicalPlan)


实际上,比如传统的Oracle等数据库,通常都会生成多个执行计划,然后呢,最后有一个优化器,针对多个计划,选择一个最好的计划,而SparkSql这儿的优化指的是,比如说,刚生成的执行计划中,有些地方的性能是显而易见的,不太好,举例说明:


比如说,我们有一个SQL语句,select name from (select ... from ...) where ..=..;


此时,在执行计划解析出来的时候,其实就是按照他原封不动的样子,来解析成可以执行的计划,但是呢,Optimizer 在这里其实就会对执行计划进行优化,比如说,发现where 条件,其实可以放在子查询中,这样,子查询的数量大大变小,可以优化执行速度,此时,可能就会变成如下这样:select name from (select name from ...where ..=..)



5)、通过SparkPlan,生成最后的物理计划(PhysicalPlan)


到物理计划这里,那么其实就是非常"接地气"的计划了。就是说,已经很明朗了,从那几个文件读取什么数据,从那几个文件中读取,如何进行关联等等



6)、在executor中执行物理计划


逻辑的执行计划,更多的是偏向于逻辑,比如说吧,大致就是这种样子的,


From table students=>filter ... => select name ...


这里基本上,逻辑计划都是采用Tree ,树形结构



7)、生成RDD


Select name from students => 解析,从哪里去查询,students表,在哪个文件里,从哪个文件中查询哪些数据,比如说是name这个列,此外,复杂的SQL,还有,比如说查询时,是否对表中的数据进行过滤和筛选,更不用说,复杂时,需要有多表的JOIN(咋传统数据库中,比如MySQL,执行计划还涉及到如何扫描和利用索引)





4、spark-SQL性能优化



1)、设置shuffle过程的并行度:spark.sql.shuffle.partitions(SQLContext.setConf())



2)、在hive数据仓库建设过程中,合理设置数据类型,比如能设置为int的,就不要设置为bigInt,减少数据类型导致不必要的内存开销



3)、编写SQL时,尽量给出明确的列名,比如select name from students。不要写select * 的方式。



4)、并行处理查询结果:对于spark-SQL查询的结果,如果数据量比较大,比如超过1000条,那么就不要一次性的collect()到driver再处理,使用foreach()算子,并行处理查询结果


5)、缓存表:对于一条SQL语句可能对此使用到的表,可以对其进行缓存,使用 sqlContext.cacheTable(tableName),或者DataFrame.cache()即可,spark-SQL会用内存列存储的格式进行表的缓存,然后spark-sql就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存使用和GC开销,SQLContext.uncacheTable(tableName)可以将表从缓存中移除,用SQLContext。setConf(),设置spark.sql.inMemoryColumnarStorage.batchSize参数(默认10000),可以设置列存储的单位


6)、广播join表:spark.sql.autoBroadcastJoinThreshold,默认10485760 (10 MB)。在内存够用的情况下,可以增加其大小,参数设置了一个表在join的时候,最大在多大以内,可以被广播出去优化性能



5、Hive on Spark配置


1)、安转配置好Hive和Spark


2)、Set hive.execution.engine=spark;


3)、set spark.master=spark://mini1:7077





第三部分 spark-streaming



1, Dstream



Dstream是sparkStreaming的数据模型,本质就是一连串不间断的RDD,但是它是一个时间段的RDD.这些时间段的RDD源源不断的连接在一起。


这个时间可以自己设置,时间设置的越短,实时性越高,但是性能消耗也越大。




2, spark streaming从kafka获取数据,有哪几种方式?



有两种方式:


1.通过receiver的方式,


2,通过direct的方式,dirrect的方式需要自己来管理偏移量。




3, sparkStreaming和storm的区别



sparkStreaming是spark里面的一个做流式准实时计算的组件,它使用的数据结构是Dstream,Dstream里面是一连串时间片的rdd。


相比于storm,sparkStreaming在实时性,保证数据不丢失方面都不占用优势,spark streaming在spark支持者眼中的优势是spark Streaming具有高吞吐性,最本质来说,sparkStreaming相比于storm的优势是sparkStreaming可以和spark core,spark SQL无缝整合。




4.对于需要多次引用的,并且这个dstream计算时间特别耗时,数据特别重要,那么我们就需要对dstream进行checkpoint,(只有多次引用的,进行持久化就可以了),因为即使对这个dstream进行持久化,数据也可能会丢失,而checkpoint数据丢失的可能性小,但是这样会影响spark-streaming的数据吞吐量,因为在做计算的同时,还需要将数据写入到外部存储系统中,会降低spark性能,影响吞吐量,非必要情况下不建议使用



5.如何对dstream做checkpoint



首先设置还原点目录,其次调用dstream的checkpoint方法


【注意】:dstream的checkpoint的周期一定要是产生batch时间的整数倍,同时spark官方建议将checkpoint的时间设置为至少10秒。通常来说,将checkpoint间隔设置为窗口操作的滑动间隔的5-10倍




6.spark程序在启动时,会去这个checkpointPath目录下查看是否有保存的driver的元数据(1.dstream的操作转换关系,2.未处理完的batch)信息,当spark-streaming程序在二次启动后就会去checkpointPath目录下还原这个程序,加载未处理的batch元数据信息在内存中恢复,继续进行任务处理





7.为了保证spark-streaming程序7*24小时运行,那么我们程序应该具备高可靠性,怎样具备高可靠性?



a.程序出现故障,driver死掉了,流式程序应该具备自动重启的功能


b.没有计算完成的rdd在程序异常停止后,下次启动后还会将未处理的rdd进行处理


【注意】:要在spark_submit中,添加--deploy-mode参数,默认其值为client,即在提交应用的机器上启动driver,但是要能够自动重启driver,就必须将其值设置为cluster;此外,需要添加--supervise参数,失败后自动重启


//spark_submit --executor-memory 1g --total-execute-cores 5 --deploy-model cluster --supervise





8.启用预写机制


a.预写日志机制,简写为WAL,全称为Write Ahead Log,从spark1.2版本开始,就引入了基于容错的文件系统的WAL机制。如果启用该机制,Receiver接收到的所有数据都会写入配置的checkpoint目录中的预写日志。这中机制可以让driver在恢复的时候,避免数据丢失,并且可以确保整个实时计算过程中零数据丢失



0