千家信息网

Data Mining: Canopy Clustering with Mahout in Practice

Published: 2025-01-24. Author: 千家信息网 editors

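1. How it works

(1) Sort the original data set List by some rule, and choose two initial distance thresholds T1 and T2 with T1 > T2.

(2) Pick a data vector A at random from List and, using a cheap approximate distance measure, compute the distance d between A and every other sample vector in List.

(3) Using the distances d from step (2), put every sample vector with d < T1 into one canopy, and remove every sample vector with d < T2 from List. A itself (d = 0) seeds the canopy and is removed as well. Vectors with T2 <= d < T1 stay in List, so they can also join later canopies and canopies may overlap.

(4) Repeat steps (2) and (3) until List is empty.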
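The four steps above can be sketched in plain Java. This is a minimal single-machine illustration, not Mahout's implementation: the class name `CanopySketch`, the use of Euclidean distance as the "cheap" measure, and picking the first remaining point instead of a random one (to keep the run deterministic) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class CanopySketch {

    /** Euclidean distance; stands in for the cheap distance measure of step (2). */
    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    /** Returns the canopies; the first element of each canopy is its center A. */
    static List<List<double[]>> canopies(List<double[]> points, double t1, double t2) {
        if (t1 <= t2) {
            throw new IllegalArgumentException("T1 must be greater than T2");
        }
        List<double[]> remaining = new ArrayList<>(points);
        List<List<double[]>> result = new ArrayList<>();
        while (!remaining.isEmpty()) {                 // step (4): until List is empty
            double[] center = remaining.remove(0);     // step (2): pick A (first point here)
            List<double[]> canopy = new ArrayList<>();
            canopy.add(center);
            Iterator<double[]> it = remaining.iterator();
            while (it.hasNext()) {                     // step (3)
                double[] p = it.next();
                double d = distance(center, p);
                if (d < t1) {
                    canopy.add(p);                     // inside T1: joins this canopy
                }
                if (d < t2) {
                    it.remove();                       // inside T2: leaves List for good
                }
            }
            result.add(canopy);
        }
        return result;
    }

    public static void main(String[] args) {
        List<double[]> data = new ArrayList<>();
        data.add(new double[] {0, 0});
        data.add(new double[] {1, 0});
        data.add(new double[] {3, 0});
        data.add(new double[] {10, 0});
        List<List<double[]>> result = canopies(data, 4.0, 2.0);
        System.out.println("number of canopies: " + result.size());
    }
}
```

With T1 = 4 and T2 = 2, the point (3, 0) falls inside T1 of the first center but outside T2, so it joins the first canopy and later seeds a canopy of its own. That overlap is the behaviour the T1 > T2 rule is meant to allow.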



2. Download the test data


cd /tmp


hadoop dfs -mkdir /input


wget http://archive.ics.uci.edu/ml/databases/synthetic_control/synthetic_control.data


hadoop dfs -copyFromLocal /tmp/synthetic_control.data /input/synthetic_control.data


3. Format conversion (text → vector)

Create the file Text2VectorWritable.java:


package mahout.fansy.utils.transform;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

/**
 * Transforms text data into VectorWritable data.
 *
 * @author fansy
 */
public class Text2VectorWritable extends AbstractJob {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new Text2VectorWritable(), args);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        addInputOption();
        addOutputOption();
        if (parseArguments(arg0) == null) {
            return -1;
        }
        Path input = getInputPath();
        Path output = getOutputPath();
        Configuration conf = getConf();

        // Set job information.
        Job job = new Job(conf, "text2vectorWritableCopy with input:" + input.getName());
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapperClass(Text2VectorWritableMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(VectorWritable.class);
        job.setReducerClass(Text2VectorWritableReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(VectorWritable.class);
        job.setJarByClass(Text2VectorWritable.class);
        FileInputFormat.addInputPath(job, input);
        SequenceFileOutputFormat.setOutputPath(job, output);

        if (!job.waitForCompletion(true)) { // block until the job finishes
            throw new InterruptedException("Text2VectorWritable job failed processing " + input);
        }
        return 0;
    }

    /**
     * Mapper: parses one text line into a vector.
     *
     * @author fansy
     */
    public static class Text2VectorWritableMapper
            extends Mapper<LongWritable, Text, LongWritable, VectorWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on one or more whitespace characters.
            String[] str = value.toString().trim().split("\\s+");
            Vector vector = new RandomAccessSparseVector(str.length);
            for (int i = 0; i < str.length; i++) {
                vector.set(i, Double.parseDouble(str[i]));
            }
            VectorWritable va = new VectorWritable(vector);
            context.write(key, va);
        }
    }

    /**
     * Reducer: does nothing but pass the vectors through to the output.
     *
     * @author fansy
     */
    public static class Text2VectorWritableReducer
            extends Reducer<LongWritable, VectorWritable, LongWritable, VectorWritable> {

        @Override
        public void reduce(LongWritable key, Iterable<VectorWritable> values, Context context)
                throws IOException, InterruptedException {
            for (VectorWritable v : values) {
                context.write(key, v);
            }
        }
    }
}
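The parsing done inside the mapper can be tried out without a Hadoop cluster. The sketch below isolates just that step in plain Java, using a `double[]` in place of Mahout's `Vector`; the class name `LineToVectorSketch` and the sample line are made up for illustration.

```java
import java.util.Arrays;

class LineToVectorSketch {

    /** Splits a line on runs of whitespace and parses each token as a double. */
    static double[] parseLine(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return new double[0];          // blank line: no values
        }
        String[] tokens = trimmed.split("\\s+");
        double[] values = new double[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            values[i] = Double.parseDouble(tokens[i]);
        }
        return values;
    }

    public static void main(String[] args) {
        // Illustrative input with irregular spacing between the values.
        double[] v = parseLine("28.7812 34.4632   31.3381");
        System.out.println(Arrays.toString(v));
    }
}
```

Trimming before the split matters: a line that starts with whitespace would otherwise produce an empty first token, and `Double.parseDouble` would throw a `NumberFormatException` on it.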

Compile and export ClusteringUtils.jar, then copy it to /home/hadoop/mahout/mahout_jar.

When exporting, choose Export → Runnable JAR File → Extract required libraries into generated JAR.

Then run:

hadoop jar /home/hadoop/mahout/mahout_jar/ClusteringUtils.jar mahout.fansy.utils.transform.Text2VectorWritable -i hdfs:///input/synthetic_control.data -o hdfs:///input/synthetic_control.data.transform


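This may fail with a class-not-found error for org/apache/mahout/common/AbstractJob, which usually means that HADOOP_CLASSPATH does not include the Mahout JARs.

Solution 1:

Copy the Mahout JAR files into /home/hadoop/hadoop/lib, and confirm that this directory really is on HADOOP_CLASSPATH: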


cp /home/hadoop/mahout/*.jar /home/hadoop/hadoop/lib


Solution 2 (recommended):

Add the following to hadoop-env.sh:


for f in /home/hadoop/mahout/*.jar; do
  if [ "$HADOOP_CLASSPATH" ]; then
    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f
  else
    export HADOOP_CLASSPATH=$f
  fi
done


Remember to distribute hadoop-env.sh to the other nodes.

Restart Hadoop:

stop-all.sh

start-all.sh
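Before rerunning the job, it can help to check whether the missing class is actually visible. The following is a small hypothetical helper (the class name `ClasspathCheck` is not part of Hadoop or Mahout) that only uses `Class.forName` from the JDK. It reports on whatever classpath it is started with, so it is only meaningful when launched with the same classpath the Hadoop job sees.

```java
class ClasspathCheck {

    /** Returns true if the named class can be found by this class's class loader. */
    static boolean isLoadable(String className) {
        try {
            // 'false' means: locate the class but do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the error above; pass another name to check it instead.
        String name = args.length > 0 ? args[0] : "org.apache.mahout.common.AbstractJob";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

If this prints `false` for `org.apache.mahout.common.AbstractJob`, the Mahout JARs are still not on the classpath and the conversion job is likely to fail the same way.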


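Run the conversion:

hadoop jar /home/hadoop/mahout/mahout_jar/ClusteringUtils.jar mahout.fansy.utils.transform.Text2VectorWritable -i hdfs:///input/synthetic_control.data -o hdfs:///input/synthetic_control.data.transform

If a main class was already specified when the JAR was exported, the command above fails; leave out the class name instead:

hadoop jar /home/hadoop/mahout/mahout_jar/ClusteringUtils.jar -i hdfs:///input/synthetic_control.data -o hdfs:///input/synthetic_control.data.transform

The output is now a binary SequenceFile of vectors and can no longer be read as plain text: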

hdfs:///input/synthetic_control.data.transform/part-r-00000


4. Run Canopy clustering

mahout canopy --input hdfs:///input/synthetic_control.data.transform/part-r-00000 --output /output/canopy --distanceMeasure org.apache.mahout.common.distance.EuclideanDistanceMeasure --t1 80 --t2 55 --t3 80 --t4 55 --clustering
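The `--clustering` flag asks Mahout to also cluster the input vectors once the canopies have been computed. Conceptually this amounts to assigning each point to a canopy center. The sketch below shows a simplified nearest-center assignment in plain Java; it is an illustration under that assumption, not Mahout's actual code, and the class name `NearestCanopySketch` is hypothetical.

```java
class NearestCanopySketch {

    /** Euclidean distance, matching the EuclideanDistanceMeasure chosen on the command line. */
    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    /** Returns the index of the center closest to the point, or -1 if there are no centers. */
    static int nearestCenter(double[] point, double[][] centers) {
        int best = -1;
        double bestDistance = Double.POSITIVE_INFINITY;
        for (int i = 0; i < centers.length; i++) {
            double d = distance(point, centers[i]);
            if (d < bestDistance) {
                bestDistance = d;
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        double[][] centers = { {0, 0}, {10, 0} };   // made-up canopy centers
        double[] point = {7, 0};
        System.out.println("assigned to canopy " + nearestCenter(point, centers));
    }
}
```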


5. Format conversion (vector → text)

Convert the output of step 4 to text.

Create the file ReadClusterWritable.java:


package mahout.fansy.utils;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.AbstractJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads cluster centers and writes them out as text.
 *
 * @author fansy
 */
public class ReadClusterWritable extends AbstractJob {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new ReadClusterWritable(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        addInputOption();
        addOutputOption();
        if (parseArguments(args) == null) {
            return -1;
        }

        Job job = new Job(getConf(), getInputPath().toString());
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setMapperClass(RM.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0); // map-only job: the mapper output is the final output
        job.setJarByClass(ReadClusterWritable.class);
        FileInputFormat.addInputPath(job, getInputPath());
        FileOutputFormat.setOutputPath(job, getOutputPath());

        if (!job.waitForCompletion(true)) {
            throw new InterruptedException("ReadClusterWritable job failed processing " + getInputPath());
        }
        return 0;
    }

    /** Mapper: emits each cluster key together with the text form of its center. */
    public static class RM extends Mapper<Text, ClusterWritable, Text, Text> {

        private static final Logger log = LoggerFactory.getLogger(RM.class);

        @Override
        public void map(Text key, ClusterWritable value, Context context)
                throws IOException, InterruptedException {
            String str = value.getValue().getCenter().asFormatString();
            log.info("center*****************************:" + str); // log each center
            context.write(key, new Text(str));
        }
    }
}


Package the class into ClusteringUtils.jar and upload it to /home/hadoop/mahout/mahout_jar.

To clear the Launch Configuration entries in Eclipse, go to /.metadata/.plugins/org.eclipse.debug.core/.launches under the folder that contains the project, then delete the files in it.

Run:

hadoop jar ClusteringUtils.jar mahout.fansy.utils.ReadClusterWritable -i /output/canopy/clusters-0-final/part-r-00000 -o /output/canopy-output

If that does not succeed, run the command without the class name:

hadoop jar ClusteringUtils.jar -i /output/canopy/clusters-0-final/part-r-00000 -o /output/canopy-output


/output/canopy-output/part-m-00000 now holds the clustering result: one line per canopy, with the cluster key followed by the text form of its center.
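To post-process the result file, each line can be parsed back into index/value pairs. The sketch below assumes a line layout of `key<TAB>{index:value,index:value,...}`; that layout is an assumption about what `asFormatString()` produces in the Mahout version in use and should be checked against the real file first. The class name `CenterLineParser` and the sample line are made up.

```java
import java.util.Map;
import java.util.TreeMap;

class CenterLineParser {

    /**
     * Parses one output line into a map from vector index to value.
     * Assumed (unverified) layout: key, a tab, then {i:v,i:v,...}.
     */
    static Map<Integer, Double> parseCenter(String line) {
        int tab = line.indexOf('\t');
        String vec = (tab >= 0 ? line.substring(tab + 1) : line).trim();
        if (!vec.startsWith("{") || !vec.endsWith("}")) {
            throw new IllegalArgumentException("unexpected vector format: " + vec);
        }
        Map<Integer, Double> result = new TreeMap<>();
        String body = vec.substring(1, vec.length() - 1).trim();
        if (body.isEmpty()) {
            return result;                     // empty vector
        }
        for (String entry : body.split(",")) {
            String[] kv = entry.split(":");
            if (kv.length != 2) {
                throw new IllegalArgumentException("unexpected entry: " + entry);
            }
            result.put(Integer.parseInt(kv[0].trim()), Double.parseDouble(kv[1].trim()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample line in the assumed layout.
        Map<Integer, Double> center = parseCenter("C-0\t{0:1.5,1:2.0}");
        System.out.println(center.size() + " components parsed");
    }
}
```

If the real file uses a different text form, only `parseCenter` needs to change; throwing on unexpected input makes such a mismatch visible immediately instead of producing silently wrong numbers.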

