千家信息网

Hadoop中如何分区

发表于:2024-11-23 作者:千家信息网编辑
千家信息网最后更新 2024年11月23日,小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
千家信息网最后更新 2024年11月23日,Hadoop中如何分区

小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

package partition;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class KpiApp {        public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat";        public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format";        public static void main(String[] args)throws Exception {                Configuration conf = new Configuration();                existsFile(conf);                Job job = new Job(conf, KpiApp.class.getName());                //打成Jar在Linux运行                job.setJarByClass(KpiApp.class);                                //1.1                FileInputFormat.setInputPaths(job, INPUT_PATH);                job.setInputFormatClass(TextInputFormat.class);                                //1.2                job.setMapperClass(MyMapper.class);                job.setOutputKeyClass(Text.class);                job.setOutputValueClass(KpiWritable.class);                                //1.3 自定义分区                job.setPartitionerClass(KpiPartition.class);                job.setNumReduceTasks(2);                                //1.4 排序分组                //1.5 聚合                                //2.1                                //2.2                job.setReducerClass(MyReducer.class);      
          job.setOutputKeyClass(Text.class);                job.setOutputValueClass(KpiWritable.class);                                //2.3                FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));                job.setOutputFormatClass(TextOutputFormat.class);                                job.waitForCompletion(true);        }        private static void existsFile(Configuration conf) throws IOException,                        URISyntaxException {                FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH),conf);                if(fs.exists(new Path(OUTPUT_PATH))){                        fs.delete(new Path(OUTPUT_PATH), true);                }        }        static class MyMapper extends Mapper{                @Override                protected void map(LongWritable key, Text value, Context context)                                throws IOException, InterruptedException {                        String string = value.toString();                        String[] split = string.split("\t");                        String phone = split[1];                        Text key2 = new Text();                        key2.set(phone);                                                KpiWritable v2= new KpiWritable();                        v2.set(split[6],split[7],split[8],split[9]);                        context.write(key2, v2);                }                        }        static class MyReducer extends Reducer{                @Override                protected void reduce(Text key2, Iterable values,Context context)                                throws IOException, InterruptedException {                                long upPackNum = 0L;                                long downPackNum = 0L;                                long upPayLoad = 0L;                                long downPayLoad = 0L;                                for(KpiWritable writable : values){                                        upPackNum += writable.upPackNum;                  
                      downPackNum += writable.downPackNum;                                        upPayLoad += writable.upPayLoad;                                        downPayLoad += writable.downPayLoad;                                }                                KpiWritable value3 = new KpiWritable();                                value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad));                                context.write(key2, value3);                }        }}class KpiWritable implements Writable{        long upPackNum;        long downPackNum;        long upPayLoad;        long downPayLoad;        @Override        public void write(DataOutput out) throws IOException {                out.writeLong(this.upPackNum);                out.writeLong(this.downPackNum);                out.writeLong(this.upPayLoad);                out.writeLong(this.downPayLoad);        }        public void set(String string, String string2, String string3,                        String string4) {                this.upPackNum = Long.parseLong(string);                this.downPackNum = Long.parseLong(string2);                this.upPayLoad = Long.parseLong(string3);                this.downPayLoad = Long.parseLong(string4);        }        @Override        public void readFields(DataInput in) throws IOException {                this.upPackNum = in.readLong();                this.downPackNum = in.readLong();                this.upPayLoad = in.readLong();                this.downPayLoad = in.readLong();        }        @Override        public String toString() {                return  upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;        }}class KpiPartition extends Partitioner{        @Override        public int getPartition(Text key, KpiWritable value, int numPartitions) {                String string = key.toString();                return string.length()==11?0:1;        }}

Partitioner是HashPartitioner的基类,如果需要定制Partitioner也需要继承该类。

HashPartitioner是MapReduce的默认Partitioner。

以上是"Hadoop中如何分区"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0