千家信息网

hadoop datajoin有什么用

发表于:2024-09-21 作者:千家信息网编辑
千家信息网最后更新 2024年09月21日,本篇内容主要讲解"hadoop datajoin有什么用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"hadoop datajoin有什么用"吧!hado
千家信息网最后更新 2024年09月21日hadoop datajoin有什么用

本篇内容主要讲解"hadoop datajoin有什么用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"hadoop datajoin有什么用"吧!

hadoop datajoin 之reduce side join

hadoop提供了一个叫datajoin的jar包,用于解决两张表关联的问题。jar位于/hadoop/contrib/datajoin将jar包引入进行开发。

涉及的几个概念:

1.Data Source:基本与关系数据库中的表相似,形式为:(例子中为CSV格式)

      Customers                 Orders
      1,Stephanie Leung,555-555-5555      3,A,12.95,02-Jun-2008
      2,Edward Kim,123-456-7890         1,B,88.25,20-May-2008
      3,Jose Madriz,281-330-8004         2,C,32.00,30-Nov-2007
      4,David Stork,408-555-0000         3,D,25.02,22-Jan-2009

2.Tag:由于记录类型(Customers或Orders)与记录本身分离,标记一个Record会确保特殊元数据会一致存在于记录中。在这个目的下,我们将使用每个record自身的Data source名称标记每个record。

3.Group Key:Group Key类似于关系数据库中的链接键(join key),在我们的例子中,group key就是Customer ID(第一列的3)。由于datajoin包允许用户自定义group key,所以其较之关系数据库中的join key更一般、平常。

使用以下样例数据

customers-20140716

1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000

orders-20140716

3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009
请看流程图

第一部分,自定义的数据类型。数据类型主要包含两部分tag和data,tag是给数据打上的标签用来表示数据来源于哪个文件。data是数据记录。

上代码:

public class TaggedWritable extends TaggedMapOutput {    private Text data;        public TaggedWritable() {        this.tag = new Text("");        this.data = new Text("");    }        public TaggedWritable(Text data) {        this.data = data;    }    @Override    public void readFields(DataInput in) throws IOException {        this.tag.readFields(in);        this.data.readFields(in);    }    @Override    public void write(DataOutput out) throws IOException {        this.tag.write(out);        this.data.write(out);    }    @Override    public Text getData() {        return data;    }}

第二部分,map函数

public class Mapclass extends DataJoinMapperBase{    @Override    protected Text generateGroupKey(TaggedMapOutput aRecord) {        String line = aRecord.getData().toString();        String[] tokens = line.split(",");        String groupKey = tokens[0];        return new Text(groupKey);    }    @Override    protected Text generateInputTag(String inputFile) {        String datasource = inputFile.split("-")[0];        return new Text(datasource);    }    @Override    protected TaggedWritable generateTaggedMapOutput(Object value) {        TaggedWritable retv = new TaggedWritable(new Text(value.toString()));        retv.setTag(this.inputTag);        return retv;    }}

map函数中要特别注意protected TaggedWritable generateTaggedMapOutput(Object value) 该方法的返回类型为你第一步定义的类型。

第三部分,reduce

public class Reduce extends DataJoinReducerBase{    @Override    protected TaggedMapOutput combine(Object[] tags, Object[] values) {        if (tags.length < 2) {            return null;            }                String joinedStr = "";           for (int i=0; i 0){                joinedStr += ",";             }            TaggedWritable tw = (TaggedWritable) values[i];              String line = tw.getData().toString();              System.out.println("line=========:"+line);            String[] tokens = line.split(",", 2);              joinedStr += tokens[1];          }          TaggedWritable retv = new TaggedWritable(new Text(joinedStr));          retv.setTag((Text) tags[0]);           return retv;      }}

reduce过程会将主键与数据组合在一起输出,你拼接的字符串中无需写入主键。

public class Datajoin extends Configured implements Tool {    @Override    public int run(String[] args) throws Exception {        Configuration conf = this.getConf();        JobConf job = new JobConf(conf, Datajoin.class);        job.setJarByClass(Datajoin.class);        Path in = new Path("hdfs://172.16.0.87:9000/user/jeff/datajoin/");        Path out = new Path("hdfs://172.16.0.87:9000/user/jeff/datajoin/out");        FileInputFormat.setInputPaths(job, in);        FileOutputFormat.setOutputPath(job, out);        job.setJobName("DataJoin");                job.setMapperClass(Mapclass.class);        job.setReducerClass(Reduce.class);                job.setInputFormat(TextInputFormat.class);        job.setOutputFormat(TextOutputFormat.class);                job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(TaggedWritable.class);                job.setOutputKeyClass(Text.class);        job.setOutputValueClass(Text.class);        job.set("mapred.textoutputformat.separator", ",");        JobClient.runJob(job);        return 0;    }    public static void main(String[] args) throws Exception {        int res = ToolRunner.run(new Configuration(), new Datajoin(), args);        System.exit(res);    }}

运行mapreduce任务后的输出为:

1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009

可以在reduce的combin函数中控制函数的输出形式,左联,或者右联。

到此,相信大家对"hadoop datajoin有什么用"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0