【总结】Hadoop中的MultipleOutputs实践
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,本例子采用hadoop1.1.2版本,附件中有例子的数据文件采用气象数据作为处理数据1、MultipleOutputs例子,具体解释在代码中有注释package StationPatitioner;i
千家信息网最后更新 2025年01月24日【总结】Hadoop中的MultipleOutputs实践
本例子采用hadoop1.1.2版本,附件中有例子的数据文件
采用气象数据作为处理数据
1、MultipleOutputs例子,具体解释在代码中有注释
package StationPatitioner;import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.mapred.lib.MultipleOutputs;import org.apache.hadoop.mapred.lib.NullOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/** * hadoop Version 1.1.2 * MultipleOutputs例子 * @author 巧克力黑 * */public class PatitionByStationUsingMultipleOutputs extends Configured implements Tool { enum Counter { LINESKIP, //出错的行 } static class StationMapper extends MapReduceBase implements Mapper{ private NcdcRecordParser parser = new NcdcRecordParser(); @Override public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException { try { parser.parse(value); output.collect(new Text(parser.getStationid()), value); } catch (Exception e) { reporter.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1 } } } static class MultipleOutputReducer extends MapReduceBase implements Reducer { private MultipleOutputs multipleOutputs; @Override public void configure(JobConf jobconf) { multipleOutputs = new MultipleOutputs(jobconf);//初始化一个MultipleOutputs } @Override public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { //得到OutputCollector OutputCollector collector = multipleOutputs.getCollector("station", key.toString().replace("-", ""), reporter); while(values.hasNext()){ collector.collect(NullWritable.get(), values.next());//MultipleOutputs用OutputCollector输出数据 } } @Override public void close() throws IOException { multipleOutputs.close(); } } @Override public int run(String[] as) throws Exception { System.setProperty("HADOOP_USER_NAME", "root");//windows下用户与linux用户不一直,采用此方法避免报Permission相关错误 JobConf conf = new JobConf(); conf.setMapperClass(StationMapper.class); conf.setReducerClass(MultipleOutputReducer.class); conf.setMapOutputKeyClass(Text.class); conf.setOutputKeyClass(NullWritable.class); conf.setOutputFormat(NullOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path("hdfs://ubuntu:9000/sample1.txt"));//input路径 FileOutputFormat.setOutputPath(conf, new Path("hdfs://ubuntu:9000/temperature"));//output路径 MultipleOutputs.addMultiNamedOutput(conf, "station", TextOutputFormat.class, NullWritable.class, Text.class); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception{ int exitCode = ToolRunner.run(new PatitionByStationUsingMultipleOutputs(), args); System.exit(exitCode); } }
2、解析气象数据的类
package StationPatitioner;import org.apache.hadoop.io.Text;public class NcdcRecordParser { private static final int MISSING_TEMPERATURE = 9999; private String year; private int airTemperature; private String quality; private String stationid; public void parse(String record) { stationid = record.substring(0, 5); year = record.substring(15, 19); String airTemperatureString; // Remove leading plus sign as parseInt doesn't like them if (record.charAt(87) == '+') { airTemperatureString = record.substring(88, 92); } else { airTemperatureString = record.substring(87, 92); } airTemperature = Integer.parseInt(airTemperatureString); quality = record.substring(92, 93); } public String getStationid(){ return stationid; } public void parse(Text record) { parse(record.toString()); } public boolean isValidTemperature() { return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]"); } public String getYear() { return year; } public int getAirTemperature() { return airTemperature; }}
数据
例子
气象
用户
路径
代码
巧克力
文件
此方法
注释
版本
计数器
错误
附件
处理
解释
输出
实践
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
深圳软件开发收费报价表
长沙轶之隆网络技术有限公司
吉林市2020年网络安全宣传周
公安部网络安全守护行动
单招软件开发的专科生就业前景
服务器之间如何内网互联
2u服务器供应厂家
惠普服务器修改磁盘阵列
兴化环保网络技术保养
内乡客户管理软件开发
网络安全产品的国产化替代
怎么认定是软件开发服务
绘画网络安全展板设计
泰山服务器BIOS初始化硬盘
服务器管理器添加用户
地理数据库建库
宝德服务器好还是浪潮服务器好
阿里云服务器怎么进
阿里云服务器4核2g
麒麟10服务器安装
攻城过程中服务器维护
义乌哪有软件开发培训学校
网络数据库列表
日产逍客音乐找不到服务器
赴美软件开发工程师
nga 数据库
渗透测试网络安全公司排名
ipv6访问阿里云服务器
如何选服务器
服务器管理特别慢怎么办