Shuffle流程是什么
发表于:2025-02-24 作者:千家信息网编辑
千家信息网最后更新 2025年02月24日,本篇内容介绍了"Shuffle流程是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!shuffle
千家信息网最后更新 2025年02月24日Shuffle流程是什么
本篇内容介绍了"Shuffle流程是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
shuffle流程源码解读
1、从WordCountMapper类中的map方法中写出kv后,进入shuffle流程 --context.write(outK,outV);进入TaskInputOutputContext中的write()方法 --看下就过进入WrappedMapper.java中的mapContext.write(key, value);方法 //112行进入TaskInputOutputContextImpl.java 中output.write(key, value);方法 //89行最终定位到MapTask的write()方法内, //726行
2、重点步骤,收集器对象将kv收集到缓冲区,并在收集前将kv的分区号计算出来.collector.collect(key, value,partitioner.getPartition(key, value, partitions));第一次进入该方法时,因为没有设置reduce的个数,所以最终返回的永远是0号分区
3、定位到MapTask类中的collect方法并进入 //1082行bufferRemaining -= METASIZE; //计算缓冲区剩余大小,该行代码前面的代码是对kv类型的一个判断如果bufferRemaining < 0 则开始进行溢写操作,内部是对数据的一些校验和计算
4、定位到startSpill(); --1126行 //只有当溢写数据大小满足80%时,才会触发该操作WordCountMapper持续往缓冲区写数据,当达到溢写条件80%时,开始溢写
5、进入到startSpill()方法内部 --MapTask类1590行spillReady.signal(); //1602行 --线程通信, 通知溢写线程开始干活//执行溢写线程(MapTask内部类SpillThread)的run方法//run方法中调用MapTask$MapOutputBuffer中的sortAndSpill()方法直接执行下面的排序和溢写方法 --sortAndSpill()方法 --MapTask的1605行
6、定位到1615行final SpillRecord spillRec = new SpillRecord(partitions); //根据分区数创建溢写记录对象--排序按照分区排序,溢写按照分区溢写final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);//获取溢写文件名称 ///tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619 _0001/attempt_local1440922619_0001_m_000000_0/output/(spill0.out),这时还没有溢写文件,只有目录out = rfs.create(filename); //创建执行改步后,在上述的目录下生成溢写文件spill0.out文件
7、继续向下走,定位到MapTask类的1625行sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //溢写前排序8、定位到1629行,进入for循环 --按照分区进行溢写9、分析for循环内代码,看具体溢写过程 9.1 先有一个writer对象,通过该对象来完成数据溢写 writer = new Writer(job, partitionOut, keyClass, valClass, codec, 9.2 判断是否有设置combinerRunner对象 如果有,则按照设置的combinerRunner业务去处理; 如果没有,则走默认的溢写规则10、执行到1667行,即writer.close();方法,本次溢写完毕,此时我们再去看溢写文件spill0.out文件有数据
11、if (totalIndexCacheMemory >= indexCacheMemoryLimit(大小为:1M)) {} //MapTask类的1685行// 如果索引数据超过指定的内存大小,也需要溢写到文件中.(该现象一般情况很难发生.)
12、当本次溢写完毕之后,继续回到WordCountMapper类中的map方法内的context.write(outk,outv);方法处--说明:因为我们使用本地debug模式调试,所以看不到并行的效果,只能是串行效果,因此看到的是当内存内读取满足80%时,发生溢写操作,其实溢写并未停止,只不过我们看不到,剩余的溢写数据在20%内存进行
13、如上溢写过程,在整个mapTask中会出现N次,具体多少看数据量. 如果map中最后的数据写到缓冲区,但是没有满足80%溢写条件的情况,最终也需要将缓冲区的数据刷写到磁盘(最后一次溢写)。最后一次会发生在 MapTask中关闭 NewOutputCollector对象的时候.即在该行代码处发生 output.close(mapperContext); --MapTask的805行14、进入output.close(mapperContext);方法内 --MapTask的732行定位到collector.flush();方法 // 735行-->将缓冲区的数据刷写到磁盘-->重新走sortAndSpill()方法(最后一次刷写)
上述流程,每发生一次溢写就会生成一个溢写小文件(溢写文件内的数据是排好序的)最终所有的数据都写到磁盘中后,在磁盘上就是多个溢写文件, 比如:spill0.out,spill1.out,...spillN.out
15、溢写全部完成之后,就进入归并操作 --MapTask的1527行mergeParts();方法,进入该方法,定位到MapTask的1844行filename[0]: /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/spill0.out
16、继续向下走,定位到MapTask的1880行Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize); --归并后,最终输出的文件路径/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/file.out17、继续向下走,定位到MapTask的1882行Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); --归并后,最终输出文件的索引文件/tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/file.out.index18、创建file.out 文件 FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);19、for (int parts = 0; parts < partitions; parts++) {} //1925行,按照分区进行归并排序20、for循环内具体的归并操作 //1950行 RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, mergeFactor, new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, null, spilledRecordsCounter, sortPhase.phase(), TaskType.MAP);
21、归并后的数据写出到文件Writerwriter = new Writer (job, finalPartitionOut, keyClass, valClass, codec,spilledRecordsCounter); //1961行//归并也可以使用combiner,但是前提条件是设置了combiner,并且溢写次数大于等于3 if (combinerRunner == null || numSpills < minSpillsForCombine(3)) { Merger.writeFile(kvIter, writer, reporter, job);} else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector);}22、归并完成writer.close(); //1972行
23、写出索引文件spillRec.writeToFile(finalIndexFile, job); //1986行24、删除所有的溢写文件spill0.out spill1.out ... spill0.out,只保留最终的输出文件。for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true);}
"Shuffle流程是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
方法
文件
数据
定位
对象
缓冲区
缓冲
流程
排序
代码
大小
磁盘
输出
内存
情况
条件
索引
线程
过程
循环
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
久诚互联网络科技有限公司
高校网络安全建设专家有话说
jsp数据库怎么使用
分布式系统数据库的前景
网络安全比赛奖杯
QQ飞车手游可以转移服务器吗
网络安全主题草稿1000字
网络安全宣传周知识手册
嵌入式视频服务器
数据库无法打开远程连接
cli启动服务器配置
丰台区品质软件开发技术指导
广州教学软件开发
目标检测服务器
安徽网络技术分类市场价
做好网络安全需把握五个要点
方舟服务器满了
软件开发测试岗靠谱吗
软件开发没有规定开发内容
大学生视角下网络安全问题研究
计算机的网络安全是指
医院网络安全建设奖项
绝地求生连接不上服务器网络错误
个人软件开发收回
数据库日志的类型
么是软件开发模型
网络技术实训指南
墨攻网络安全平台
警察网络安全宣传周简报
中国对网络安全的太封闭