
Hadoop 2.6.0 Study Notes (7): MapReduce Partitioning


鲁春利 (Lu Chunli)'s work notes. Who says programmers can't have a literary side?



In MapReduce, the number of map tasks is determined by the input splits. What, then, determines the number of reduce tasks? That is the subject of this post: MapReduce partitioning. By default, MapReduce uses HashPartitioner.

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

HashPartitioner's getPartition() method takes three parameters: key and value are the Mapper's output key and value, and numReduceTasks is the configured number of reducer tasks, which defaults to 1. The key's hashCode is ANDed with Integer.MAX_VALUE to turn it into a non-negative integer, and the remainder of dividing any integer by 1 is always 0. In other words, getPartition(...) always returns 0 by default, so all Mapper output goes to a single Reducer task and ends up in a single output file.
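
A minimal standalone sketch (not part of the original post) that reproduces HashPartitioner's arithmetic for the three protocol strings used later in this example; it simply shows that with one reducer every key lands in partition 0, while with three reducers a key lands somewhere in partitions 0..2:

public class HashPartitionDemo {
    // Same computation as HashPartitioner.getPartition().
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] keys = {"FTP", "HTTP", "HTTPS"};
        for (String key : keys) {
            // With the default numReduceTasks = 1 every key maps to partition 0;
            // with 3 reducers each key lands in one of partitions 0..2 (collisions are possible).
            System.out.println(key + " -> " + partition(key, 1) + " (1 reducer), "
                    + partition(key, 3) + " (3 reducers)");
        }
    }
}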


Example: collect statistics on the URLs accessed over different protocols (from a five-tuple access log).

Raw data

[hadoop@nnode code]$ hdfs dfs -text /http_interceptor_20130913.txt
2013-09-13 16:04:08     www.subnetc1.com        192.168.1.7     80      192.168.1.139   18863   FTP     www.subnetc1.com/index.html
2013-09-13 16:04:08     www.subnetc2.com        192.168.1.7     80      192.168.1.159   14100   HTTP    www.subnetc2.com/index.html
2013-09-13 16:04:08     www.subnetc3.com        192.168.1.7     80      192.168.1.130   4927    HTTPS   www.subnetc3.com/index.html
2013-09-13 16:04:08     www.subnetc4.com        192.168.1.7     80      192.168.1.154   39044   HTTP    www.subnetc4.com/index.html
[hadoop@nnode code]$


Implementing the Mapper

package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author luchunli
 * @description Mapper implementation
 */
public class ProtocolMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each record has 8 tab-separated fields; field 6 is the protocol, field 7 the URL.
        String[] values = value.toString().split("\t");
        if (null == values || values.length != 8) {
            return;
        }
        Text newKey = new Text();
        Text newValue = new Text();
        newKey.set(values[6].trim());
        newValue.set(values[7].trim());

        context.write(newKey, newValue);
    }
}
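
As a quick sanity check (mine, not from the original post), splitting one of the sample records above on tabs shows exactly which fields the mapper emits; the record literal below is the second line of the raw data with its tabs written as \t:

public class SplitCheck {
    public static void main(String[] args) {
        String record = "2013-09-13 16:04:08\twww.subnetc2.com\t192.168.1.7\t80\t"
                + "192.168.1.159\t14100\tHTTP\twww.subnetc2.com/index.html";
        String[] fields = record.split("\t");
        // fields[6] = "HTTP" becomes the map output key,
        // fields[7] = "www.subnetc2.com/index.html" becomes the map output value.
        System.out.println(fields[6] + " -> " + fields[7]);
    }
}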


Implementing the Reducer

package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description Reducer implementation
 */
public class ProtocolReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all URLs seen for this protocol, separated by ";".
        StringBuffer sbf = new StringBuffer();
        for (Text text : values) {
            sbf.append(text.toString());
            sbf.append(";");
        }
        context.write(key, new Text(sbf.toString()));
    }
}


Implementing the Partitioner

package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author luchunli
 * @description Custom partitioner
 */
public class ProtocolPartitioner extends Partitioner<Text, Text> {

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        if (key.toString().equals("FTP")) {
            return 0;
        }
        if (key.toString().equals("HTTP")) {
            return 1;
        }
        if (key.toString().equals("HTTPS")) {
            return 2;
        }
        return 0;
    }
}
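
If the list of protocols grows, the chain of string comparisons becomes awkward to maintain. A minimal alternative sketch (my variant, not from the original post; the class name ProtocolTablePartitioner is hypothetical) drives the same mapping from a lookup table and keeps the same fallback to partition 0 for unknown protocols:

package com.lucl.hadoop.mapreduce.part;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical table-driven variant of ProtocolPartitioner.
public class ProtocolTablePartitioner extends Partitioner<Text, Text> {
    private static final Map<String, Integer> PARTITIONS = new HashMap<String, Integer>();
    static {
        PARTITIONS.put("FTP", 0);
        PARTITIONS.put("HTTP", 1);
        PARTITIONS.put("HTTPS", 2);
    }

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        Integer partition = PARTITIONS.get(key.toString());
        // Unknown protocols fall back to partition 0, as in the original version.
        return partition == null ? 0 : partition.intValue();
    }
}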


Implementing the driver class

package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ProtocolDriver extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new ProtocolDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());

        job.setJarByClass(ProtocolDriver.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(ProtocolMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Set the number of reduce tasks: one per partition (FTP, HTTP, HTTPS).
        job.setNumReduceTasks(3);
        job.setPartitionerClass(ProtocolPartitioner.class);

        job.setReducerClass(ProtocolReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // job.setOutputFormatClass(ProtocolOutputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}


Running the job

[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500018
15/12/05 21:41:12 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:41:13 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0008
15/12/05 21:41:13 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0008
15/12/05 21:41:14 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0008/
15/12/05 21:41:14 INFO mapreduce.Job: Running job: job_1449302623953_0008
15/12/05 21:41:43 INFO mapreduce.Job: Job job_1449302623953_0008 running in uber mode : false
15/12/05 21:41:43 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 21:42:12 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 21:42:32 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 21:42:52 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 21:42:55 INFO mapreduce.Job: Job job_1449302623953_0008 completed successfully
15/12/05 21:42:55 INFO mapreduce.Job: Counters: 50
        File System Counters
                FILE: Number of bytes read=158
                FILE: Number of bytes written=431827
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=532
                HDFS: Number of bytes written=130
                HDFS: Number of read operations=12
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=6
        Job Counters
                Killed reduce tasks=1
                Launched map tasks=1
                Launched reduce tasks=4
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=26277
                Total time spent by all reduces in occupied slots (ms)=105054
                Total time spent by all map tasks (ms)=26277
                Total time spent by all reduce tasks (ms)=105054
                Total vcore-seconds taken by all map tasks=26277
                Total vcore-seconds taken by all reduce tasks=105054
                Total megabyte-seconds taken by all map tasks=26907648
                Total megabyte-seconds taken by all reduce tasks=107575296
        Map-Reduce Framework
                Map input records=4
                Map output records=4
                Map output bytes=132
                Map output materialized bytes=158
                Input split bytes=109
                Combine input records=0
                Combine output records=0
                Reduce input groups=3
                Reduce shuffle bytes=158
                Reduce input records=4
                Reduce output records=3
                Spilled Records=8
                Shuffled Maps =3
                Failed Shuffles=0
                Merged Map outputs=3
                GC time elapsed (ms)=410
                CPU time spent (ms)=4360
                Physical memory (bytes) snapshot=515862528
                Virtual memory (bytes) snapshot=3399213056
                Total committed heap usage (bytes)=167907328
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=423
        File Output Format Counters
                Bytes Written=130
[hadoop@nnode code]$


Checking the results

[hadoop@nnode code]$ hdfs dfs -ls /2015120500018
Found 4 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 21:42 /2015120500018/_SUCCESS
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 21:42 /2015120500018/part-r-00000
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 21:42 /2015120500018/part-r-00001
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 21:42 /2015120500018/part-r-00002
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00000
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00001
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00002
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$


The file names above (part-r-00000, part-r-00001, ...) are generated automatically by MapReduce, one per reduce task. By supplying a custom OutputFormat we can control the names of the output files ourselves.


The custom OutputFormat is shown below. The difference from the earlier MultipleWorkCount example is that this one writes through an FSDataOutputStream directly, instead of going through LineRecordWriter as before.

package com.lucl.hadoop.mapreduce.part;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author luchunli
 * @description Custom OutputFormat
 */
public class ProtocolOutputFormat extends TextOutputFormat<Text, Text> {

    protected static class ProtocolRecordWriter extends RecordWriter<Text, Text> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected TaskAttemptContext context = null;

        // One output stream per key; each key's records go to a file named <key>.txt.
        protected HashMap<Text, DataOutputStream> recordStream = null;
        protected Path workPath = null;

        public ProtocolRecordWriter() {}

        public ProtocolRecordWriter(TaskAttemptContext context, Path workPath) {
            this.context = context;
            this.workPath = workPath;
            recordStream = new HashMap<Text, DataOutputStream>();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            boolean nullKey = key == null;
            boolean nullValue = value == null;
            if (nullKey && nullValue) {
                return;
            }
            DataOutputStream out = recordStream.get(key);
            if (null == out) {
                Path file = new Path(workPath, key + ".txt");
                out = file.getFileSystem(this.context.getConfiguration()).create(file, false);
                recordStream.put(key, out);
            }
            if (!nullKey) {
                out.write(key.getBytes(), 0, key.getLength());
            }
            if (!(nullKey || nullValue)) {
                out.write("\t".getBytes());
            }
            if (!nullValue) {
                out.write(value.getBytes(), 0, value.getLength());
            }
            out.write(newline);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (DataOutputStream out : recordStream.values()) {
                out.close();
            }
            recordStream.clear();
            recordStream = null;
        }
    }

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Path workPath = this.getTaskOutputPath(context);
        return new ProtocolRecordWriter(context, workPath);
    }

    private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(context);

        if (committer instanceof FileOutputCommitter) {
            // Get the directory that the task should write results into.
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            // Get the {@link Path} to the output directory for the map-reduce job.
            // context.getConfiguration().get(FileOutputFormat.OUTDIR);
            Path outputPath = super.getOutputPath(context);
            if (null == outputPath) {
                throw new IOException("Undefined job output-path.");
            }
            workPath = outputPath;
        }

        return workPath;
    }
}
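
For the job to actually use this class, the line that was left commented out in ProtocolDriver above needs to be enabled; everything else in the driver stays the same. A one-line sketch of the change in ProtocolDriver.run():

// Replace the default TextOutputFormat with the custom one:
job.setOutputFormatClass(ProtocolOutputFormat.class);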


Running again

[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500020
15/12/05 21:59:28 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:59:30 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0010
15/12/05 21:59:30 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0010
15/12/05 21:59:31 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0010/
15/12/05 21:59:31 INFO mapreduce.Job: Running job: job_1449302623953_0010
15/12/05 22:00:00 INFO mapreduce.Job: Job job_1449302623953_0010 running in uber mode : false
15/12/05 22:00:00 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 22:00:29 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 22:00:48 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 22:01:07 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 22:01:07 INFO mapreduce.Job: Job job_1449302623953_0010 completed successfully
15/12/05 22:01:07 INFO mapreduce.Job: Counters: 50
        File System Counters
                FILE: Number of bytes read=158
                FILE: Number of bytes written=432595
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=532
                HDFS: Number of bytes written=130
                HDFS: Number of read operations=12
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=6
        Job Counters
                Killed reduce tasks=1
                Launched map tasks=1
                Launched reduce tasks=4
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=26075
                Total time spent by all reduces in occupied slots (ms)=92427
                Total time spent by all map tasks (ms)=26075
                Total time spent by all reduce tasks (ms)=92427
                Total vcore-seconds taken by all map tasks=26075
                Total vcore-seconds taken by all reduce tasks=92427
                Total megabyte-seconds taken by all map tasks=26700800
                Total megabyte-seconds taken by all reduce tasks=94645248
        Map-Reduce Framework
                Map input records=4
                Map output records=4
                Map output bytes=132
                Map output materialized bytes=158
                Input split bytes=109
                Combine input records=0
                Combine output records=0
                Reduce input groups=3
                Reduce shuffle bytes=158
                Reduce input records=4
                Reduce output records=3
                Spilled Records=8
                Shuffled Maps =3
                Failed Shuffles=0
                Merged Map outputs=3
                GC time elapsed (ms)=339
                CPU time spent (ms)=4690
                Physical memory (bytes) snapshot=513667072
                Virtual memory (bytes) snapshot=3405312000
                Total committed heap usage (bytes)=167907328
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=423
        File Output Format Counters
                Bytes Written=130
[hadoop@nnode code]$


Checking the results

[hadoop@nnode code]$ hdfs dfs -ls /2015120500020
Found 4 items
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 22:01 /2015120500020/FTP.txt
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 22:00 /2015120500020/HTTP.txt
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 22:01 /2015120500020/HTTPS.txt
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 22:01 /2015120500020/_SUCCESS
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/FTP.txt
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTP.txt
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTPS.txt
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$

