千家信息网

学习日志---partitioner和采样器

发表于:2024-11-11 作者:千家信息网编辑
千家信息网最后更新 2024年11月11日,Mapreduce中:shuffle阶段是在map和reduce之间,可以自定义排序,自定义分区和自定义分组!Mapreduce中,map出的数据是键值对,默认的是hashPatitionner来对m
千家信息网最后更新 2024年11月11日学习日志---partitioner和采样器

Mapreduce中:

shuffle阶段是在map和reduce之间,可以自定义排序,自定义分区和自定义分组!


Mapreduce中,map出的数据是键值对,默认的是hashPatitionner来对map出的数据进行分区;

分区的方法还有其他几个:

RandomSampler sampler =                      new InputSampler.RandomSampler(0.5, 3000, 10);IntervalSampler sampler2 =                     new InputSampler.IntervalSampler(0.333, 10);SplitSampler sampler3 =                     new InputSampler.SplitSampler(reduceNumber);

实现和细节

public class TotalSortMR {           @SuppressWarnings("deprecation")    public static int runTotalSortJob(String[] args) throws Exception {          Path inputPath = new Path(args[0]);          Path outputPath = new Path(args[1]);          Path partitionFile = new Path(args[2]);          int reduceNumber = Integer.parseInt(args[3]);                    //三种采样器        RandomSampler sampler = new InputSampler.RandomSampler(1, 3000, 10);        IntervalSampler sampler2 = new InputSampler.IntervalSampler(0.333, 10);        SplitSampler sampler3 = new InputSampler.SplitSampler(reduceNumber);                //任务初始化        Configuration conf = new Configuration();          Job job = Job.getInstance(conf);                job.setJobName("Total-Sort");          job.setJarByClass(TotalSortMR.class);          job.setInputFormatClass(KeyValueTextInputFormat.class);          job.setMapOutputKeyClass(Text.class);          job.setMapOutputValueClass(Text.class);          job.setNumReduceTasks(reduceNumber);          //设置所有的分区类        job.setPartitionerClass(TotalOrderPartitioner.class);          //分区类参考的分区文件        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);          //分区使用哪种采样器        InputSampler.writePartitionFile(job, sampler);                 //job的输入和输出路径        FileInputFormat.setInputPaths(job, inputPath);          FileOutputFormat.setOutputPath(job, outputPath);          outputPath.getFileSystem(conf).delete(outputPath, true);                    return job.waitForCompletion(true)? 0 : 1;    }            public static void main(String[] args) throws Exception{          System.exit(runTotalSortJob(args));      }}

job默认的输入格式是TextInputFormat,这个是key-value的形式,key是每行的行标,value是每行的内容。可以更改

job.setInputFormatClass(,....)

一般要设置mapper的输出格式,以备后面使用。

0