hadoop如何自定义格式化输出
发表于:2025-01-26 作者:千家信息网编辑
千家信息网最后更新 2025年01月26日,这篇文章给大家分享的是有关hadoop如何自定义格式化输出的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。import java.io.IOException;import
千家信息网最后更新 2025年01月26日hadoop如何自定义格式化输出
这篇文章给大家分享的是有关hadoop如何自定义格式化输出的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
import java.io.IOException;import java.net.URI;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CustomizeOutputFormat { static final Log LOG = LogFactory.getLog(CustomizeOutputFormat.class); public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(CustomizeOutputFormat.class); job.setMapperClass(CustMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //此处这只自定义的格式化输出 job.setOutputFormatClass(CustOutputFormat.class); String jobName = "Customize outputformat test!"; job.setJobName(jobName); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean b = job.waitForCompletion(true); if(b) { LOG.info("Job "+ jobName +" is done."); }else { LOG.info("Job "+ jobName +"is going wrong,now exit."); System.exit(0); } }}class CustMapper extends Mapper{ String[] textIn = null; Text outkey = new Text(); Text outvalue = new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { /** * 假设文件的内容为如下: * boys girls * firends goodbye * down up * fly to * neibors that * */ textIn = value.toString().split("\t"); outkey.set(textIn[0]); outvalue.set(textIn[1]); context.write(outkey, outvalue); } }//自定义OutoutFormatclass CustOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { //获得configration Configuration conf = context.getConfiguration(); //获得FileSystem FileSystem fs = FileSystem.newInstance(conf); //获得输出路径 Path path = CustOutputFormat.getOutputPath(context); URI uri = path.toUri(); //创建两个文件,得到写入流 FSDataOutputStream foa = fs.create(new Path(uri.toString()+"/out.a")); FSDataOutputStream fob = fs.create(new Path(uri.toString()+"/out.b")); //创建自定义RecordWriter 传入 两个流 CustRecordWriter rw = new CustRecordWriter(foa,fob); return rw; } class CustRecordWriter extends RecordWriter { FSDataOutputStream foa = null; FSDataOutputStream fob = null; CustRecordWriter(FSDataOutputStream foa,FSDataOutputStream fob){ this.foa = foa; this.fob = fob; } @Override public void write(Text key, Text value) throws IOException, InterruptedException { String mText = key.toString(); //根据可以长度的不同分别输入到不同的文件 if(mText.length()>=5) { foa.writeUTF(mText+"\t"+value.toString()+"\n"); }else { fob.writeUTF(mText+"\t"+value.toString()+"\n"); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { //最后将两个写入流关闭 if(foa!=null) { foa.close(); } if(fob!=null) { fob.close(); } } } }//使用MultipleInputs,c处理多个来源的文件package hgs.multipuleinput;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import hgs.custsort.SortBean;import hgs.custsort.SortDriver;import hgs.custsort.SortMapper;import hgs.custsort.SortReducer;public class MultipuleInputDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SortDriver.class); job.setMapperClass(SortMapper.class); job.setReducerClass(SortReducer.class); job.setOutputKeyClass(SortBean.class); job.setOutputValueClass(NullWritable.class); MultipleInputs.addInputPath(job, new Path("/sort"), TextInputFormat.class,SortMapper.class); MultipleInputs.addInputPath(job, new Path("/sort1"), TextInputFormat.class,SortMapper.class); //FileInputFormat.setInputPaths(job, new Path("/sort")); FileOutputFormat.setOutputPath(job, new Path("/sortresult")); System.exit(job.waitForCompletion(true)==true?0:1); }}
感谢各位的阅读!关于"hadoop如何自定义格式化输出"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
输出
文件
格式
两个
内容
不同
更多
篇文章
不错
实用
多个
文章
来源
看吧
知识
路径
长度
参考
处理
帮助
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
双路服务器性能
不进服务器直接能进入数据库吗
gis软件开发流程
软件开发大数据的基础知识
文山服务器上门回收中心
热动专业做软件开发
网络安全知识测试简报
360网络安全中标
git文件上传到服务器
淘宝省数据库
网络安全专项保卫工作
考核点数据库设计
软件开发工具上机
西安服务器有哪些品牌
工会网络安全宣传制度
sql数据库数据已满
大一计算机网络技术题目
天津科研项目管控软件开发平台
计算机三级数据库技术难吗
数据库集群地址
网络安全能开公司么
黑龙江服务器数据迁移
设备管理服务器地址怎么写
天津廊坊金蝶网络技术
网络安全0基础免费
哔哩哔哩服务器账号
自学软件开发可以找到工作吗
iphone软件开发工具
软件开发猎头公司
登陆路由器后无法连接服务器