千家信息网

Shuffle流程是什么

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容介绍了"Shuffle流程是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!shuffle
千家信息网最后更新 2025年01月23日Shuffle流程是什么

本篇内容介绍了"Shuffle流程是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

shuffle流程源码解读

1、从WordCountMapper类中的map方法中写出kv后,进入shuffle流程        --context.write(outK,outV);进入TaskInputOutputContext中的write()方法                     --看下就过进入WrappedMapper.java中的mapContext.write(key, value);方法      //112行进入TaskInputOutputContextImpl.java 中output.write(key, value);方法   //89行最终定位到MapTask的write()方法内,        //726行
2、重点步骤,收集器对象将kv收集到缓冲区,并在收集前将kv的分区号计算出来.collector.collect(key, value,partitioner.getPartition(key, value, partitions));第一次进入该方法时,因为没有设置reduce的个数,所以最终返回的永远是0号分区
3、定位到MapTask类中的collect方法并进入              //1082行bufferRemaining -= METASIZE;  //计算缓冲区剩余大小,该行代码前面的代码是对kv类型的一个判断如果bufferRemaining < 0 则开始进行溢写操作,内部是对数据的一些校验和计算
4、定位到startSpill(); --1126行    //只有当溢写数据大小满足80%时,才会触发该操作WordCountMapper持续往缓冲区写数据,当达到溢写条件80%时,开始溢写
5、进入到startSpill()方法内部               --MapTask类1590行spillReady.signal(); //1602行             --线程通信, 通知溢写线程开始干活//执行溢写线程(MapTask内部类SpillThread)的run方法//run方法中调用MapTask$MapOutputBuffer中的sortAndSpill()方法直接执行下面的排序和溢写方法          --sortAndSpill()方法    --MapTask的1605行
6、定位到1615行final SpillRecord spillRec = new SpillRecord(partitions); //根据分区数创建溢写记录对象--排序按照分区排序,溢写按照分区溢写final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);//获取溢写文件名称 ///tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619 _0001/attempt_local1440922619_0001_m_000000_0/output/(spill0.out),这时还没有溢写文件,只有目录out = rfs.create(filename);           //创建执行改步后,在上述的目录下生成溢写文件spill0.out文件

7、继续向下走,定位到MapTask类的1625行sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);               //溢写前排序8、定位到1629行,进入for循环      --按照分区进行溢写9、分析for循环内代码,看具体溢写过程        9.1 先有一个writer对象,通过该对象来完成数据溢写                writer = new Writer(job, partitionOut, keyClass, valClass, codec,        9.2 判断是否有设置combinerRunner对象                如果有,则按照设置的combinerRunner业务去处理;                如果没有,则走默认的溢写规则10、执行到1667行,即writer.close();方法,本次溢写完毕,此时我们再去看溢写文件spill0.out文件有数据

11、if (totalIndexCacheMemory >= indexCacheMemoryLimit(大小为:1M)) {}      //MapTask类的1685行// 如果索引数据超过指定的内存大小,也需要溢写到文件中.(该现象一般情况很难发生.)
12、当本次溢写完毕之后,继续回到WordCountMapper类中的map方法内的context.write(outk,outv);方法处--说明:因为我们使用本地debug模式调试,所以看不到并行的效果,只能是串行效果,因此看到的是当内存内读取满足80%时,发生溢写操作,其实溢写并未停止,只不过我们看不到,剩余的溢写数据在20%内存进行
13、如上溢写过程,在整个mapTask中会出现N次,具体多少看数据量. 如果map中最后的数据写到缓冲区,但是没有满足80%溢写条件的情况,最终也需要将缓冲区的数据刷写到磁盘(最后一次溢写)。最后一次会发生在 MapTask中关闭 NewOutputCollector对象的时候.即在该行代码处发生    output.close(mapperContext);   --MapTask的805行14、进入output.close(mapperContext);方法内    --MapTask的732行定位到collector.flush();方法 // 735行-->将缓冲区的数据刷写到磁盘-->重新走sortAndSpill()方法(最后一次刷写)

上述流程,每发生一次溢写就会生成一个溢写小文件(溢写文件内的数据是排好序的)最终所有的数据都写到磁盘中后,在磁盘上就是多个溢写文件, 比如:spill0.out,spill1.out,...spillN.out
15、溢写全部完成之后,就进入归并操作          --MapTask的1527行mergeParts();方法,进入该方法,定位到MapTask的1844行filename[0]: /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/spill0.out

16、继续向下走,定位到MapTask的1880行Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);   --归并后,最终输出的文件路径/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/file.out17、继续向下走,定位到MapTask的1882行Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);   --归并后,最终输出文件的索引文件/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/file.out.index18、创建file.out 文件        FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);19、for (int parts = 0; parts < partitions; parts++) {}  //1925行,按照分区进行归并排序20、for循环内具体的归并操作        //1950行        RawKeyValueIterator kvIter = Merger.merge(job, rfs,                         keyClass, valClass, codec,                         segmentList, mergeFactor,                         new Path(mapId.toString()),                         job.getOutputKeyComparator(), reporter, sortSegments,                         null, spilledRecordsCounter, sortPhase.phase(),                         TaskType.MAP);
21、归并后的数据写出到文件Writer writer = new Writer(job, finalPartitionOut, keyClass, valClass, codec,spilledRecordsCounter); //1961行//归并也可以使用combiner,但是前提条件是设置了combiner,并且溢写次数大于等于3 if (combinerRunner == null || numSpills < minSpillsForCombine(3)) {      Merger.writeFile(kvIter, writer, reporter, job);} else {      combineCollector.setWriter(writer);      combinerRunner.combine(kvIter, combineCollector);}22、归并完成writer.close();         //1972行

23、写出索引文件spillRec.writeToFile(finalIndexFile, job); //1986行24、删除所有的溢写文件spill0.out spill1.out ... spill0.out,只保留最终的输出文件。for(int i = 0; i < numSpills; i++) {       rfs.delete(filename[i],true);}

"Shuffle流程是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0