Shuffle流程是什么
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容介绍了"Shuffle流程是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!shuffle
千家信息网最后更新 2025年01月23日Shuffle流程是什么
本篇内容介绍了"Shuffle流程是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
shuffle流程源码解读
1、从WordCountMapper类中的map方法中写出kv后,进入shuffle流程 --context.write(outK,outV);进入TaskInputOutputContext中的write()方法 --看下就过进入WrappedMapper.java中的mapContext.write(key, value);方法 //112行进入TaskInputOutputContextImpl.java 中output.write(key, value);方法 //89行最终定位到MapTask的write()方法内, //726行
2、重点步骤,收集器对象将kv收集到缓冲区,并在收集前将kv的分区号计算出来.collector.collect(key, value,partitioner.getPartition(key, value, partitions));第一次进入该方法时,因为没有设置reduce的个数,所以最终返回的永远是0号分区
3、定位到MapTask类中的collect方法并进入 //1082行bufferRemaining -= METASIZE; //计算缓冲区剩余大小,该行代码前面的代码是对kv类型的一个判断如果bufferRemaining < 0 则开始进行溢写操作,内部是对数据的一些校验和计算
4、定位到startSpill(); --1126行 //只有当溢写数据大小满足80%时,才会触发该操作WordCountMapper持续往缓冲区写数据,当达到溢写条件80%时,开始溢写
5、进入到startSpill()方法内部 --MapTask类1590行spillReady.signal(); //1602行 --线程通信, 通知溢写线程开始干活//执行溢写线程(MapTask内部类SpillThread)的run方法//run方法中调用MapTask$MapOutputBuffer中的sortAndSpill()方法直接执行下面的排序和溢写方法 --sortAndSpill()方法 --MapTask的1605行
6、定位到1615行final SpillRecord spillRec = new SpillRecord(partitions); //根据分区数创建溢写记录对象--排序按照分区排序,溢写按照分区溢写final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);//获取溢写文件名称 ///tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619 _0001/attempt_local1440922619_0001_m_000000_0/output/(spill0.out),这时还没有溢写文件,只有目录out = rfs.create(filename); //创建执行改步后,在上述的目录下生成溢写文件spill0.out文件
7、继续向下走,定位到MapTask类的1625行sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //溢写前排序8、定位到1629行,进入for循环 --按照分区进行溢写9、分析for循环内代码,看具体溢写过程 9.1 先有一个writer对象,通过该对象来完成数据溢写 writer = new Writer(job, partitionOut, keyClass, valClass, codec, 9.2 判断是否有设置combinerRunner对象 如果有,则按照设置的combinerRunner业务去处理; 如果没有,则走默认的溢写规则10、执行到1667行,即writer.close();方法,本次溢写完毕,此时我们再去看溢写文件spill0.out文件有数据
11、if (totalIndexCacheMemory >= indexCacheMemoryLimit(大小为:1M)) {} //MapTask类的1685行// 如果索引数据超过指定的内存大小,也需要溢写到文件中.(该现象一般情况很难发生.)
12、当本次溢写完毕之后,继续回到WordCountMapper类中的map方法内的context.write(outk,outv);方法处--说明:因为我们使用本地debug模式调试,所以看不到并行的效果,只能是串行效果,因此看到的是当内存内读取满足80%时,发生溢写操作,其实溢写并未停止,只不过我们看不到,剩余的溢写数据在20%内存进行
13、如上溢写过程,在整个mapTask中会出现N次,具体多少看数据量. 如果map中最后的数据写到缓冲区,但是没有满足80%溢写条件的情况,最终也需要将缓冲区的数据刷写到磁盘(最后一次溢写)。最后一次会发生在 MapTask中关闭 NewOutputCollector对象的时候.即在该行代码处发生 output.close(mapperContext); --MapTask的805行14、进入output.close(mapperContext);方法内 --MapTask的732行定位到collector.flush();方法 // 735行-->将缓冲区的数据刷写到磁盘-->重新走sortAndSpill()方法(最后一次刷写)
上述流程,每发生一次溢写就会生成一个溢写小文件(溢写文件内的数据是排好序的)最终所有的数据都写到磁盘中后,在磁盘上就是多个溢写文件, 比如:spill0.out,spill1.out,...spillN.out
15、溢写全部完成之后,就进入归并操作 --MapTask的1527行mergeParts();方法,进入该方法,定位到MapTask的1844行filename[0]: /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/spill0.out
16、继续向下走,定位到MapTask的1880行Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize); --归并后,最终输出的文件路径/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/file.out17、继续向下走,定位到MapTask的1882行Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); --归并后,最终输出文件的索引文件/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/file.out.index18、创建file.out 文件 FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);19、for (int parts = 0; parts < partitions; parts++) {} //1925行,按照分区进行归并排序20、for循环内具体的归并操作 //1950行 RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, mergeFactor, new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, null, spilledRecordsCounter, sortPhase.phase(), TaskType.MAP);
21、归并后的数据写出到文件Writerwriter = new Writer (job, finalPartitionOut, keyClass, valClass, codec,spilledRecordsCounter); //1961行//归并也可以使用combiner,但是前提条件是设置了combiner,并且溢写次数大于等于3 if (combinerRunner == null || numSpills < minSpillsForCombine(3)) { Merger.writeFile(kvIter, writer, reporter, job);} else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector);}22、归并完成writer.close(); //1972行
23、写出索引文件spillRec.writeToFile(finalIndexFile, job); //1986行24、删除所有的溢写文件spill0.out spill1.out ... spill0.out,只保留最终的输出文件。for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true);}
"Shuffle流程是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
方法
文件
数据
定位
对象
缓冲区
缓冲
流程
排序
代码
大小
磁盘
输出
内存
情况
条件
索引
线程
过程
循环
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
青少年网络安全个人体会
湖北数字化城管软件开发
网络安全大会5点主张
下列属于数据库管理系统有
网络安全监管部门职责
农业专题数据库的检索路径
调用数据库的方法有哪些
视频设备 网络安全认证
无线网络技术支撑工资
镇网络安全总结
数据库储存怎么
崂山区定制软件开发解决方案
软件开发还是会计专业好
育碧服务器
网络安全宣传校园日活动方案
网络安全类型及应对措施
网络技术实验教师评语
央视快评网络技术
四级的网络安全等级
有哪些网络技术适合女生可以学
服务器的卡槽指的是显卡吗
软件开发培训专业
流媒体服务器软件生产厂
不常用的服务器
计算机网络技术学院报名
网络安全人
四节点服务器
网络文明网络安全 内容
数据库安全性试题
浙江华为服务器维修维保哪家好