hadoop如何自定义格式化输出
发表于:2024-12-12 作者:千家信息网编辑
千家信息网最后更新 2024年12月12日,这篇文章给大家分享的是有关hadoop如何自定义格式化输出的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。import java.io.IOException;import
千家信息网最后更新 2024年12月12日hadoop如何自定义格式化输出
这篇文章给大家分享的是有关hadoop如何自定义格式化输出的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
import java.io.IOException;import java.net.URI;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CustomizeOutputFormat { static final Log LOG = LogFactory.getLog(CustomizeOutputFormat.class); public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(CustomizeOutputFormat.class); job.setMapperClass(CustMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //此处这只自定义的格式化输出 job.setOutputFormatClass(CustOutputFormat.class); String jobName = "Customize outputformat test!"; job.setJobName(jobName); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean b = job.waitForCompletion(true); if(b) { LOG.info("Job "+ jobName +" is done."); }else { LOG.info("Job "+ jobName +"is going wrong,now exit."); System.exit(0); } }}class CustMapper extends Mapper{ String[] textIn = null; Text outkey = new Text(); Text outvalue = new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { /** * 假设文件的内容为如下: * boys girls * firends goodbye * down up * fly to * neibors that * */ textIn = value.toString().split("\t"); outkey.set(textIn[0]); outvalue.set(textIn[1]); context.write(outkey, outvalue); } }//自定义OutoutFormatclass CustOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { //获得configration Configuration conf = context.getConfiguration(); //获得FileSystem FileSystem fs = FileSystem.newInstance(conf); //获得输出路径 Path path = CustOutputFormat.getOutputPath(context); URI uri = path.toUri(); //创建两个文件,得到写入流 FSDataOutputStream foa = fs.create(new Path(uri.toString()+"/out.a")); FSDataOutputStream fob = fs.create(new Path(uri.toString()+"/out.b")); //创建自定义RecordWriter 传入 两个流 CustRecordWriter rw = new CustRecordWriter(foa,fob); return rw; } class CustRecordWriter extends RecordWriter { FSDataOutputStream foa = null; FSDataOutputStream fob = null; CustRecordWriter(FSDataOutputStream foa,FSDataOutputStream fob){ this.foa = foa; this.fob = fob; } @Override public void write(Text key, Text value) throws IOException, InterruptedException { String mText = key.toString(); //根据可以长度的不同分别输入到不同的文件 if(mText.length()>=5) { foa.writeUTF(mText+"\t"+value.toString()+"\n"); }else { fob.writeUTF(mText+"\t"+value.toString()+"\n"); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { //最后将两个写入流关闭 if(foa!=null) { foa.close(); } if(fob!=null) { fob.close(); } } } }//使用MultipleInputs,c处理多个来源的文件package hgs.multipuleinput;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import hgs.custsort.SortBean;import hgs.custsort.SortDriver;import hgs.custsort.SortMapper;import hgs.custsort.SortReducer;public class MultipuleInputDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SortDriver.class); job.setMapperClass(SortMapper.class); job.setReducerClass(SortReducer.class); job.setOutputKeyClass(SortBean.class); job.setOutputValueClass(NullWritable.class); MultipleInputs.addInputPath(job, new Path("/sort"), TextInputFormat.class,SortMapper.class); MultipleInputs.addInputPath(job, new Path("/sort1"), TextInputFormat.class,SortMapper.class); //FileInputFormat.setInputPaths(job, new Path("/sort")); FileOutputFormat.setOutputPath(job, new Path("/sortresult")); System.exit(job.waitForCompletion(true)==true?0:1); }}
感谢各位的阅读!关于"hadoop如何自定义格式化输出"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
输出
文件
格式
两个
内容
不同
更多
篇文章
不错
实用
多个
文章
来源
看吧
知识
路径
长度
参考
处理
帮助
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
查看dns服务器地址
数据库安全代理的类别包括
网络安全教育团日
悟空科技服务器
薄荷dns服务器
网络安全的配图
检索数据库概念
云之海服务器会被清空吗
精益软件开发pdf百度云
四川直销会员软件开发
面试中对软件开发的理解
学生选课数据库vfp
海南计算机网络技术专升本
长沙旭峰软件开发有限公司
怎么用数据库造十万条数据
浦东新区信息软件开发管理方法
竞技场视频软件开发
百度云服务器安全组规则不起作用
数据库后端功能有什么
质量高的数据库运维考试题
未落实网络安全技术保护措施
手机软件开发培训需要学什么
北京数据库安全箱销售
湖北安卓软件开发师培训
云南 网络安全和信息化
免费领荣耀水晶的软件开发
php类数据库操作数据库
未房网络技术有限公司
要把源码放进服务器才能做网站吗
惠州网络技术推广