MapTask流程是怎样的
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章主要讲解了"MapTask流程是怎样的",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"MapTask流程是怎样的"吧!MapTask流程源码解读
千家信息网最后更新 2025年02月04日MapTask流程是怎样的
这篇文章主要讲解了"MapTask流程是怎样的",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"MapTask流程是怎样的"吧!
MapTask流程源码解读
1、从job提交流程的24步,开始mapTask的流程分析,进入submitJob --LocalJobRunner.java中的788行Job job = new Job(JobID.downgrade(jobid), jobSubmitDir); //创建一个可以真正执行的Job该Job: LocalJobRunner$Job , 且是一个线程 $表示内部类
2、因为当前的Job对象是一个线程,所有执行线程要执行run方法,因此直接找到 LocalJobRunner的run方法进行查看 --定位到537行TaskSplitMetaInfo[] taskSplitMetaInfos = SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);//读取切片的metainfo信息,即提交job过程中在临时目录中生成的job.splitmetainfo文件
3、向下走断点,定位到下方代码 --547行ListmapRunnables = getMapTaskRunnables( taskSplitMetaInfos, jobId, mapOutputFiles);//根据切片的metainfo信息,可以得出有多少个切片,再生成对应个数的Runnable对象.每个Runnable对象对应一个线程,每一个MapTask运行在一个线程中(基于本地模式的分析)Runnable : LocalJobRunnber$Job$MapTaskRunnable ---联想到线程
4、ExecutorService mapService = createMapExecutor(); //创建线程池对象runTasks(mapRunnables, mapService, "map");// 将所有的LocalJobRunnber$Job$MapTaskRunnable对象提交给线程池执行,进入到runTasks方法内部。 --LocalJobRunner中的466行
5、//每个线程负责一个Runnable执行,定位到每个Runnable内部的run方法,查看具体执行(以内部类的方式嵌套)for (Runnable r : runnables) { service.submit(r); }LocalJobRunnber$Job$MapTaskRunnable交给每个线程执行时,会执行到 LocalJobRunnber$Job$MapTaskRunnable的run方法,因此接下来看LocalJobRunnber$Job$MapTaskRunnable的run方法 --LocalJobRunner中的248行
6、进入到run方法内部,定位到254行MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId, info.getSplitIndex(), 1); //创建MapTask对象 --在每一个线程中都会执行,会创建一个mapTask对象
7、进入map.run(localConf, Job.this); --271行 //执行MapTask的run方法,关联到MapTask方法中的run
进入到MapTask的run方法内首先进行分区设置partitions = jobContext.getNumReduceTasks(); if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner () { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; } }; }8、定位到MapTask中run方法的347行,并进入runNewMapper()方法,提前判断下是否使用新的api进入runNewMapper()方法,定位到MapTask的745行开始读源码
9、--反射的方式创建Mapper对象. 例如: WordCountMapper org.apache.hadoop.mapreduce.Mappermapper = (org.apache.hadoop.mapreduce.Mapper ) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); --反射的方式创建Inputformat对象, 例如: TextInputFormat(默认) org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat ) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); --获取当前MapTask所负责的切片信息 org.apache.hadoop.mapreduce.InputSplit split = null; split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); --获取RecordReader对象 org.apache.hadoop.mapreduce.RecordReader input = new NewTrackingRecordReader (split, inputFormat, reporter, taskContext);
10、向下读取,定位到MapTask的782行 output = new NewOutputCollector(taskContext, job, umbilical, reporter);方法进入
11、定位到MapTask的710行 collector = createSortingCollector(job, reporter); //收集器对象,可以理解为缓冲区对象12、进入到createSortingCollector方法, --MapTask中的388行13、collector.init(context); --初始化缓冲区对象 collector: MapTask$MapOutputBuffer14、进入到init方法中 --MapTask的968行
15、①:定位到init方法的980行--//获取溢写百分比 80%,通过mapreduce.map.sort.spill.percent参数来配置 final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);--//获取缓冲区大小 100M, 通过 mapreduce.task.io.sort.mb 参数来配置 final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB, MRJobConfig.DEFAULT_IO_SORT_MB);--//获取排序对象 QuickSort.class, 只排索引sorter = ReflectionUtils.newInstance(job.getClass( MRJobConfig.MAP_SORT_CLASS, QuickSort.class, IndexedSorter.class), job);--//获取key的比较器对象 comparator = job.getOutputKeyComparator();--//获取key的序列化对象 k/v serialization 获取kv的序列化对象--//获取计数器对象 output counters--//compression 获取编解码器,进行压缩操作--//combiner 获取Combiner对象,在溢写及归并可以使用combiner--//spillThread.start(); 启动溢写线程 ,只有达到溢写百分比才会发生溢写操作
16、mapper.run(mapperContext);执行到Mapper对象中的run方法,例如WordCountMapper中的run方法进入到mapper.run()方法内执行 setup(context); --143行执行 map(context.getCurrentKey(), context.getCurrentValue(), context); --146行,进入到wordCount中的map()方法,是一个循环执行的过程context.wirte(outK,outV);将map方法中处理好的kv写出执行cleanup(context);
感谢各位的阅读,以上就是"MapTask流程是怎样的"的内容了,经过本文的学习后,相信大家对MapTask流程是怎样的这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
方法
对象
线程
定位
流程
信息
方式
缓冲区
学习
缓冲
内容
参数
序列
源码
百分
过程
分析
反射
配置
个数
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库表物理设计是什么意思
报送人行金融信息基础数据库
国家网络安全永久会址
erp网页版 服务器 iis
mac pro数据库
tp数据库查询模型封装
语音对讲软件开发优势
山东哪里有存储服务器供应商
数据库应用技术 第二版
用户管理数据库设计
100万人数据库
数据库字段默认值
sql建立数据库附带编码
服务器空间最大的手机
数据库中修改表中字段的类型
ibm服务器3650管理口
南昌警示教育展馆软件开发
徐州软件开发工资高吗
国开大专计算机网络技术专业
电子政务网络安全管理自查
赤峰做app的软件开发多少钱
上海华众互联网科技有限公司
敏捷软件开发基于
修改数据库字段输入长度
新罗县网络安全
网络安全是否有前途
vue搭服务器
视频处理软件开发方法
我的世界外国服务器加速器
购买软件开发服务费怎么入账