千家信息网

学习日志---基于hadoop实现PageRank

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,PageRank简单介绍:其值是通过其他值得指向值所决定,具体例子如下:第一部分:对应于每个mapReduce的计算:由mapper算出每个点所指节点的分值,由reduce整个key相同的,由公式算出
千家信息网最后更新 2025年01月23日学习日志---基于hadoop实现PageRank

PageRank简单介绍:

其值是通过其他值得指向值所决定,具体例子如下:

第一部分:

对应于每个mapReduce的计算:

由mapper算出每个点所指节点的分值,由reduce整个key相同的,由公式算出。

三角号表示的是迭代两次之间计算的差值,若小于某个值则计算完成,求的每个点的pagerank值。

自我实现的代码:如下

输入的数据分为:

input1.txt

A,B,D
B,C
C,A,B
D,B,C

表示每行第一个点所指向的节点,在reducer的setup会用到,构建hashmap供使用。

input2.txt

A,0.25,B,D
B,0.25,C
C,0.25,A,B
D,0.25,B,C

中间多的数字,表示当前每个节点的pagerank值,其文件可无,因为可以由上面的文件计算生成,有四个节点,即1/4。

自我实现的代码:

package bbdt.steiss.pageRank;import java.io.BufferedReader;import java.io.BufferedWriter;import java.io.IOException;import java.io.InputStreamReader;import java.io.OutputStreamWriter;import java.net.URI;import java.util.ArrayList;import java.util.HashMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class PageRank {    public static class PageMapper extends Mapper{                private Text averageValue = new Text();        private Text node = new Text();                @Override        //把每行数据的对应节点的分pagerank找出,并输出,当前节点的值除以指向节点的总数        protected void map(LongWritable key, Text value,                Context context)                throws IOException, InterruptedException {            String string = value.toString();            String [] ss = string.split(",");            int length = ss.length;            double pageValue = Double.parseDouble(ss[1]);            double average = pageValue/(length-2);            averageValue.set(String.valueOf(average));            int i = 2;            while(i<=length-1){                node.set(ss[i]);                context.write(node,averageValue);                i++;            }                    }    }         public static class PageReducer extends Reducer{                private HashMap content;        private Text res = new Text();                //reducer工作前,key相同的会分组分在一组,用迭代器操作,从总的图中找到所有该节点的分pagerank值        //利用公式计算该pagerank值,输出。因为下一次要用,因此输出可以凑近一些,把结果都放在value里输出        @Override        protected void reduce(Text text, Iterable intIterable,                Context context)                throws IOException, InterruptedException {            double sum = 0.0;            double v = 0.0;            for (Text t : intIterable) {                v = Double.parseDouble(t.toString());                sum = sum + v;            }            double a = 0.85;            double result = (1-a)/4 + a*sum;            String sRes = String.valueOf(result);            String back = content.get(text.toString());            String front = text.toString();            String comp = front + "," + sRes + back;            res.set(comp);            context.write(null,res);                    }                @Override        //reducer的初始化时,先把节点对应文件的数据,存在hashmap中,也就是content中,供每次reduce方法使用,相当于数据库的作用        //方便查询        protected void setup(Context context)                throws IOException, InterruptedException {            URI[] uri = context.getCacheArchives();            content = new HashMap();            for(URI u : uri)            {                FileSystem fileSystem = FileSystem.get(u.create("hdfs://hadoop1:9000"), context.getConfiguration());                FSDataInputStream in = null;                in = fileSystem.open(new Path(u.getPath()));                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));                String line;                while((line = bufferedReader.readLine())!=null)                {                    int index = line.indexOf(",");                    String first = line.substring(0,index);                    String last = line.substring(index,line.length());                    content.put(first, last);                }                            }        }    }        public static void main(String[] args) throws Exception{                //接受路径文件        Path inputPath = new Path(args[0]);        Path outputPath = new Path(args[1]);        Path cachePath = new Path(args[2]);        double result = 100;        int flag = 0;        //制定差值多大时进入循环        while(result>0.1)        {            if(flag == 1)            {                //初次调用mapreduce不操作这个                //这个是把mapreduce的输出文件复制到输入文件中,作为这次mapreduce的输入文件                copyFile();                flag = 0;            }            Configuration configuration = new Configuration();            Job job = Job.getInstance(configuration);                        job.setJarByClass(PageRank.class);            job.setMapperClass(PageMapper.class);            job.setReducerClass(PageReducer.class);                        job.setMapOutputKeyClass(Text.class);            job.setMapOutputValueClass(Text.class);                job.setOutputKeyClass(Text.class);            job.setOutputValueClass(Text.class);                        job.setInputFormatClass(TextInputFormat.class);            job.setOutputFormatClass(TextOutputFormat.class);                        FileInputFormat.addInputPath(job, inputPath);            FileOutputFormat.setOutputPath(job, outputPath);            job.addCacheArchive(cachePath.toUri());            outputPath.getFileSystem(configuration).delete(outputPath, true);            job.waitForCompletion(true);                        String outpathString = outputPath.toString()+"/part-r-00000";            //计算两个文件的各节点的pagerank值差            result = fileDo(inputPath, new Path(outpathString));            flag = 1;        }            System.exit(0);       }        //计算两个文件的每个节点的pagerank差值,返回    public static double fileDo(Path inputPath,Path outPath) throws Exception    {         Configuration conf = new Configuration();         conf.set("fs.defaultFS", "hdfs://hadoop1:9000");         FileSystem fs = FileSystem.get(conf);         FSDataInputStream in1 = null;         FSDataInputStream in2 = null;         in1 = fs.open(inputPath);         in2 = fs.open(outPath);         BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));         BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));         String s1 = null;         String s2 = null;         ArrayList arrayList1 = new ArrayList();         ArrayList arrayList2 = new ArrayList();         while ((s1 = br1.readLine()) != null)         {             String[] ss = s1.split(",");             arrayList1.add(Double.parseDouble(ss[1]));         }         br1.close();                  while ((s2 = br2.readLine()) != null)         {             String[] ss = s2.split(",");             arrayList2.add(Double.parseDouble(ss[1]));         }         double res = 0;                  for(int i = 0;i

注意:

在本地操作hdfs时,进行文件的删除和添加,需要打开hdfs的文件操作权限,

这里删除需要打开hdfs在/input目录下的权限操作,非常重要"hdfs dfs -chmod 777 /input"打开权限,这样才可以删除其下面的文件

打开/input路径的操作权限


第二部分

以上是自己实现的pagerank的算法;下面介绍一下别人的代码

robby的代码实现:

1.首先对节点定义节点类,用于存当前节点的pagerank值以及所指向的节点,存在一个数组中。

package org.robby.mr.pagerank;import org.apache.commons.lang.StringUtils;import java.io.IOException;import java.util.Arrays;//节点类,记录的是当前节点的pagerank值和其指向的节点public class Node {  private double pageRank = 0.25;  private String[] adjacentNodeNames;  //分割符号  public static final char fieldSeparator = '\t';  public double getPageRank() {    return pageRank;  }  public Node setPageRank(double pageRank) {    this.pageRank = pageRank;    return this;  }  public String[] getAdjacentNodeNames() {    return adjacentNodeNames;  }  //接受一个数组,复制在指向节点数组上  public Node setAdjacentNodeNames(String[] adjacentNodeNames) {    this.adjacentNodeNames = adjacentNodeNames;    return this;  }  public boolean containsAdjacentNodes() {    return adjacentNodeNames != null;  }  //这个方法是从pagerank值开始+后面的指向的节点  @Override  public String toString() {    StringBuilder sb = new StringBuilder();    sb.append(pageRank);    if (getAdjacentNodeNames() != null) {      sb.append(fieldSeparator)          .append(StringUtils              .join(getAdjacentNodeNames(), fieldSeparator));    }    return sb.toString();  }  //通过字符串建立一个node  public static Node fromMR(String value) throws IOException {    String[] parts = StringUtils.splitPreserveAllTokens(        value, fieldSeparator);    if (parts.length < 1) {      throw new IOException(          "Expected 1 or more parts but received " + parts.length);    }    Node node = new Node()        .setPageRank(Double.valueOf(parts[0]));    if (parts.length > 1) {      node.setAdjacentNodeNames(Arrays.copyOfRange(parts, 1,          parts.length));    }    return node;  }}


2.这个是mapper的实现

package org.robby.mr.pagerank;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;//这里map的输入时Text和Text类型,说明是两个文本,因此主函数中应设置job的输入类型格式为KeyValueTextInputFormatpublic class Map    extends Mapper {  private Text outKey = new Text();  private Text outValue = new Text();  @Override  protected void map(Text key, Text value, Context context)      throws IOException, InterruptedException {          //先把原始的数据输出,供reduce找指向节点使用    context.write(key, value);             //传入时,key是第一个节点,以制表符分割,后面是value    Node node = Node.fromMR(value.toString());    if(node.getAdjacentNodeNames() != null &&        node.getAdjacentNodeNames().length > 0) {      double outboundPageRank = node.getPageRank() /          (double)node.getAdjacentNodeNames().length;      for (int i = 0; i < node.getAdjacentNodeNames().length; i++) {        String neighbor = node.getAdjacentNodeNames()[i];        outKey.set(neighbor);                Node adjacentNode = new Node()            .setPageRank(outboundPageRank);        outValue.set(adjacentNode.toString());        System.out.println(            "  output -> K[" + outKey + "],V[" + outValue + "]");        //这里输出计算出的节点分pagerank值        context.write(outKey, outValue);      }    }  }}输出的数据:例子A  0.25  B  DB  0.125  D  0.125

注意:

KeyValueTextInputFormat的输入格式(Text,Text),对每行的文本内容进行处理,以第一个制表符作为分割,分为key和value传入。

TextInputFormat的格式是(Longwritable,Text),以行标作为key,内容作为value处理;


3.reduce方法的实现

package org.robby.mr.pagerank;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class Reduce    extends Reducer {  public static final double CONVERGENCE_SCALING_FACTOR = 1000.0;  public static final double DAMPING_FACTOR = 0.85;  public static String CONF_NUM_NODES_GRAPH = "pagerank.numnodes";  private int numberOfNodesInGraph;  public static enum Counter {    CONV_DELTAS  }  //reduce初始化时执行的方法,得到总节点个数,在conf对象里  @Override  protected void setup(Context context)      throws IOException, InterruptedException {    numberOfNodesInGraph = context.getConfiguration().getInt(        CONF_NUM_NODES_GRAPH, 0);  }  private Text outValue = new Text();  public void reduce(Text key, Iterable values,                     Context context)      throws IOException, InterruptedException {    System.out.println("input -> K[" + key + "]");    double summedPageRanks = 0;    Node originalNode = new Node();    for (Text textValue : values) {      System.out.println("  input -> V[" + textValue + "]");      Node node = Node.fromMR(textValue.toString());      //这里就是传入的是原始数据      if (node.containsAdjacentNodes()) {        // the original node        //        originalNode = node;      } else {        //计算针对一个节点的pagerank总和        summedPageRanks += node.getPageRank();      }    }    double dampingFactor =        ((1.0 - DAMPING_FACTOR) / (double) numberOfNodesInGraph);    double newPageRank =        dampingFactor + (DAMPING_FACTOR * summedPageRanks);    //计算差值    double delta = originalNode.getPageRank() - newPageRank;    //把原节点对象的pagerank改为新的    originalNode.setPageRank(newPageRank);    outValue.set(originalNode.toString());    System.out.println(        "  output -> K[" + key + "],V[" + outValue + "]");            //把更改后的节点对象输出    context.write(key, outValue);    int scaledDelta =        Math.abs((int) (delta * CONVERGENCE_SCALING_FACTOR));    System.out.println("Delta = " + scaledDelta);    //这个是计数器,mapreduce有很多计数器,自定义的要通过enum对象传入建立和取值    //increment是增值的意思    context.getCounter(Counter.CONV_DELTAS).increment(scaledDelta);  }}

4.main函数的实现:

package org.robby.mr.pagerank;import org.apache.commons.io.*;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.*;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.*;import java.util.*;public final class Main {  public static void main(String... args) throws Exception {    //传入输入文件的路径,与输出文件的路径    String inputFile = args[0];    String outputDir = args[1];    iterate(inputFile, outputDir);  }  public static void iterate(String input, String output)      throws Exception {    //因为这个是在hadoop上运行的(hadoop jar ...),因此conf会自动配上集群上hadoop的hdfs的入口    //后面的文件可以直接找filesystem,即hdfs的文件操作类    Configuration conf = new Configuration();    Path outputPath = new Path(output);    outputPath.getFileSystem(conf).delete(outputPath, true);    outputPath.getFileSystem(conf).mkdirs(outputPath);    //建立输入文件    Path inputPath = new Path(outputPath, "input.txt");    //建立文件,返回节点个数    int numNodes = createInputFile(new Path(input), inputPath);    int iter = 1;    double desiredConvergence = 0.01;    while (true) {      //path建立时,outputpath+后面的是文件路径      Path jobOutputPath =          new Path(outputPath, String.valueOf(iter));      System.out.println("======================================");      System.out.println("=  Iteration:    " + iter);      System.out.println("=  Input path:   " + inputPath);      System.out.println("=  Output path:  " + jobOutputPath);      System.out.println("======================================");      //这里进行mapreduce      if (calcPageRank(inputPath, jobOutputPath, numNodes) <          desiredConvergence) {        System.out.println(            "Convergence is below " + desiredConvergence +                ", we're done");        break;      }      inputPath = jobOutputPath;      iter++;    }  }  //这个类的作用是把file文件的内容加上pagerank值送到targetfile里  public static int createInputFile(Path file, Path targetFile)      throws IOException {    Configuration conf = new Configuration();    FileSystem fs = file.getFileSystem(conf);    int numNodes = getNumNodes(file);    double initialPageRank = 1.0 / (double) numNodes;    //fs调用create方法根据path对象建立文件,返回该文件流    OutputStream os = fs.create(targetFile);    //file文件的流迭代器    LineIterator iter = IOUtils        .lineIterator(fs.open(file), "UTF8");    while (iter.hasNext()) {      String line = iter.nextLine();      //获取每行的内容      String[] parts = StringUtils.split(line);      //建立node对象      Node node = new Node()          .setPageRank(initialPageRank)          .setAdjacentNodeNames(              Arrays.copyOfRange(parts, 1, parts.length));      IOUtils.write(parts[0] + '\t' + node.toString() + '\n', os);    }    os.close();    return numNodes;  }  //获取节点数量,也就是获取文件的行数  public static int getNumNodes(Path file) throws IOException {    Configuration conf = new Configuration();    FileSystem fs = file.getFileSystem(conf);    return IOUtils.readLines(fs.open(file), "UTF8").size();  }  //进行mapreduce运算  public static double calcPageRank(Path inputPath, Path outputPath, int numNodes)      throws Exception {    Configuration conf = new Configuration();    conf.setInt(Reduce.CONF_NUM_NODES_GRAPH, numNodes);    Job job = Job.getInstance(conf);    job.setJarByClass(Main.class);    job.setMapperClass(Map.class);    job.setReducerClass(Reduce.class);    //输入的key和value都是文本,因此使用这个class,以第一个分隔符作为分割符号,分为key和value    job.setInputFormatClass(KeyValueTextInputFormat.class);    //map输出定义下    job.setMapOutputKeyClass(Text.class);    job.setMapOutputValueClass(Text.class);    FileInputFormat.setInputPaths(job, inputPath);    FileOutputFormat.setOutputPath(job, outputPath);    if (!job.waitForCompletion(true)) {      throw new Exception("Job failed");    }    long summedConvergence = job.getCounters().findCounter(        Reduce.Counter.CONV_DELTAS).getValue();    double convergence =        ((double) summedConvergence /            Reduce.CONVERGENCE_SCALING_FACTOR) /            (double) numNodes;    System.out.println("======================================");    System.out.println("=  Num nodes:           " + numNodes);    System.out.println("=  Summed convergence:  " + summedConvergence);    System.out.println("=  Convergence:         " + convergence);    System.out.println("======================================");    return convergence;  }}

注意:

这个是文件流操作的方法,使用 import org.apache.commons.io.IOUtils中的IOUtils类中的方法。

还有一个Arrays方法copyOfRange,可以返回数组的指定位置,返回一个数组

    OutputStream os = fs.create(targetFile);    //file文件的流迭代器    LineIterator iter = IOUtils        .lineIterator(fs.open(file), "UTF8");    while (iter.hasNext()) {      String line = iter.nextLine();      String[] parts = StringUtils.split(line);      Node node = new Node()          .setPageRank(initialPageRank)          .setAdjacentNodeNames(              Arrays.copyOfRange(parts, 1, parts.length));      IOUtils.write(parts[0] + '\t' + node.toString() + '\n', os);    }

使用readLines方法,返回的是一个String数组,每个单元里放的是每行的内容

IOUtils.readLines(fs.open(file), "UTF8").size();

TextOutPutFormat的输出的键值对可以是任何类型,输出是自动调用toString方法,把对象转为字符串输出。

使用stringUtils,截字符串为数组

String[] parts = StringUtils.splitPreserveAllTokens(        value, fieldSeparator);


0