三、MapReduce的shuffle工作过程
[TOC]
一、基本流程
1、流程
map端:
1)假设有两个map task并行运行。
2)每个map task任务处理完成后,会通过收集器 collector,将输出结果存入一个环形缓冲区中。
3)环形缓冲区工作原理:
1>环形缓冲区默认大小为100M,可以配置 mapred-site.xml:mapreduce.task.io.sort.mb 来配置大小2>环形缓冲区阈值为 80%,超过就会开始spill溢写。可以配置mapred-site.xml:mapreduce.map.sort.spill.percent 来配置阈值的百分比3>环形缓冲区存储两种数据,一种是元数据:KV的分区号,key的起始位置,value的起始位置,value长度。每个元数据长度固定为4个int长度一种是原始数据:存储key和value原本的数据4>在元数据和原始数据的起始点,会有一个分界线,用于区分两种数据的存储区域,然后两者往相反方向开始写入数据。5>当环形缓冲区超过80%时,会将这80%的数据锁定,然后溢写到磁盘中变成小文件,并且这个过程中,这80%的空间不能写入数据(由后台一个新的线程来执行溢写)。同时剩下的20%可以继续写入数据。直到溢写结束,解除80%的空间锁定。
4)spill:当缓冲器空间超过80%时,一个后台线程会启动,开始溢写成小文件,写入磁盘。在这个过程中,会对缓冲区中的元数据根据先根据分区号(每个分区一个溢写文件),然后同一分区内根据key进行排序(这里的排序算法使用快速排序)。接着根据排好序的元数据,溢写相应的原始数据。最后得到已经分区且分区内已根据key排好序的溢写文件。
同时在溢写最后一步,可以加入combine过程(可选的)。
这里说到是将元数据进行排序,然后根据排好序的元数据溢写相应的原始数据。为什么要这样呢?因为排序过程中涉及到数据的移动,而原始数据比起元数据一般都要大,所以涉及到移动成本(包括内存空间,cpu等成本)比较大。所以这里直接根据原始数据中的key来排序元数据,最后形成一个有序的元数据区域。最后只要依次根据元数据,从原始数据区域中读取对应位置的KV,就可以得到有序的原始数据。
5)归并排序:当溢写完成后,一般会发生多次溢写,从而生成多个已分区,并且区内有序的溢写文件。接着就是将同一分区的多个溢写文件进行归并排序,合成一个大的溢写文件,且里面是有序的。这个过程中也可以加入combine过程(可选的)。其实这个归并的过程是分多次进行的,不是一次性完成的。
6)最后将归并完后的溢写文件经过压缩写入到磁盘中。至此shuffle在map端的流程已经完成
reduce端:一个分区对应一个reduce task
7)reducer中有的MRAppMaster线程定期询问map task输出结果的文件位置,mapper会在map结束后向MRAppMaster汇报信息,从而reducer得到map的状态,得到map的结果文件的目录。接着reduce会自动向多个map拉取同一分区的结果文件到本地。拉取过程中,会先将数据暂存到缓冲区中,默认是100M,也是环形缓冲区。当数据量大于缓冲区大小时,就会将数据写入到磁盘中。
8)归并排序merge:拉取完成后,会将多个结果文件进行归并排序,最终合成一个大的有序文件。这个merge 的过程中,会涉及到数据的输入和输出是在哪的,比如是输入输出都是内存中的,输入内存,输出硬盘的;输入硬盘,输出也是硬盘的。方式不同,明显性能肯定不同。这是MapReduce优化的一个点
9)接着会进行分组group过程。将同一个key的键值对合并成(key, array)的形式。如:(king,1), (king,2) 合并成 (king,[1,2])。这里面可以自定义分组方式。
10)而后面接着的group操作,一个分组就只会调用一次reduce方法,而且默认只会使用分组中的第一个KV作为reduce的输入,剩余的KV不会做处理,直接丢弃。这里可以自定义分组类。
11)merge,group过程完毕后,就会每个KV调用一次reduce方法,最终reduce输出。
2、shuffle过程中的重点过程
partition:分区
spill:溢写
merge:迁移合并
sort:排序。有3次排序。分别是溢写中的快速排序,将多个溢写文件合并的归并排序。以及在reduce端中将多个map的结果文件进行归并排序。
combine:map端的初次合并,业务逻辑就是reduce,只不过是局部的,这个过程是可选的。但是可以作为优化的点,因为可以减少reduce从map拉取数据的数据量。
3、map端merge后的溢出文件的结构
(1)存储结构
在map端溢出的文件其实有两部分,一部分是索引文件,一部分是数据本身。
![](E:\file\big data\picture\assets\MapReduce-mapMerge.png)
索引文件:主要是记录各个分区在数据文件中的偏移量。
数据文件:记录了KV的长度,KV的数据。
(2)特点
其实由此可以看出,map端存储多个分区的溢写结果时,并不是独立存储到独立的文件中的,而是存储到同一个文件中,然后通过索引文件来标识各个分区的数据在总的数据文件中的偏移量来读取不同分区的数据。这个存储方式的好处是,如果分区很多话,单独存储每个分区的数据,就会生成多个文件,占用多个hdfs的索引资源。而采用上述的方式,则只需读取两个文件。