Hadoop的多文件输出及自定义文件名方法是什么
发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,本篇内容介绍了"Hadoop的多文件输出及自定义文件名方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够
千家信息网最后更新 2025年02月06日Hadoop的多文件输出及自定义文件名方法是什么
本篇内容介绍了"Hadoop的多文件输出及自定义文件名方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
首先是输出格式的类,也就是job.setOutputFormatClass(……)参数列表中的类:
public class MoreFileOutputFormat extends Multiple{ @Override protected String generateFileNameForKeyValue(Text key, Text value,Configuration conf) { return "Your name"; }}
这里,继承Multiple类后必须重写generateFileNameForKeyValue()方法,这个方法返回的字符串作为输出文件的文件名。内容有各位自己根据需要编写。同时,key和value的值也根据自己的需要更换。
接下来是Multiple模板类的代码:
import java.io.DataOutputStream;import java.io.IOException;import java.util.HashMap;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.GzipCodec;import org.apache.hadoop.mapreduce.OutputCommitter;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.ReflectionUtils;public abstract class Multiple, V extends Writable> extends FileOutputFormat { // 接口类,需要在调用程序中实现generateFileNameForKeyValue来获取文件名 private MultiRecordWriter writer = null; public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { if (writer == null) { writer = new MultiRecordWriter(job, getTaskOutputPath(job)); } return writer; } /** * get task output path * * @param conf * @return * @throws IOException */ private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException { Path workPath = null; OutputCommitter committer = super.getOutputCommitter(conf); if (committer instanceof FileOutputCommitter) { workPath = ((FileOutputCommitter) committer).getWorkPath(); } else { Path outputPath = super.getOutputPath(conf); if (outputPath == null) { throw new IOException("Undefined job output-path"); } workPath = outputPath; } return workPath; } //继承后重写以获得文件名 protected abstract String generateFileNameForKeyValue(K key, V value,Configuration conf); //实现记录写入器RecordWriter类 (内部类) public class MultiRecordWriter extends RecordWriter { /** RecordWriter的缓存 */ private HashMap > recordWriters = null; private TaskAttemptContext job = null; /** 输出目录 */ private Path workPath = null; public MultiRecordWriter(TaskAttemptContext job, Path workPath) { super(); this.job = job; this.workPath = workPath; recordWriters = new HashMap >(); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { Iterator > values = this.recordWriters.values().iterator(); while (values.hasNext()) { values.next().close(context); } this.recordWriters.clear(); } @Override public void write(K key, V value) throws IOException, InterruptedException { // 得到输出文件名 String baseName = generateFileNameForKeyValue(key, value,job.getConfiguration()); // 如果recordWriters里没有文件名,那么就建立。否则就直接写值。 RecordWriter rw = this.recordWriters.get(baseName); if (rw == null) { rw = getBaseRecordWriter(job, baseName); this.recordWriters.put(baseName, rw); } rw.write(key, value); } // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension} private RecordWriter getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); // 查看是否使用解码器 boolean isCompressed = getCompressOutput(job); RecordWriter recordWriter = null; if (isCompressed) { Class extends CompressionCodec> codecClass = getOutputCompressorClass( job, GzipCodec.class); CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); Path file = new Path(workPath, baseName + codec.getDefaultExtension()); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); // 这里我使用的自定义的OutputFormat recordWriter = new MyRecordWriter (new DataOutputStream( codec.createOutputStream(fileOut))); } else { Path file; System.out.println("workPath = " + workPath + ", basename = " + baseName); file = new Path(workPath, baseName); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); // 这里我使用的自定义的OutputFormat recordWriter = new MyRecordWriter (fileOut); } return recordWriter; } }}
现在来实现Multiple的内部类MultiRecordWriter中的MyRecordWriter类以实现自己想要的输出方式:
public class MyRecordWriterextends RecordWriter { private static final String utf8 = "UTF-8";//定义字符编码格式 protected DataOutputStream out; public MyRecordWriter(DataOutputStream out) { this.out = out; } private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { //输出成字节流。如果不是文本类的,请更改此处 out.write(o.toString().getBytes(utf8)); } } /** * 将mapreduce的key,value以自定义格式写入到输出流中 */ public synchronized void write(K key, V value) throws IOException { writeObject(value); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } }
这个类中还有其它集中方法,不过笔者不需要那些方法,所以把它们都删除了,但最初的文件也删除了- -,所以现在找不到了。请大家见谅。
现在,只需在main()或者run()函数中将job的输出格式设置成MoreFileOutputFormat类就行了,如下:
job.setOutputFormatClass(MoreFileOutputFormatClass);
"Hadoop的多文件输出及自定义文件名方法是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
文件
输出
文件名
方法
格式
内容
接下来
字符
更多
知识
实用
学有所成
中将
也就是
代码
函数
参数
只需
同时
困境
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
gbase数据库怎么释放表空间
方舟手游如何快速升级服务器
软件开发性能由谁提
手机怎么查服务器
软件开发最小功能集合
蓬江区粤信网络技术
宁夏卤悠网络技术有限公司
keep跑步软件开发
中国如何建设网络安全
没有主根服务器还能上网吗
消费者网络安全法
服务器开机连续四声短鸣报警
瑞丽航空软件开发部
tp3定时推送文件到指定服务器
华为mate11手机数据库
交换机和服务器的端口聚合
网络安全防护设备招投标文件
网络安全突发事件监控组
政务软件开发公司哪家服务好
node搭建本地服务器
舆情系统数据库设计
国企下属网络安全公司
sql数据库去除空格
服务器显卡电源不足
学网络安全能创业吗
江苏通信网络技术服务
联想哪款笔记本软件开发好
敏捷软件开发 java版
进口工业实时数据库价格
一加7pro小白数据库