How to implement a multiple-output extension for Hadoop Streaming


This article explains how to implement a multiple-output extension for Hadoop Streaming. The write-up is fairly detailed and should be a useful reference; if you're interested, read it through to the end!

PrefixMultipleOutputFormat adds two features:

  • Route output to different directories according to the key's prefix (sketched just below)

  • Remove the trailing tab from the final output
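
To make the first point concrete, here is a sketch of the key convention (assuming the single-character prefixes used by the reducer script later in this article):

```bash
# Illustration only: the reducer tags each key with a one-character prefix
# followed by '#', and the output format routes on that prefix.
#   reducer emits "A#hello"  ->  written under $output/A/ with "A#" stripped
#   reducer emits "B#world"  ->  written under $output/B/ with "B#" stripped
```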

## Usage

### Writing to different directories by key prefix

```bash
$maserati_hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -libjars ./adts.jar \
    -D mapred.job.name=$name \
    -D mapred.reduce.tasks=5 \
    -inputformat org.apache.hadoop.mapred.TextInputFormat \
    -outputformat com.sogou.adt.adts.PrefixMultipleOutputFormat \
    -input $input \
    -output $output \
    -mapper ./m_mapper.sh \
    -reducer ./m_reducer.sh \
    -file m_mapper.sh \
    -file m_reducer.sh
```

Here -outputformat names our own implementation class, and -libjars ./adts.jar ships our own jar with the job.

### m_mapper.sh and m_reducer.sh

#### m_mapper.sh

Splits each input line on spaces and emits one field per line:

```bash
#!/bin/bash
awk -F " " '{
    for (i = 1; i <= NF; i++)
        print $i;
}'
```

#### m_reducer.sh

Tags every third line with a different one-character prefix:

```bash
#!/bin/bash
awk -F "\t" '{
    if (NR % 3 == 0) print "A#"$1;
    if (NR % 3 == 1) print "B#"$1;
    if (NR % 3 == 2) print "C#"$1;
}'
```

With this, the numbers are routed to different output paths.
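
A quick way to verify the routing (a sketch; the exact part-file names depend on your job, so treat the paths as assumptions):

```bash
hadoop fs -ls $output                    # expect A/, B/ and C/ subdirectories
hadoop fs -cat $output/A/part-00000-A \
    | head                               # lines where NR % 3 == 0, "A#" stripped
```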

### Removing the trailing tab

Just add -D com.sogou.adt.adts.ignoreseparator=true to tell the output format to drop the trailing tab:

```bash
$maserati_hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -libjars ./adts.jar \
    -D mapred.job.name=$name \
    -D mapred.reduce.tasks=5 \
    -D com.sogou.adt.adts.ignoreseparator=true \
    -inputformat org.apache.hadoop.mapred.TextInputFormat \
    -outputformat com.sogou.adt.adts.PrefixMultipleOutputFormat \
    -input $input \
    -output $output \
    -mapper ./m_mapper.sh \
    -reducer ./m_reducer.sh \
    -file m_mapper.sh \
    -file m_reducer.sh
```
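
To see the effect, `cat -A` prints tabs as `^I` (a sketch; the part-file path is an assumption):

```bash
hadoop fs -cat $output/A/part-00000-A | cat -A | head -1
# without the flag:              123^I$    <- trailing tab after the key
# with ignoreseparator=true:     123$      <- separator suppressed
```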

## How PrefixMultipleOutputFormat is implemented

Since I'm not familiar with Java (the little I learned in college was handed back to the teacher long ago ^v^), setting up a build environment took some time. Luckily I had a ready-made Eclipse Java environment, plus a Hadoop environment I built two years ago (it only needed a small fix to run jobs again, thankfully).

### My environment

  • eclipse

  • jdk1.6.0

  • jars

    • hadoop-common-2.6.0.jar

    • hadoop-mapreduce-client-core-2.6.0.jar

A quick note on this: before compiling, I was worried about where to find the jars Hadoop Streaming depends on, and whether I would have to build them myself (compiling all of Hadoop's source is a bit of a headache). It turned out that every jar can be found in the Hadoop runtime environment, which was an instant relief.
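
For reference, a minimal build sketch (the jar locations under $HADOOP_HOME and the source layout are my assumptions; adjust to your setup):

```bash
mkdir -p classes
javac -cp "$HADOOP_HOME/share/hadoop/common/hadoop-common-2.6.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar" \
      -d classes src/com/sogou/adt/adts/PrefixMultipleOutputFormat.java
jar cf adts.jar -C classes .
```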

### Source code

The code is easy to follow: it defines a LineRecordWriter class (mostly lifted from the existing TextOutputFormat, with small changes to read the config and suppress the tab separator), and generateFileNameForKeyValue reads the key prefix and writes to different directories. The code speaks for itself.

```java
package com.sogou.adt.adts;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class PrefixMultipleOutputFormat extends MultipleTextOutputFormat<Text, Text> {

    @Override
    protected Text generateActualKey(Text key, Text value) {
        return super.generateActualKey(key, value);
    }

    // Mostly copied from TextOutputFormat.LineRecordWriter; the only change is
    // that the key/value separator may be set to the empty string.
    protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        /**
         * Write the object to the byte stream, handling Text as a special case.
         *
         * @param o the object to print
         * @throws IOException if the write throws, we pass it on
         */
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(Reporter reporter) throws IOException {
            out.close();
        }
    }

    @Override
    protected RecordWriter<Text, Text> getBaseRecordWriter(FileSystem fs, JobConf job,
            String name, Progressable arg3) throws IOException {
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator =
                job.get("mapreduce.output.textoutputformat.separator", "\t");
        // When com.sogou.adt.adts.ignoreseparator=true, drop the separator so
        // no trailing tab is written.
        boolean ignoreseparator = job.getBoolean("com.sogou.adt.adts.ignoreseparator", false);
        if (ignoreseparator) {
            keyValueSeparator = "";
        }
        if (!isCompressed) {
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, arg3);
            return new LineRecordWriter<Text, Text>(fileOut, keyValueSeparator);
        } else {
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, GzipCodec.class);
            // create the named codec
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
            // build the filename including the extension
            Path file = FileOutputFormat.getTaskOutputPath(job,
                    name + codec.getDefaultExtension());
            fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, arg3);
            return new LineRecordWriter<Text, Text>(
                    new DataOutputStream(codec.createOutputStream(fileOut)),
                    keyValueSeparator);
        }
    }

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        int keyLength = key.getLength();
        String outputName = name;
        if (keyLength < 2)
            return outputName;
        // Check whether the second character of the key is '#'.
        Text sep = new Text();
        sep.append(key.getBytes(), 1, 1);
        if (sep.find("#") != -1) {
            Text newFlag = new Text();
            newFlag.append(key.getBytes(), 0, 1);
            String flag = newFlag.toString();
            // Route to a subdirectory named after the prefix, e.g. A/part-00000-A.
            outputName = flag + "/" + name + "-" + flag;
            // Strip the "X#" prefix from the key itself.
            Text newValue = new Text();
            newValue.append(key.getBytes(), 2, keyLength - 2);
            key.set(newValue);
        }
        System.out.printf("[shishuai]System[key [%s]][value:[%s]] output[%s]\n",
                key.toString(), value.toString(), outputName);
        return outputName;
    }
}
```
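
Two details worth noting: generateFileNameForKeyValue rewrites the key in place (key.set(newValue)), so records land in their target directory with the "X#" prefix already stripped; and the System.out.printf call is debug output that ends up in each task's stdout log, so you may want to remove it for production jobs.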

That's everything in "How to implement a multiple-output extension for Hadoop Streaming". Thanks for reading, and I hope it helps!
