Hadoop 2.6.0 Study Notes (7): MapReduce Partitioning
Lu Chunli's work notes. Who says programmers can't have a touch of style?
In MapReduce, the number of map tasks is determined by the input splits, while the number of reduce tasks is set on the job; what the partitioner, the topic of this note, decides is which reduce task each map output record is sent to. By default, MapReduce uses HashPartitioner.
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
The getPartition() method of HashPartitioner takes three parameters: key and value are the Mapper's output record, and numReduceTasks is the configured number of Reducer tasks, which defaults to 1. The method takes the key's hashCode, ANDs it with Integer.MAX_VALUE to clear the sign bit and obtain a non-negative integer, then takes that value modulo numReduceTasks. Since any integer divided by 1 leaves a remainder of 0, getPartition(...) always returns 0 under the default setting: every Mapper output record is sent to the same single Reducer task and ends up in a single output file.
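A minimal, self-contained sketch of that arithmetic (the class name and sample keys are illustrative, not part of the original job):

public class HashPartitionDemo {
    // Same arithmetic as HashPartitioner: clear the sign bit, then take the modulo.
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"FTP", "HTTP", "HTTPS"}) {
            // With the default of 1 reduce task, every key lands in partition 0.
            System.out.println(key + " with 1 reducer  -> partition " + partition(key, 1));
            // With 3 reduce tasks, keys are spread by hashCode modulo 3.
            System.out.println(key + " with 3 reducers -> partition " + partition(key, 3));
        }
    }
}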
Example: per-protocol statistics on URLs accessed over different protocols (five-tuple-style interception logs).
Raw data
[hadoop@nnode code]$ hdfs dfs -text /http_interceptor_20130913.txt
2013-09-13 16:04:08   www.subnetc1.com   192.168.1.7   80   192.168.1.139   18863   FTP     www.subnetc1.com/index.html
2013-09-13 16:04:08   www.subnetc2.com   192.168.1.7   80   192.168.1.159   14100   HTTP    www.subnetc2.com/index.html
2013-09-13 16:04:08   www.subnetc3.com   192.168.1.7   80   192.168.1.130   4927    HTTPS   www.subnetc3.com/index.html
2013-09-13 16:04:08   www.subnetc4.com   192.168.1.7   80   192.168.1.154   39044   HTTP    www.subnetc4.com/index.html
[hadoop@nnode code]$
Implementing the Mapper
package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author luchunli
 * @description Mapper implementation
 */
public class ProtocolMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each log line has 8 tab-separated fields; field 6 is the protocol, field 7 the URL.
        String[] values = value.toString().split("\t");
        if (null == values || values.length != 8) {
            return;
        }
        Text newKey = new Text();
        Text newValue = new Text();
        newKey.set(values[6].trim());
        newValue.set(values[7].trim());
        context.write(newKey, newValue);
    }
}
Implementing the Reducer
package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description Reducer implementation
 */
public class ProtocolReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all URLs seen for this protocol, separated by ";".
        StringBuffer sbf = new StringBuffer();
        for (Text text : values) {
            sbf.append(text.toString());
            sbf.append(";");
        }
        context.write(key, new Text(sbf.toString()));
    }
}
Implementing the Partitioner
package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author luchunli
 * @description Custom partitioner
 */
public class ProtocolPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        // Route each protocol to its own reducer; anything else falls back to partition 0.
        if (key.toString().equals("FTP")) {
            return 0;
        }
        if (key.toString().equals("HTTP")) {
            return 1;
        }
        if (key.toString().equals("HTTPS")) {
            return 2;
        }
        return 0;
    }
}
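One usage note not spelled out in the original post: every value returned by getPartition() must fall in the range [0, numReduceTasks), otherwise the map task rejects the record with an "Illegal partition" error, which is why the driver below sets exactly three reduce tasks. If the protocol list grows, a table-driven variant such as the hypothetical sketch below (class name and map contents are assumptions, not from the original code) keeps the mapping in one place:

package com.lucl.hadoop.mapreduce.part;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/** Hypothetical table-driven alternative to ProtocolPartitioner. */
public class ProtocolTablePartitioner extends Partitioner<Text, Text> {
    private static final Map<String, Integer> PARTITIONS = new HashMap<String, Integer>();
    static {
        PARTITIONS.put("FTP", 0);
        PARTITIONS.put("HTTP", 1);
        PARTITIONS.put("HTTPS", 2);
    }

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        Integer partition = PARTITIONS.get(key.toString());
        // Unknown protocols fall back to partition 0, mirroring the original class;
        // the modulo keeps the result in range even if fewer reducers are configured.
        return partition == null ? 0 : partition % numReduceTasks;
    }
}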
Implementing the driver class
package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ProtocolDriver extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new ProtocolDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(ProtocolDriver.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(ProtocolMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Set the number of reduce tasks (one per partition returned by ProtocolPartitioner).
        job.setNumReduceTasks(3);
        job.setPartitionerClass(ProtocolPartitioner.class);

        job.setReducerClass(ProtocolReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // job.setOutputFormatClass(ProtocolOutputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Running the job
[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500018
15/12/05 21:41:12 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:41:13 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0008
15/12/05 21:41:13 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0008
15/12/05 21:41:14 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0008/
15/12/05 21:41:14 INFO mapreduce.Job: Running job: job_1449302623953_0008
15/12/05 21:41:43 INFO mapreduce.Job: Job job_1449302623953_0008 running in uber mode : false
15/12/05 21:41:43 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 21:42:12 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 21:42:32 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 21:42:52 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 21:42:55 INFO mapreduce.Job: Job job_1449302623953_0008 completed successfully
15/12/05 21:42:55 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=158
        FILE: Number of bytes written=431827
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=532
        HDFS: Number of bytes written=130
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Job Counters
        Killed reduce tasks=1
        Launched map tasks=1
        Launched reduce tasks=4
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=26277
        Total time spent by all reduces in occupied slots (ms)=105054
        Total time spent by all map tasks (ms)=26277
        Total time spent by all reduce tasks (ms)=105054
        Total vcore-seconds taken by all map tasks=26277
        Total vcore-seconds taken by all reduce tasks=105054
        Total megabyte-seconds taken by all map tasks=26907648
        Total megabyte-seconds taken by all reduce tasks=107575296
    Map-Reduce Framework
        Map input records=4
        Map output records=4
        Map output bytes=132
        Map output materialized bytes=158
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=158
        Reduce input records=4
        Reduce output records=3
        Spilled Records=8
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=410
        CPU time spent (ms)=4360
        Physical memory (bytes) snapshot=515862528
        Virtual memory (bytes) snapshot=3399213056
        Total committed heap usage (bytes)=167907328
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=423
    File Output Format Counters
        Bytes Written=130
[hadoop@nnode code]$
Checking the results
[hadoop@nnode code]$ hdfs dfs -ls /2015120500018
Found 4 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 21:42 /2015120500018/_SUCCESS
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 21:42 /2015120500018/part-r-00000
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 21:42 /2015120500018/part-r-00001
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 21:42 /2015120500018/part-r-00002
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00000
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00001
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00002
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$
The file names above (part-r-00000 and so on) are generated automatically by MapReduce for each reduce task. By supplying a custom OutputFormat we can name the output files ourselves.
The custom OutputFormat is shown below. Unlike the earlier MultipleWorkCount example, it writes through FSDataOutputStream directly instead of delegating to LineRecordWriter.
package com.lucl.hadoop.mapreduce.part;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author luchunli
 * @description Custom OutputFormat
 */
public class ProtocolOutputFormat extends TextOutputFormat<Text, Text> {
    protected static class ProtocolRecordWriter extends RecordWriter<Text, Text> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected TaskAttemptContext context = null;
        // One output stream per key, so each protocol gets its own file.
        protected HashMap<Text, DataOutputStream> recordStream = null;
        protected Path workPath = null;

        public ProtocolRecordWriter() {
        }

        public ProtocolRecordWriter(TaskAttemptContext context, Path workPath) {
            this.context = context;
            this.workPath = workPath;
            recordStream = new HashMap<Text, DataOutputStream>();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            boolean nullKey = key == null;
            boolean nullValue = value == null;
            if (nullKey && nullValue) {
                return;
            }
            DataOutputStream out = recordStream.get(key);
            if (null == out) {
                // Name the output file after the key, e.g. FTP.txt, HTTP.txt, HTTPS.txt.
                Path file = new Path(workPath, key + ".txt");
                out = file.getFileSystem(this.context.getConfiguration()).create(file, false);
                recordStream.put(key, out);
            }
            if (!nullKey) {
                out.write(key.getBytes(), 0, key.getLength());
            }
            if (!(nullKey || nullValue)) {
                out.write("\t".getBytes());
            }
            if (!nullValue) {
                out.write(value.getBytes(), 0, value.getLength());
            }
            out.write(newline);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (DataOutputStream out : recordStream.values()) {
                out.close();
            }
            recordStream.clear();
            recordStream = null;
        }
    }

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Path workPath = this.getTaskOutputPath(context);
        return new ProtocolRecordWriter(context, workPath);
    }

    private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(context);
        if (committer instanceof FileOutputCommitter) {
            // Get the directory that the task should write results into.
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            // Get the {@link Path} to the output directory for the map-reduce job.
            // context.getConfiguration().get(FileOutputFormat.OUTDIR);
            Path outputPath = super.getOutputPath(context);
            if (null == outputPath) {
                throw new IOException("Undefined job output-path.");
            }
            workPath = outputPath;
        }
        return workPath;
    }
}
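The FTP.txt, HTTP.txt and HTTPS.txt files in the second run below come from this format, so the line that is commented out in ProtocolDriver has to be enabled first:

// In ProtocolDriver.run(), before FileOutputFormat.setOutputPath(...):
job.setOutputFormatClass(ProtocolOutputFormat.class);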
Running again
[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500020
15/12/05 21:59:28 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:59:30 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0010
15/12/05 21:59:30 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0010
15/12/05 21:59:31 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0010/
15/12/05 21:59:31 INFO mapreduce.Job: Running job: job_1449302623953_0010
15/12/05 22:00:00 INFO mapreduce.Job: Job job_1449302623953_0010 running in uber mode : false
15/12/05 22:00:00 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 22:00:29 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 22:00:48 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 22:01:07 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 22:01:07 INFO mapreduce.Job: Job job_1449302623953_0010 completed successfully
15/12/05 22:01:07 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=158
        FILE: Number of bytes written=432595
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=532
        HDFS: Number of bytes written=130
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Job Counters
        Killed reduce tasks=1
        Launched map tasks=1
        Launched reduce tasks=4
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=26075
        Total time spent by all reduces in occupied slots (ms)=92427
        Total time spent by all map tasks (ms)=26075
        Total time spent by all reduce tasks (ms)=92427
        Total vcore-seconds taken by all map tasks=26075
        Total vcore-seconds taken by all reduce tasks=92427
        Total megabyte-seconds taken by all map tasks=26700800
        Total megabyte-seconds taken by all reduce tasks=94645248
    Map-Reduce Framework
        Map input records=4
        Map output records=4
        Map output bytes=132
        Map output materialized bytes=158
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=158
        Reduce input records=4
        Reduce output records=3
        Spilled Records=8
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=339
        CPU time spent (ms)=4690
        Physical memory (bytes) snapshot=513667072
        Virtual memory (bytes) snapshot=3405312000
        Total committed heap usage (bytes)=167907328
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=423
    File Output Format Counters
        Bytes Written=130
[hadoop@nnode code]$
Checking the results
[hadoop@nnode code]$ hdfs dfs -ls /2015120500020
Found 4 items
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 22:01 /2015120500020/FTP.txt
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 22:00 /2015120500020/HTTP.txt
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 22:01 /2015120500020/HTTPS.txt
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 22:01 /2015120500020/_SUCCESS
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/FTP.txt
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTP.txt
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTPS.txt
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$