千家信息网

【总结】Hadoop中的MultipleOutputs实践

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日。本例子采用 hadoop 1.1.2 版本,附件中有例子的数据文件,采用气象数据作为处理数据。下文给出 MultipleOutputs 的例子,具体解释在代码中有注释。
千家信息网最后更新 2025年01月24日【总结】Hadoop中的MultipleOutputs实践

本例子采用hadoop1.1.2版本,附件中有例子的数据文件

采用气象数据作为处理数据


1、MultipleOutputs例子,具体解释在代码中有注释

package StationPatitioner;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Partitions NCDC weather records into one output file per station id using
 * the old-API ({@code org.apache.hadoop.mapred.lib}) {@link MultipleOutputs}.
 *
 * <p>Targets Hadoop 1.1.2. The mapper keys each record by its station id; the
 * reducer routes each station's records to a multi-named output called
 * "station", producing files named {@code station_<id>-r-00000} and so on.
 */
public class PatitionByStationUsingMultipleOutputs extends Configured implements Tool {

    /** Job counters reported back to the framework. */
    enum Counter {
        LINESKIP  // input lines that failed to parse and were skipped
    }

    /** Emits (stationId, full record line) for every parseable input line. */
    static class StationMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        private final NcdcRecordParser parser = new NcdcRecordParser();
        private final Text stationId = new Text();  // reused to avoid a new Text per record

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            try {
                parser.parse(value);
                stationId.set(parser.getStationid());
                output.collect(stationId, value);
            } catch (Exception e) {
                // Malformed record: count it and continue rather than failing the job.
                reporter.getCounter(Counter.LINESKIP).increment(1);
            }
        }
    }

    /** Writes each station's records to that station's own named output. */
    static class MultipleOutputReducer extends MapReduceBase
            implements Reducer<Text, Text, NullWritable, Text> {

        private MultipleOutputs multipleOutputs;

        @Override
        public void configure(JobConf jobconf) {
            // MultipleOutputs must be created from the job configuration.
            multipleOutputs = new MultipleOutputs(jobconf);
        }

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            // "-" is stripped because named-output part names must be alphanumeric.
            @SuppressWarnings("unchecked")
            OutputCollector<NullWritable, Text> collector =
                    multipleOutputs.getCollector("station", key.toString().replace("-", ""), reporter);
            while (values.hasNext()) {
                collector.collect(NullWritable.get(), values.next());
            }
        }

        @Override
        public void close() throws IOException {
            multipleOutputs.close();  // flushes and closes every named output
        }
    }

    /**
     * Configures and submits the job.
     *
     * @param args optional: args[0] = input path, args[1] = output path;
     *             the original hard-coded HDFS paths are kept as defaults
     * @return 0 on successful job completion
     */
    @Override
    public int run(String[] args) throws Exception {
        // The Windows user differs from the HDFS user; this avoids Permission errors.
        System.setProperty("HADOOP_USER_NAME", "root");

        // Honour configuration passed in by ToolRunner (-D options etc.).
        JobConf conf = new JobConf(getConf(), PatitionByStationUsingMultipleOutputs.class);

        conf.setMapperClass(StationMapper.class);
        conf.setReducerClass(MultipleOutputReducer.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setOutputKeyClass(NullWritable.class);
        // All real records go through MultipleOutputs; suppress the default output.
        conf.setOutputFormat(NullOutputFormat.class);

        // Command-line paths take precedence; fall back to the original defaults.
        String input = args.length > 0 ? args[0] : "hdfs://ubuntu:9000/sample1.txt";
        String output = args.length > 1 ? args[1] : "hdfs://ubuntu:9000/temperature";
        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        // Multi-named output "station": one file per distinct station id.
        MultipleOutputs.addMultiNamedOutput(
                conf, "station", TextOutputFormat.class, NullWritable.class, Text.class);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new PatitionByStationUsingMultipleOutputs(), args);
        System.exit(exitCode);
    }
}


2、解析气象数据的类

package StationPatitioner;

import java.util.regex.Pattern;

import org.apache.hadoop.io.Text;

/**
 * Parses one fixed-width NCDC weather record line.
 *
 * <p>Field offsets (0-based, end-exclusive): station id [0,5), year [15,19),
 * air temperature [87,92) with an optional leading '+', quality code [92,93).
 */
public class NcdcRecordParser {

    /** Sentinel the NCDC format uses for a missing temperature reading. */
    private static final int MISSING_TEMPERATURE = 9999;

    /** Quality codes considered trustworthy; compiled once instead of per call. */
    private static final Pattern GOOD_QUALITY = Pattern.compile("[01459]");

    private String year;
    private int airTemperature;
    private String quality;
    private String stationid;

    /**
     * Extracts station id, year, air temperature and quality code from a record.
     *
     * @param record a fixed-width NCDC line; must be at least 93 characters
     * @throws StringIndexOutOfBoundsException if the line is too short
     * @throws NumberFormatException if the temperature field is not numeric
     */
    public void parse(String record) {
        stationid = record.substring(0, 5);
        year = record.substring(15, 19);
        String airTemperatureString;
        // Remove a leading plus sign, as Integer.parseInt doesn't accept it here.
        if (record.charAt(87) == '+') {
            airTemperatureString = record.substring(88, 92);
        } else {
            airTemperatureString = record.substring(87, 92);
        }
        airTemperature = Integer.parseInt(airTemperatureString);
        quality = record.substring(92, 93);
    }

    /** Convenience overload for Hadoop {@link Text} values. */
    public void parse(Text record) {
        parse(record.toString());
    }

    public String getStationid() {
        return stationid;
    }

    /** True when the reading is present and its quality code is one of 0, 1, 4, 5, 9. */
    public boolean isValidTemperature() {
        return airTemperature != MISSING_TEMPERATURE && GOOD_QUALITY.matcher(quality).matches();
    }

    public String getYear() {
        return year;
    }

    public int getAirTemperature() {
        return airTemperature;
    }
}


0