
How to implement WordCount and movie rating prediction with MapReduce in Hadoop


This article walks through how to implement WordCount and movie rating prediction with MapReduce in Hadoop. I hope it serves as a useful reference and leaves you with a working understanding of the topic.

In MapReduce, "map" refers to mapping and "reduce" refers to reduction.

MapReduce is a programming model that processes data as key-value pairs: the map phase transforms one set of key-value pairs into another set of key-value pairs.

The framework then passes the map output to the reduce phase, where all key-value pairs emitted by the maps are aggregated: values that share the same key are grouped together. MapReduce also sorts the keys before they reach the reducers.
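As a small worked example of this flow, consider counting words in the single input line "hello world hello": the map phase emits (hello, 1), (world, 1), (hello, 1); the shuffle groups these by key into hello -> [1, 1] and world -> [1]; the reduce phase sums each group and outputs (hello, 2) and (world, 1). The WordCount job below implements exactly this.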

1. WordCount

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import main.java.org.conan.myhadoop.hdfs.HdfsDAO;   // custom HDFS helper class, same project as the recommender below

public class WordCount {

    public static final String HDFS = "hdfs://localhost:8888";
    public static final Pattern DELIMITER = Pattern.compile("\\b([a-zA-Z]+)\\b");

    // Custom Map class: the "mapping" part of the job
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        // In MapReduce, Text corresponds to String and IntWritable to int.
        // LongWritable is a WritableComparable type used for the byte-offset key.
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Read one line of input
            String line = value.toString();
            // Lower-case the whole line
            line = line.toLowerCase();
            // Split the line with the predefined regular expression
            Matcher matcher = DELIMITER.matcher(line);
            while (matcher.find()) {
                // Convert each extracted word to Text
                word.set(matcher.group());
                // Emit the pair: key is the word, value is 1
                context.write(word, one);
            }
        }
    }

    // Custom Combine step: run a local reduce on each map's output
    // to cut down the amount of data shuffled to the reducers.
    public static class Combine extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // All values for the same key arrive in one Iterable
            for (IntWritable line : values) {
                sum += line.get();
            }
            IntWritable value = new IntWritable(sum);
            // Write the key-value pair in the configured output format
            context.write(key, value);
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable line : values) {
                sum += line.get();
            }
            IntWritable value = new IntWritable(sum);
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = WordCount.config();
        String input = "data/1.txt";
        String output = HDFS + "/user/hdfs/wordcount";

        // Custom HDFS helper class
        HdfsDAO hdfs = new HdfsDAO(WordCount.HDFS, conf);
        // Remove any existing output directory, otherwise the job fails
        // with an "output directory already exists" error
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);

        // Output key type
        job.setOutputKeyClass(Text.class);
        // Output value type
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordCount.Map.class);
        job.setCombinerClass(WordCount.Combine.class);
        job.setReducerClass(WordCount.Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        // Output format: the custom ParseTextOutputFormat shown below
        job.setOutputFormatClass(ParseTextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
}
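For a quick sanity check, here is what the job produces on a small hypothetical input (the path data/1.txt comes from the code above; the file contents below are made up for illustration). Because the custom output format shown next uses ":" as its default key-value separator, the output is written as word:count lines, sorted by key:

    data/1.txt (assumed contents):
        Hello Hadoop
        hello MapReduce

    output under /user/hdfs/wordcount:
        hadoop:1
        hello:2
        mapreduce:1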

Custom file output format

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class ParseTextOutputFormat<K, V> extends FileOutputFormat<K, V> {

    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        /**
         * Write the object to the byte stream, handling Text as a special case.
         * @param o the object to print
         * @throws IOException if the write throws, we pass it on
         */
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        // Key-value separator: defaults to ":" instead of the usual tab
        String keyValueSeparator = conf.get("mapred.textoutputformat.separator", ":");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        if (!isCompressed) {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new LineRecordWriter<K, V>(
                    new DataOutputStream(codec.createOutputStream(fileOut)),
                    keyValueSeparator);
        }
    }
}
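Since the record writer reads its separator from the mapred.textoutputformat.separator property (defaulting to ":" above), a job can override it without touching the class. A minimal sketch, assuming the WordCount driver from above; the "=" separator here is purely illustrative:

    JobConf conf = WordCount.config();
    // hypothetical override: write "word=count" lines instead of "word:count"
    conf.set("mapred.textoutputformat.separator", "=");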

2. Movie rating prediction

The rating prediction uses the Slope One algorithm; the custom output format class used here is the same ParseTextOutputFormat as above.

The input file format is userId::movieId::score.
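For reference, here is the core of Slope One independent of MapReduce: a user's unknown rating for movie B is estimated as that user's known rating for some movie A plus the average difference (B minus A) observed over users who rated both movies. The following is a minimal single-machine sketch; the class name and sample data are mine and purely illustrative, not part of the Hadoop job below.

import java.util.HashMap;
import java.util.Map;

// Minimal, single-machine Slope One sketch (illustrative only).
public class SlopeOneSketch {
    public static void main(String[] args) {
        // ratings.get(userId).get(movieId) = score, same information as userId::movieId::score
        Map<String, Map<String, Double>> ratings = new HashMap<>();
        add(ratings, "1", "A", 5.0); add(ratings, "1", "B", 3.0);
        add(ratings, "2", "A", 4.0); add(ratings, "2", "B", 2.0);
        add(ratings, "3", "A", 4.5);

        // User 3 rated A = 4.5; avg(B - A) over users 1 and 2 is -2.0, so B is predicted as 2.5
        System.out.println(predict(ratings, "3", "B"));
    }

    static void add(Map<String, Map<String, Double>> r, String u, String m, double s) {
        r.computeIfAbsent(u, k -> new HashMap<>()).put(m, s);
    }

    // Predict user's rating for target: average of (known rating + avg difference target-other)
    static double predict(Map<String, Map<String, Double>> ratings, String user, String target) {
        double total = 0;
        int count = 0;
        for (Map.Entry<String, Double> known : ratings.get(user).entrySet()) {
            String other = known.getKey();
            if (other.equals(target)) continue;
            double devSum = 0;
            int devCount = 0;
            for (Map<String, Double> userRatings : ratings.values()) {
                if (userRatings.containsKey(target) && userRatings.containsKey(other)) {
                    devSum += userRatings.get(target) - userRatings.get(other);
                    devCount++;
                }
            }
            if (devCount > 0) {
                total += known.getValue() + devSum / devCount;
                count++;
            }
        }
        return count > 0 ? total / count : 0;
    }
}

The MapReduce pipeline below approximates this idea in five steps, using an item rating-difference matrix (Step 2) and one baseline rating per user (recorded in Step 4).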

package main.java.org.conan.myhadoop.recommend;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

import main.java.org.conan.myhadoop.hdfs.HdfsDAO;

public class Recommend {

    public static final String HDFS = "hdfs://localhost:8888";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    public static final Pattern STRING = Pattern.compile("[\t,:]");

    // All movie IDs seen in the data, and one baseline rating per user
    public static List<String> movieList = new ArrayList<String>();
    public static Map<String, String> userScore = new HashMap<String, String>();

    public static void main(String[] args) throws Exception {
        Map<String, String> path = new HashMap<String, String>();
        String in = "logfile/4.txt";
        String out = HDFS + "/user/hdfs/recommend" + "/step5";
        if (args.length == 2) {
            in = args[0];
            out = HDFS + args[1];
            System.out.println(out);
        }

        // Input data path
        path.put("data", in);

        // Step 1 input and output paths
        path.put("Step1Input", HDFS + "/user/hdfs/recommend");
        path.put("Step1Output", path.get("Step1Input") + "/step1");

        // Step 2 input and output paths
        path.put("Step2Input", path.get("Step1Output"));
        path.put("Step2Output", path.get("Step1Input") + "/step2");

        // Step 3 input and output paths
        path.put("Step3Input1", path.get("data"));
        path.put("Step3Output", path.get("Step1Input") + "/step3");

        // Step 4 input and output paths
        path.put("Step4Input1", path.get("Step2Output"));
        path.put("Step4Input2", path.get("Step3Output"));
        path.put("Step4Output", path.get("Step1Input") + "/step4");

        // Step 5 input and output paths
        path.put("Step5Input", path.get("Step4Output"));
        path.put("Step5Output", out);

        // Step 1: build the user-to-movie rating matrix from the rating file
        Step1.run(path);

        // Step 2: build the item rating-difference (co-occurrence) matrix from Step 1's output
        Step2.run(path);

        // Step 3: collect all rated movies and emit every user's rating for every movie,
        // writing 0 for movies the user has not rated
        Step3.run(path);

        // Step 4: combine Step 2 and Step 3 to predict every user's rating for every movie
        Step4.run(path);

        // Step 5: normalize the output format
        Step5.run(path);

        System.exit(0);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(Recommend.class);
        conf.setJobName("Recommand");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
}
// Step 1: build the user-to-item rating matrix, i.e. each user's ratings for movies.
// Each output line represents one user's ratings for all rated movies:
// key is userId, value is movieId:score,movieId:score,...
public class Step1 {

    public static class Step1_ToItemPreMapper extends MapReduceBase
            implements Mapper<Object, Text, Text, Text> {
        private final static Text k = new Text();
        private final static Text v = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // Input format: userId::movieId::score
            String[] tokens = value.toString().split("::");
            String itemID = tokens[1];
            String pref = tokens[2];
            k.set(tokens[0]);
            v.set(itemID + ":" + pref);
            output.collect(k, v);
        }
    }

    public static class Step1_ToUserVectorReducer extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String value = "";
            int num = 0;
            while (values.hasNext()) {
                num++;
                value += values.next();
                value += ",";
                // Keep at most the first 400 ratings per user to bound the line length
                if (num >= 400) {
                    value = value.substring(0, value.length() - 1);
                    Text v = new Text(value);
                    output.collect(key, v);
                    value = "";
                    num = 0;
                    break;
                }
            }
            if (num != 0) {
                value = value.substring(0, value.length() - 1);
                Text v = new Text(value);
                output.collect(key, v);
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();
        String input = path.get("Step1Input");
        String output = path.get("Step1Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("data"), input);

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Step1_ToItemPreMapper.class);
        conf.setReducerClass(Step1_ToUserVectorReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }
}
// Step 2: build the item rating-difference (co-occurrence) matrix from Step 1's output.
// There is no efficient way here to run the full double loop over all item pairs, so when
// building the matrix each item is paired with one randomly chosen item from the same user
// vector, producing movieA:movieB entries.
public class Step2 {

    public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, DoubleWritable> {
        private final static Text k = new Text();
        private final static DoubleWritable v = new DoubleWritable();

        @Override
        public void map(LongWritable key, Text values, OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {
            // Step 1 output line: userId \t movieId:score,movieId:score,...
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            for (int i = 1; i < tokens.length; i++) {
                String itemID = tokens[i].split(":")[0];
                // Pair this item with one randomly chosen item instead of every other item
                Random random = new Random();
                int j = random.nextInt(tokens.length - 1) + 1;
                String itemID2 = tokens[j].split(":")[0];
                // Rating difference between the two items for this user
                double sum = Double.parseDouble(tokens[i].split(":")[1])
                        - Double.parseDouble(tokens[j].split(":")[1]);
                k.set(itemID + ":" + itemID2 + ":");
                v.set(sum);
                output.collect(k, v);
            }
        }
    }

    public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase
            implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();

        @Override
        public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {
            // Average the rating differences for this item pair
            double sum = 0;
            int count = 0;
            while (values.hasNext()) {
                sum += values.next().get();
                count++;
            }
            sum = sum / count;
            DecimalFormat df = new DecimalFormat("#.0000");
            sum = Double.valueOf(df.format(sum));
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();
        String input = path.get("Step2Input");
        String output = path.get("Step2Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);

        conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
        conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }
}
// Step 3: collect all movies that have been rated and emit every user's rating for every
// movie, writing 0 for movies the user has not rated.
// Because there is no separate movie catalog file, all movie IDs must be gathered from the
// data file and kept in a list; with a large data file, checking the list for every record
// makes this increasingly slow. A static map (Recommend.userScore, filled in Step 4) records
// each user's first rated movie, which serves as the baseline for prediction with the
// rating-difference matrix. (Note: sharing state through static fields like this only works
// when the whole pipeline runs in a single JVM, e.g. local mode.)
public class Step3 {

    public static class Step4_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final static Text k = new Text();
        private final static Text v = new Text();
        private String flag;    // which data set is being read

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();   // identify the data set by its parent directory
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = values.toString().split("::");

            // Collect every movie ID; with a dedicated movie catalog file this
            // contains() check would be unnecessary
            if (!Recommend.movieList.contains(tokens[1])) {
                Recommend.movieList.add(tokens[1]);
            }

            k.set(tokens[0]);
            v.set(tokens[1] + "," + tokens[2]);
            context.write(k, v);
        }
    }

    public static class Step4_AggregateAndRecommendReducer extends Reducer<Text, Text, Text, Text> {
        private final static Text v = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Movies this user actually rated
            Map<String, String> userMovieList = new HashMap<String, String>();
            for (Text line : values) {
                String[] tokens = Recommend.DELIMITER.split(line.toString());
                userMovieList.put(tokens[0], tokens[1]);
            }
            // Emit a rating for every known movie, 0 if the user has not rated it
            for (int i = 0; i < Recommend.movieList.size(); i++) {
                if (!userMovieList.containsKey(Recommend.movieList.get(i))) {
                    v.set(Recommend.movieList.get(i) + "," + 0);
                } else {
                    v.set(Recommend.movieList.get(i) + "," + userMovieList.get(Recommend.movieList.get(i)));
                }
                context.write(key, v);
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();
        String input1 = path.get("Step3Input1");
        String output = path.get("Step3Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step3.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step3.Step4_PartialMultiplyMapper.class);
        job.setReducerClass(Step3.Step4_AggregateAndRecommendReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1));
        FileOutputFormat.setOutputPath(job, new Path(output));

        do {
            job.waitForCompletion(false);
        } while (!job.isComplete());
    }
}
// Step 4: predict every user's rating for every movie from the Step 2 and Step 3 results.
// While reading the Step 3 output, whenever a user's rating for a movie is 0, the rating is
// estimated from that user's recorded baseline rating plus the item rating difference from
// the Step 2 matrix.
public class Step4 {

    public static class Step4Update_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String flag;    // "step2" = rating-difference matrix, "step3" = rating matrix

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();   // identify the data set by its parent directory
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());

            if (flag.equals("step2")) {             // item rating-difference matrix
                // Step 2 output line: movieA:movieB: \t avgDiff
                String[] v1 = tokens[0].split(":");
                Text k = new Text(v1[0]);
                Text v = new Text("M:" + v1[1] + "," + tokens[1]);
                context.write(k, v);
            } else if (flag.equals("step3")) {      // user rating matrix
                // Step 3 output line: userId \t movieId,score
                // Remember each user's first non-zero rating as the prediction baseline
                if (Double.parseDouble(tokens[2]) != 0 && !Recommend.userScore.containsKey(tokens[0])) {
                    Recommend.userScore.put(tokens[0], tokens[1] + "," + tokens[2]);
                }
                Text k = new Text(tokens[1]);
                Text v = new Text("U:" + tokens[0] + "," + tokens[2]);
                context.write(k, v);
            }
        }
    }

    public static class Step4Update_AggregateReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // movie: other movieId -> average rating difference against this key movie
            Map<String, String> movie = new HashMap<String, String>();
            Text k;
            Text v;
            List<String> list = new ArrayList<String>();

            for (Text line : values) {
                list.add(line.toString());
                String[] tokens = Recommend.STRING.split(line.toString());
                if (tokens[0].equals("M")) {
                    movie.put(tokens[1], tokens[2]);
                }
            }

            for (int i = 0; i < list.size(); i++) {
                String[] tokens = Recommend.STRING.split(list.get(i));
                if (tokens[0].equals("U")) {
                    if (Double.parseDouble(tokens[2]) == 0) {
                        // No rating: predict from the user's baseline rating plus the item difference
                        String userScore = Recommend.userScore.get(tokens[1]);
                        String[] temps = Recommend.STRING.split(userScore);
                        k = new Text(key);
                        double temp = 0;
                        if (movie.get(temps[0]) != null) {
                            temp = Double.parseDouble(movie.get(temps[0]));
                        }
                        double score = Double.parseDouble(temps[1]) + temp;
                        v = new Text(tokens[1] + "," + score);
                    } else {
                        // Keep the user's real rating
                        k = new Text(key);
                        v = new Text(tokens[1] + "," + tokens[2]);
                    }
                    context.write(k, v);
                }
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();
        String input1 = path.get("Step4Input1");
        String input2 = path.get("Step4Input2");
        String output = path.get("Step4Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step4.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step4.Step4Update_PartialMultiplyMapper.class);
        job.setReducerClass(Step4.Step4Update_AggregateReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));
        FileOutputFormat.setOutputPath(job, new Path(output));

        do {
            job.waitForCompletion(false);
        } while (!job.isComplete());
    }
}
// Step 5: normalize the final output format.
public class Step5 {

    public static class Step5_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            // Step 4 output line: movieId \t userId,score
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            Text k = new Text(tokens[1]);       // key: userId
            Text v;
            if (Double.parseDouble(tokens[2]) == 0) {
                v = new Text(tokens[0] + "::");
            } else {
                v = new Text(tokens[0] + "::" + tokens[2]);
            }
            context.write(k, v);
        }
    }

    public static class Step5_AggregateReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text line : values) {
                Text k = new Text(key.toString());
                context.write(k, line);
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();
        String input = path.get("Step5Input");
        String output = path.get("Step5Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step5.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step5.Step5_PartialMultiplyMapper.class);
        job.setReducerClass(Step5.Step5_AggregateReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ParseTextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        do {
            job.waitForCompletion(false);
        } while (!job.isComplete());
        System.out.println("---------------------end--------------------");
    }
}

That covers how to implement WordCount and movie rating prediction with MapReduce in Hadoop. I hope the material above is helpful; if you found the article useful, feel free to share it with others.
