大数据 : Hadoop reduce阶段
Mapreduce中由于sort的存在,MapTask和ReduceTask直接是工作流的架构。而不是数据流的架构。在MapTask尚未结束,其输出结果尚未排序及合并前,ReduceTask是又有数据输入的,因此即使ReduceTask已经创建也只能睡眠等待MapTask完成。从而可以从MapTask节点获取数据。一个MapTask最终的数据输出是一个合并的spill文件,可以通过Web地址访问。所以reduceTask一般在MapTask快要完成的时候才启动。启动早了浪费container资源。
ReduceTask是个线程,这个线程运行在YarnChild的Java虚拟机上,我们从ReduceTask.run开始看Reduce阶段。 获取更多大数据视频资料请加QQ群:947967114
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
if (isMapOrReduce()) {
/添加reduce过程需要经过的几个阶段。以便通知TaskTracker目前运 行的情况/
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
TaskReporter reporter = startReporter(umbilical);
// 设置并启动reporter进程以便和TaskTracker进行交流
boolean useNewApi = job.getUseNewReducer();
//在job client中初始化job时,默认就是用新的API,详见Job.setUseNewAPI()方法
initialize(job, getJobID(), reporter, useNewApi);
/用来初始化任务,主要是进行一些和任务输出相关的设置,比如创建commiter,设置工作目录等/
// check if it is a cleanupJobTask
/以下4个if语句均是根据任务类型的不同进行相应的操作,这些方 法均是Task类的方法,所以与任务是MapTask还是ReduceTask无关/
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;//只是为了JobCleanup,做完就停
}
if () {
runJobSetupTask(umbilical, reporter);
return;
//主要是创建工作目录的FileSystem对象
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
//设置任务目前所处的阶段为结束阶段,并且删除工作目录
}
下面才是真正要成为reducer
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
//如果需要就创建combineCollector
Classextends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
//配置文件找mapreduce.job.reduce.shuffle.consumer.plugin.class默认是shuffle.class
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
//创建shuffle类对象
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
//创建context对象,ShuffleConsumerPlugin.Context
shuffleConsumerPlugin.init(shuffleContext);
//这里调用的起始是shuffle的init函数,重点摘要如下。
this.localMapFiles = context.getLocalMapFiles();
scheduler = new ShuffleSchedulerImpl(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
//创建shuffle所需的调度器
merger = createMergeManager(context);
//创建shuffle内部的merge,createMergeManager里面源码:
return new MergeManagerImpl(reduceId, jobConf, context.getLocalFS(),
context.getLocalDirAllocator(), reporter, context.getCodec(),
context.getCombinerClass(), context.getCombineCollector(),
context.getSpilledRecordsCounter(),
context.getReduceCombineInputCounter(),
context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
context.getMapOutputFile());
//创建MergeMnagerImpl对象和Merge线程
rIter = shuffleConsumerPlugin.run();
//从各个Mapper复制其输出文件,并加以合并排序,等待直到完成为止
// free up the data structures
mapOutputFilesOnDisk.clear();
sortPhase.complete();
//排序阶段完成
setPhase(TaskStatus.Phase.REDUCE);
//进入reduce阶段
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
//3.Reduce 1.Reduce任务的最后一个阶段。它会准备好Map的 keyClass("mapred.output.key.class""mapred.mapoutput.key.class"),valueClass("mapred.mapoutput.value.class"或"mapred.output.value.class")和 Comparator ("mapred.output.value.groupfn.class"或"mapred.output.key.comparator.class")
if (useNewApi) {
//2.根据参数useNewAPI判断执行runNewReduce还是runOldReduce。分析润runNewReduce
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
//0.像报告进程书写一些信息,1.获得一个TaskAttemptContext对象。通过这个对象创建reduce、output及用于跟踪的统计output的RecordWrit、最后创建用于收集reduce结果的Context,2.reducer.run(reducerContext)开始执行reduce
} else {//老API
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
(1)reduce分为三个阶段(copy就是远程拷贝Map的输出数据、sort就是对所有的数据做排序、reduce做聚集就是我们自己写的reducer),为这三个阶段分别设置Progress,用来和TaskTracker通信报道状态。
(2)上面代码的15-40行和MapReduce的MapTask任务的运行源码级分析中对应部分基本相同,可参考之;
(3)codec = initCodec()这句是检查map的输出是否是压缩的,压缩的则返回压缩codec实例,否则返回null,这里讨论不压缩的;
(4)我们讨论完全分布式的hadoop,即isLocal==false,然后构造一个ReduceCopier对象reduceCopier,并调用reduceCopier.fetchOutputs()方法拷贝各个Mapper的输出,到本地;
(5)然后copy阶段完成,设置接下来的阶段是sort阶段,更新状态信息;
(6)根据isLocal来选择KV迭代器,完全分布式的会使用reduceCopier.createKVIterator(job, rfs, reporter)作为KV迭代器;
(7)sort阶段完成,设置接下来的阶段是reduce阶段,更新状态信息;
(8)然后获取一些配置信息,并根据是否使用新API选择不同的处理方式,这里是新的API,调用runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass)会执行reducer;
(9)done(umbilical, reporter)这个方法用于做结束任务的一些清理工作:更新计数器updateCounters();如果任务需要提交,设置Taks状态为COMMIT_PENDING,并利用TaskUmbilicalProtocol,汇报Task完成,等待提交,然后调用commit提交任务;设置任务结束标志位;结束Reporter通信线程;发送最后一次统计报告(通过sendLastUpdate方法);利用TaskUmbilicalProtocol报告结束状态(通过sendDone方法)。
有些人将Reduce Task分为了5个阶段:一、shuffle阶段:也称为Copy阶段,就是从各个MapTask上远程拷贝一片数据,如果大小超过一定阈值就写到磁盘,否则放入内存;二、Merge阶段:在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,防止内存使用过多和磁盘文件过多;三、sort阶段:用户编写的reduce方法的输入数据是按key进行聚集的,需要对copy过来的数据排序,这里用的是归并排序,因为Map Task的结果是有序的;四、Reduce阶段:将每组数据依次交给用户编写的Reduce方法处理;五、write阶段:就是将结果写入HDFS。
上面的5个阶段分的比较细了,代码里分为3个阶段copy、sort、reduce,我们在eclipse运行MR程序时,控制台看到的reduce阶段的百分比就分为3个阶段各占33.3%。
这里的shuffleConsumerPlugin是实现了ShuffleConsumerPlugin的某个类对象。具体可以通过配置文件mapreduce.job.reduce.shuffle.consumer.plugin.class选项设置,默认情况下是使用shuffle。我们在代码中分析过完成shuffleConsumerPlugin.run,通常是shuffle.run,因为有了这个过程Mapper的合成的spill文件才能通过HTTP协议传输到Reducer端。有了数据才能进行runNewReducer或者runOldReducer。可以说shuffle对象就是MapTask的搬运工。而且shuffle的搬运方式不是一遍搬运一遍Reducer处理,而是要把MapTask所有的数据都搬运过来,并且进行合并排序之后才开始提供给对应的Reducer。
一般而言,MapTask和ReduceTask是多对多的关系,假如有M个Mapper有N个Reducer。我们知道N个Reducer对应着N个partition,所以每个Mapper都会被划分成N个Partition,每个Reducer承担着一个Partition部分的操作。这样每一个Reducer从每个不同的Mapper内拿来属于自己的那部分数据,这样每个Reducer就有M份不同Mapper的数据,把M份数据合并在一起就是一个最终完整的Partition,有必要还会进行排序,这时候才成为了Reducer的具体输入数据。这个数据搬运和重组的过程被叫做shuffle过程。shuffle这个过程开销颇大,会占用较大的网络流量,因为涉及到大量数据的传输,shuffle过程也会有延迟,因为M个Mapper的计算有快有慢,但是shuffle要所有的Mapper完成才能开始,Reduce又必须等shuffle完成才能开始,当然这种延迟不是shuffle造成的,如果Reducer不需要全部Partition数据到位并排序,就不用与最慢的Mapper同步,这是排序付出的代价。
所以shuffle在MapReduce框架中起着非常重要的作用。我们先看shuffle的摘要: 获取更多大数据视频资料请加QQ群:947967114
public class Shuffle implements ShuffleConsumerPlugin, ExceptionReporter
private ShuffleConsumerPlugin.Context context;
private TaskAttemptID reduceId;
private JobConf jobConf;
private TaskUmbilicalProtocol umbilical;
private ShuffleSchedulerImpl scheduler;
private MergeManager merger;
private Task reduceTask; //Used for status updates
private Map localMapFiles;
public void init(ShuffleConsumerPlugin.Context context)
public RawKeyValueIterator run() throws IOException, InterruptedException
在ReduceTask.run中看到调用了shuffle.init,在run理创建了ShuffleSchedulerImpl和MergeManagerImpl对象。后面会讲解就是是做什么用的。
之后就是对shuffle.run的调用,shuffle虽然有一个run但是并非是一个线程,只是用了这个名字而已。
我们看:ReduceTask.run->Shuffle.run
public RawKeyValueIterator run() throws IOException, InterruptedException {
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher eventFetcher =
new EventFetcher(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
//通过查看EventFetcher我们看到他继承了Thread,所以他是一个线程
// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher[] fetchers = new Fetcher[numFetchers];
//创建了一个线程池
if (isLocal) {
//如果Mapper和Reducer在同一台机器上,就在本地fetche
fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
//LocalFetcher是对Fetcher的扩展,也是线程。
fetchers[0].start();//本地Fecher只有一个
} else {
//Mapper集合Reducer不在同一个机器上,需要跨多个节点Fecher
for (int i=0; i < numFetchers; ++i) {
//启动所有的Fecher
fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
//创建Fecher线程
fetchers[i].start();
//跨节点的Fecher需要好多个,都需要开启
}
}
// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
//等待所有的Fecher都完成,如果有超时情况就报告进度
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
//关闭eventFetcher,代表shuffle操作完成,所有的MapTask的数据都拷贝过来了
// Stop the map-output fetcher threads
for (Fetcher fetcher : fetchers) {
fetcher.shutDown();//关闭所有的fetcher。
}
// stop the scheduler
scheduler.close();
//也不需要shuffle的调度,所以关闭
copyPhase.complete(); // copy is already complete
//文件复制阶段结束
以下就是Reduce阶段的MergeSort了
taskStatus.setPhase(TaskStatus.Phase.SORT);
//完成排序
reduceTask.statusUpdate(umbilical);
//通过umbilical向MRAppMaster汇报,更新状态
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
//合并和排序,完成后返回一个队列kvIter 。
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}
数据从MapTask转移到ReduceTask就两种方式,一MapTask送,二ReduceTask取,hadoop采用的是第二种方式,就是文件的复制。在Shuffle进入run之前,RduceTask.run调用过他的init函数shuffleConsumerPlugin.init(shuffleContext),在init里创建了scheduler和用于合并排序的merge,进入run后又创建了EventFetcher线程和若干个Fetcher线程。Fetcher的作用就是拿取,向MapTask节点提取数据。但是我们要清楚EventFetcher虽然也是Fetcher,但是提取的是event,不是数据本身。我们可以认为它只是对Fetcher过程的一个事件的控制。
Fetcher线程的数量也不一定,Uber模式下,MapTask和ReduceTask在同一个节点上,并且只有一个MapTask,所以只有一个Fetcher就能够完成,而且这个Fetcher是localFetcher。如果不是Uber模式可能会有很多MapTask并且一般和ReduceTask不在同一个节点。这时Fetcher的数量可以进行配置,默认有5个。数组fetchers就相当于Fetcher的线程池。
创建了EventFetcher和Fetcher线程池后,进入了while循环,但是while循环什么都不做,一直等待,所以实际的操作都是在线程完成的,也就是通过EventFetcher和若干的Fetcher完成。EventFetcher起到了非常关键的枢纽的作用。
我们查看EventFetcher的源代码摘要,我们提取关键的东西:
class EventFetcher extends Thread {
private final TaskAttemptID reduce;
private final TaskUmbilicalProtocol umbilical;
private final ShuffleScheduler scheduler;
private final int maxEventsToFetch;
public void run() {
int failures = 0;
LOG.info(reduce + " Thread started: " + getName());
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {//线程没有被打断
try {
int numNewMaps = getMapCompletionEvents();
//获取Map的完成的事件,接着我们看getMapCompletionEvents源代码:
protected int getMapCompletionEvents()
throws IOException, InterruptedException {
int numNewMaps = 0;
TaskCompletionEvent events[] = null;
do {
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(
(org.apache.hadoop.mapred.JobID)reduce.getJobID(),
fromEventIdx,
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID)reduce);
//汇报umbilical从MRAppMaster获取Map完成的时间的报告
events = update.getMapTaskCompletionEvents();
//获取有关具体的MapTask结束运行的情况
LOG.debug("Got " + events.length + " map completion events from " +
fromEventIdx);
assert !update.shouldReset() : "Unexpected legacy state";
//做了一个断言 获取更多大数据视频资料请加QQ群:947967114
// Update the last seen event ID
fromEventIdx += events.length;
// Process the TaskCompletionEvents:
// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
// fetching from those maps.
// 3. Remove TIPFAILED maps from neededOutputs since we don't need their
// outputs at all.
for (TaskCompletionEvent event : events) {
//对于获取的每个事件的报告
scheduler.resolve(event);
//这里使用了ShuffleSchedullerImpl.resolve函数,源代码如下:
public void resolve(TaskCompletionEvent event) {
switch (event.getTaskStatus()) {
case SUCCEEDED://如果成功
URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());//获取其URI
addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(),
event.getTaskAttemptId());
//记录这个MapTask的节点主机记录下来,供Fetcher使用,getBaseURI的源代码:
static URI getBaseURI(TaskAttemptID reduceId, String url) {
StringBuffer baseUrl = new StringBuffer(url);
if (!url.endsWith("/")) {
baseUrl.append("/");
}
baseUrl.append("mapOutput?job=");
baseUrl.append(reduceId.getJobID());
baseUrl.append("&reduce=");
baseUrl.append(reduceId.getTaskID().getId());
baseUrl.append("&map=");
URI u = URI.create(baseUrl.toString());
return u;
获取各种信息,然后添加都URI对象中。
}
回到源代码
maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
//最大的尝试时间
break;
case FAILED:
case KILLED:
case OBSOLETE://如果MapTask运行失败
obsoleteMapOutput(event.getTaskAttemptId());//获取TaskId
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'");//写日志
break;
case TIPFAILED://如果失败
tipFailed(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'");//写日志
break;
}
}
回到源代码
if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {//如果事件成功
++numNewMaps;//增加map数量
}
}
} while (events.length == maxEventsToFetch);
return numNewMaps;
}
回到源代码
failures = 0;
if (numNewMaps > 0) {
LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
}
LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME);
}
} catch (InterruptedException e) {
LOG.info("EventFetcher is interrupted.. Returning");
return;
} catch (IOException ie) {
LOG.info("Exception in getting events", ie);
// check to see whether to abort
if (++failures >= MAX_RETRIES) {
throw new IOException("too many failures downloading events", ie);//失败数量大于重试的数量
}
// sleep for a bit
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(RETRY_PERIOD);
}
}
}
} catch (InterruptedException e) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
return;
}
}
MapTask和ReduceTask没有直接的关系,MapTask不知道ReduceTask在哪些节点上,它只是把进度的时间报告给MRAppMaster。ReduceTask通过"脐带"执行getMapCompletionEvents操作想MRAppMaster获取MapTask结束运行的时间报告。有个别的MapTask可能会失败,但是绝大多数都会成功,只要成功的就通过Fetcher去索取输出数据,这个信息就是通过shcheduler完成的也就是ShuffleSchedulerImpl对象,ShuffleSchedulerImpl对象并不多,只是个普通的对象。
fetchers就像线程池,里面有若干线程(默认有5个),这些线程等待EventFetcher的通知,一旦有MapTask完成就前往提取数据。
获取更多大数据视频资料请加QQ群:947967114
我们看Fetcher线程类的run方法:
public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
//从scheduler获取一个已经成功完成的MapTask的节点。
metrics.threadBusy();
//线程变成繁忙状态
// Shuffle
copyFromHost(host);
//开始复制这个节点的数据
} finally {
if (host != null) {//maphost还有运行中的
scheduler.freeHost(host);
//状态设置成空闲状态,等待其完成。
metrics.threadFree();
}
}
}
} catch (InterruptedException ie) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
}
}
这里的重点是copyFromHost获取数据的函数。
protected void copyFromHost(MapHost host) throws IOException {
// reset retryStartTime for a new host
//这是在ReduceTask的节点上运行的
retryStartTime = 0;
// Get completed maps on 'host'
List
//获取目标节点上的MapTask集合。
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
if (maps.size() == 0) {
return;//没有完成的直接返回
}
if(LOG.isDebugEnabled()) {
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
- maps);
}
// List of maps to be fetched yet
Set remaining = new HashSet(maps);
//已经完成、等待shuffle的MapTask集合。
// Construct the url and connect
DataInputStream input = null;
URL url = getMapOutputURL(host, maps);
//生成MapTask所在节点的URL,下面要看getMapOutputURL源码:
private URL getMapOutputURL(MapHost host, Collection maps
) throws MalformedURLException {
// Get the base url
StringBuffer url = new StringBuffer(host.getBaseUrl());
boolean first = true;
for (TaskAttemptID mapId : maps) {
if (!first) {
url.append(",");
}
url.append(mapId);//在URL后面加上mapid
first = false;
}
LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
//写日志
return new URL(url.toString());
//返回URL
}
回到主代码:
try {
setupConnectionsWithRetry(host, remaining, url);
//和对方主机建立HTTP连接,setupConnectionsWithRetry使用了openConnectionWithRetry函数打开链接。
openConnectionWithRetry(host, remaining, url);
这段源代码有使用了openConnection(url);方式,继续查看。
如下是链接的主要过程:
protected synchronized void openConnection(URL url)
throws IOException {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
//使用的是HTTPURL进行连接
if (sslShuffle) {//如果是有信任证书的
HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
//强转conn类型
try {
httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());//添加一个证书socket的工厂
} catch (GeneralSecurityException ex) {
throw new IOException(ex);
}
httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
}
connection = conn;
}
在setupConnectionsWithRetry中继续写到:
setupShuffleConnection(encHash);
//建立了Shuffle链接
connect(connection, connectionTimeout);
// verify that the thread wasn't stopped during calls to connect
if (stopped) {
return;
}
verifyConnection(url, msgToEncode, encHash);
}
//至此连接通过。
if (stopped) {
abortConnect(host, remaining);
//这里边是关闭连接,可以点进去看一下,满足列表和等待的两个条件
return;
}
} catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException;
ioErrs.increment(1);
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
" map outputs", ie);
回到主代码
input = new DataInputStream(connection.getInputStream());
//实例一个输入流对象。
try {
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
//如果需要fetcher的列表不空,并且失败的task数量没有
try {
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
//复制数据出来copyMapOutput的源代码如下:
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
mapId = TaskAttemptID.forName(header.mapId);
//获取mapID
compressedLength = header.compressedLength;
decompressedLength = header.uncompressedLength;
forReduce = header.forReduce;
} catch (IllegalArgumentException e) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
//Don't know which one was bad, so consider all of them as bad
return remaining.toArray(new TaskAttemptID[remaining.size()]);
}
InputStream is = input;
is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
compressedLength -= CryptoUtils.cryptoPadding(jobConf);
decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
//如果需要解压或解密
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
return new TaskAttemptID[] {mapId};
}
if(LOG.isDebugEnabled()) {
LOG.debug("header: " + mapId + ", len: " + compressedLength +
", decomp len: " + decompressedLength);
}
try {
mapOutput = merger.reserve(mapId, decompressedLength, id);
//为merge预留一个MapOutput:是内存还是磁盘上。
} catch (IOException ioe) {
// kill this reduce attempt
ioErrs.increment(1);
scheduler.reportLocalError(ioe);
//报告错误
return EMPTY_ATTEMPT_ID_ARRAY;
}
// Check if we can shuffle now ...
if (mapOutput == null) {
LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
//Not an error but wait to process data.
return EMPTY_ATTEMPT_ID_ARRAY;
}
// The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
// on decompression failures. Catching and re-throwing as IOException
// to allow fetch failure logic to be processed
try {
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map "
mapOutput.getMapId() + " decomp: " + decompressedLength
- " len: " + compressedLength + " to " + mapOutput.getDescription());
mapOutput.shuffle(host, is, compressedLength, decompressedLength,
metrics, reporter);
//跨节点把Mapper的文件内容拷贝到reduce的内存或者磁盘上。
} catch (java.lang.InternalError e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e);
throw new IOException(e);
}
// Inform the shuffle scheduler
long endTime = Time.monotonicNow();
// Reset retryStartTime as map task make progress if retried before.
retryStartTime = 0;
scheduler.copySucceeded(mapId, host, compressedLength,
startTime, endTime, mapOutput);
//告诉调度器完成了一个节点的Map输出的文件拷贝。
remaining.remove(mapId);
//这个MapTask的输出已经shuffle完毕
metrics.successFetch();
return null;后面的异常失败信息我们不管。
这里的mapOutput是用来容纳MapTask输出文件的存储空间,根据输出文件的内容大小和内存的情况,可以是内存的Output也可以是DiskOutput。 如果是内存需要预约,因为不止一个Fetcher。我们以InMemoryMapOutput为例。
代码结构;
Fetcher.run-->copyFromHost-->copyMapOutput-->merger.reserve(MergeManagerImpl.reserve)-->InmemoryMapOutput.shuffle
public void shuffle(MapHost host, InputStream input,
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
//跨节点从Mapper拷贝spill文件
IFileInputStream checksumIn =
new IFileInputStream(input, compressedLength, conf);
//校验和的输入流
input = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
//如果涉及到了压缩
decompressor.reset();
//重启解压器
input = codec.createInputStream(input, decompressor);
//加了解压器的输入流
}
try {
IOUtils.readFully(input, memory, 0, memory.length);
//从Mapper方把特定的Partition数据读入Reducer的内存缓冲区。
metrics.inputBytes(memory.length);
reporter.progress();//汇报进度
LOG.info("Read " + memory.length + " bytes from map-output for " +
getMapId());
/**
We've gotten the amount of data we were expecting. Verify the
decompressor has nothing more to offer. This action also forces the
decompressor to read any trailing bytes that weren't critical
for decompression, which is necessary to keep the stream
- in sync.
*/
if (input.read() >= 0 ) {
throw new IOException("Unexpected extra bytes from input stream for " +
getMapId());
}
} catch (IOException ioe) {
// Close the streams
IOUtils.cleanup(LOG, input);
// Re-throw
throw ioe;
} finally {
CodecPool.returnDecompressor(decompressor);
//释放解压器
}
}
从对方把spill文件中属于本partition数据复制过来,回到copyFromHost中,通过scheduler.copySuccessed告知scheduler,并把这个MapTask的ID从remaining集合中删除,进入下一个循环,复制下一个MapTask数据。直到把所有的属于本Partition的数据都复制过来。
以上是Reducer端Fetcher的过程,它向Mapper端发送HTTP GET请求,下载文件。在MapTask就有一个与之对应的Server,这个网络协议的源代码不做深究,课下有兴趣自己研究。 获取更多大数据视频资料请加QQ群:947967114