千家信息网

Hadoop中JOB怎么实现提交任务

发表于:2024-11-21 作者:千家信息网编辑
千家信息网最后更新 2024年11月21日,本篇内容介绍了"Hadoop中JOB怎么实现提交任务"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1
千家信息网最后更新 2024年11月21日Hadoop中JOB怎么实现提交任务

本篇内容介绍了"Hadoop中JOB怎么实现提交任务"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

1.MapReduce的简单概念

百度百科:MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",和他们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。至于什么是函数式编程语言和矢量编程语言,自己也搞得不太清楚,见解释链接:

http://www.cnblogs.com/kym/archive/2011/03/07/1976519.html.

自己的理解:MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.当你向MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map 任务,然后分配到不同的节点上去执行,每一个Map 任务处理输入数据中的一部分,当Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个Map 的输出汇总到一起并输出.就是说HDFS已经为我们提供了高性能、高并发的服务,但是并行编程可不是所有程序员都玩得转的活儿,如果我们的应用本身不能并发,那Hadoop的HDFS也都是没有意义的。MapReduce的伟大之处就在于让不熟悉并行编程的程序员(比如像我这的)也能充分发挥分布式系统的威力。这里说明以下:Hadoop本身这个框架就是洋人基于洋人公司谷歌的三大论文GFS,BigTable,MapReduce(编程模型),用Java语言实现的框架.谷歌它就用的C++实现,而MapReduce编程模型(是高度抽象的)大体离不开下面这张图.Spark并行运算框架(和Hadoop的MapReduce)的不同点:在于它将中间结果即map函数结果直接放入内存中,而不是放入本地磁盘的HDFS中.这些都不是重点,重点是下面图的流程:

上图是论文里给出的流程图。一切都是从最上方的user program开始的,user program链接了MapReduce库,实现了最基本的Map函数和Reduce函数。图中执行的顺序都用数字标记了。1.MapReduce库先把user program的输入文件划分为M份(M为用户定义),每一份通常有16MB到64MB,如图左方所示分成了split0~4;然后使用fork将用户进程拷贝到集群内其它机器上。2.user program的副本中有一个称为master,其余称为worker,master是负责调度的,为空闲worker分配作业(Map作业3或者Reduce作业),worker的数量也是可以由用户指定的。3.被分配了Map作业的worker,开始读取对应分片的输入数据,Map作业数量是由M决定的,和split一一对应;Map作业从输入数据中抽取出键值对,每一个键值对都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。4.缓存的中间键值对会被定期写入本地磁盘,而且被分为R个区,R的大小是由用户定义的,将来每个区会对应一个Reduce作业;这些中间键值对的位置会被通报给master,master负责将信息转发给Reduce worker。5.master通知分配了Reduce作业的worker它负责的分区在什么位置(肯定不止一个地方,每个Map作业产生的中间键值对都可能映射到所有R个不同分区),当Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是同一个Reduce作业(谁让分区少呢),所以排序是必须的。6.reduce worker遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输出会添加到这个分区的输出文件中。7.当所有的Map和Reduce作业都完成了,master唤醒正版的user program,MapReduce函数调用返回user program的代码所有执行完毕后,MapReduce输出放在了R个分区的输出文件中(分别对应一个Reduce作业)。用户通常并不需要合并这R个文件,而是将其作为输入交给另一个MapReduce程序处理。整个过程中,输入数据是来自底层分布式文件系统(GFS)的,中间数据是放在本地文件系统的,最终输出数据是写入底层分布式文件系统(GFS)的。而且我们要注意Map/Reduce作业和map/reduce函数的区别:Map作业处理一个输入数据的分片,可能需要调用多次map函数来处理每个输入键值对;Reduce作业处理一个分区的中间键值对,期间要对每个不同的键调用一次reduce函数,Reduce作业最终也对应一个输出文件。

至于下面一张图Hadoop MapReduce(彩色的)的模型实现则如下图(当然这也不是我画的,只是大自然的搬运工):

(input)  -> map ->  -> combine ->  -> reduce ->  (output)

参考链接:怎样向妻子解释MapReduce.

2.Hadoop1.x中的MapReduce

在Hadoop里面的MapReduce的是有存在两个不同的时期.刚开始的Hadoop中的MapReduce实现是做到很多的事情,而该框架的核心Job Tracker(作业跟踪者)则是既当爹又当妈的意思.看下图:

原 MapReduce 程序的流程及设计思路:1.首先用户程序 (JobClient) 提交了一个 job,job 的信息会发送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要与集群中的机器定时通信 (heartbeat), 需要管理哪些程序应该跑在哪些机器上,需要管理所有 job 失败、重启等操作。2.TaskTracker 是 Map-reduce 集群中每台机器都有的一个部分,他做的事情主要是监视自己所在机器的资源情况。3.TaskTracker 同时监视当前机器的 tasks 运行状况。TaskTracker 需要把这些信息通过 heartbeat发送给JobTracker,JobTracker 会搜集这些信息以给新提交的 job 分配运行在哪些机器上。

既然出现Hadoop2改进它,那它就有一些问题咯。主要的问题如下:

1.JobTracker 是 Map-reduce 的集中处理点,存在单点故障。2.JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。3.在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。4.在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就是前面提过的集群资源利用的问题。源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。5.从操作的角度来看,现在的 Hadoop MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新的 Hadoop 版本而浪费大量时间。

3.Hadoop2.x中新方案YARN+MapReduce

首先的不要被YARN给迷惑住了,它只是负责资源调度管理,而MapReduce才是负责运算的家伙,所以YARN != MapReduce2.这是大师说的:

YARN并不是下一代MapReduce(MRv2),下一代MapReduce与第一代MapReduce(MRv1)在编程接口、数据处理引擎(MapTask和ReduceTask)是完全一样的, 可认为MRv2重用了MRv1的这些模块,不同的是资源管理和作业管理系统,MRv1中资源管理和作业管理均是由JobTracker实现的,集两个功能于一身,而在MRv2中,将这两部分分开了, 其中,作业管理由ApplicationMaster实现,而资源管理由新增系统YARN完成,由于YARN具有通用性,因此YARN也可以作为其他计算框架的资源管理系统,不仅限于MapReduce,也是其他计算框架(Spark).

看上图我们可以知道Hadoop1中mapreduce可以说是啥事都干,而Hadoop2中的MapReduce的话则是专门处理数据分析.而YARN的话则做为资源管理器存在.

有了YARN之后,官网上这么说Apache Hadoop NextGen MapReduce (YARN).它的架构图如下:

在Hadoop2中将JobTracker两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度/监控。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调。一个应用程序无非是一个单独的传统的 MapReduce 任务或者是一个 DAG( 有向无环图 ) 任务。ResourceManager 和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能对计算进行组织。1.事实上,每一个应用的ApplicationMaster是一个详细的框架库,它结合从ResourceManager获得的资源和 NodeManagr 协同工作来运行和监控任务。2.在上图中ResourceManager支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲它就是一个纯粹的调度器,它在执行过程中不对应用进行监控和状态跟踪。同样,它也不能重启因应用失败或者硬件错误而运行失败的任务。ResourceManager 是基于应用程序对资源的需求进行调度的 ; 每一个应用程序需要不同类型的资源因此就需要不同的容器。资源包括:内存,CPU,磁盘,网络等等。可以看出,这同现 Mapreduce 固定类型的资源使用模型有显著区别,它给集群的使用带来负面的影响。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。调度插件可以基于现有的能力调度和公平调度模型。3.在上图中 NodeManager 是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报。4.在上图中,每一个应用的 ApplicationMaster的职责有:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。

再次总结,在Hadoop2集群里,一个客户端提交任务的一整套的流程图:

1.客户端的mapreduce程序通过hadoop shell提交到hadoop的集群中.2.程序会通过RPC通信将打成jar包的程序的有关信息传递给Hadoop集群中RM(ResourceManager),可称为领取JOBID的过程3.RM更加提交上来的信息给任务分配一个唯一的ID,同时会将run.jar的在HDFS上的存储路径发送给客户端.4.客户端得到那个存储路径之后,会相应的拼接出最终的存放路径目录,然后将run.jar分多份存储在HDFS目录中,默认情况下备份数量为10份.可配置.5.客户端提交一些配置信息,例如:最终存储路径,JOB ID等.6.RM会将这些配置信息放入一个队列当中,所谓的调度器.至于调度的算法,则不必深究.7.NM(NodeManager)和RM是通过心跳机制保持着通信的,NM会定期的向RM去领取任务.8.RM会在任意的一台或多台的NM中,启动任务监控的进程Application Master.用来监控其他NM中YARN CHild的执行的情况9.NM在领取到任务之后,得到信息,会去HDFS的下载run.jar.然后在本地的机器上启动YARN Child进程来执行map或者reduce函数.map函数的处理之后的中间结果数据会放在本地文件系统中的.10.在结束程序之后,将结果数据写会HDFS中.整个流程大概就是这样子的.

4.YARN出现的意义----引用

随着 YARN 的出现,您不再受到更简单的 MapReduce 开发模式约束,而是可以创建更复杂的分布式应用程序。实际上,您可以将 MapReduce 模型视为 YARN 架构可运行的一些应用程序中的其中一个,只是为自定义开发公开了基础框架的更多功能。这种能力非常强大,因为 YARN 的使用模型几乎没有限制,不再需要与一个集群上可能存在的其他更复杂的分布式应用程序框架相隔离,就像 MRv1 一样。甚至可以说,随着 YARN 变得更加健全,它有能力取代其他一些分布式处理框架,从而完全消除了专用于其他框架的资源开销,同时还简化了整个系统。

为了演示 YARN 相对于 MRv1 的效率提升,可考虑蛮力测试旧版本的 LAN Manager Hash 的并行问题,这是旧版 Windows® 用于密码散列运算的典型方法。在此场景中,MapReduce 方法没有多大意义,因为 Mapping/Reducing 阶段涉及到太多开销。相反,更合理的方法是抽象化作业分配,以便每个容器拥有密码搜索空间的一部分,在其之上进行枚举,并通知您是否找到了正确的密码。这里的重点是,密码将通过一个函数 来动态确定(这确实有点棘手),而不需要将所有可能性映射到一个数据结构中,这就使得 MapReduce 风格显得不必要且不实用。

归结而言,MRv1 框架下的问题仅是需要一个关联数组,而且这些问题有专门朝大数据操作方向演变的倾向。但是,问题一定不会永远仅局限于此范式中,因为您现在可以更为简单地将它们抽象化,编写自定义客户端、应用程序主程序,以及符合任何您想要的设计的应用程序。

5.编写简单MapReduce Yarn的应用程序

我们直接拿Apache Hadoop官网中的wordcount的例子来说明MapReduce程序的编写.

Source Code

import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCount {  //编写自己的Mapper,需要继承org.apache.hadoop.mapreduce.Mapper  public static class TokenizerMapper       extends Mapper{   //输入的的类型,输出的    //作为类中成员变量    private final static IntWritable one = new IntWritable(1);    private Text word = new Text();     //key : offset     偏移量,几乎可以忽略     //value : one line string    一行的数据     //context : the context of computer  计算的上下文    public void map(Object key, Text value, Context context                    ) throws IOException, InterruptedException {      StringTokenizer itr = new StringTokenizer(value.toString());      while (itr.hasMoreTokens()) {        word.set(itr.nextToken());        context.write(word, one);      }    }  }  //编写自己的Reducer,需要继承org.apache.hadoop.mapreduce.Reducer  public static class IntSumReducer       extends Reducer {    private IntWritable result = new IntWritable();    public void reduce(Text key, Iterable values,                       Context context                       ) throws IOException, InterruptedException {      int sum = 0;      for (IntWritableval : values) {        sum += val.get();      }      result.set(sum);      context.write(key, result);    }  }  //主函数开始运行JOB  public static void main(String[] args) throws Exception {    Configuration conf = new Configuration();    Job job = Job.getInstance(conf, "word count");    job.setJarByClass(WordCount.class);    job.setMapperClass(TokenizerMapper.class);    job.setCombinerClass(IntSumReducer.class);    job.setReducerClass(IntSumReducer.class);    job.setOutputKeyClass(Text.class);    job.setOutputValueClass(IntWritable.class);    FileInputFormat.addInputPath(job, new Path(args[0]));    FileOutputFormat.setOutputPath(job, new Path(args[1]));    System.exit(job.waitForCompletion(true) ? 0 : 1);     //提交JOB成功,退出JVM虚拟机  }}

6.Hadoop2.0中提交Job的源码分析

----------------------------至此,与服务器RM的通信已建立.---------

-----------------------接下来的话,就是提交job任务了.

"Hadoop中JOB怎么实现提交任务"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0