千家信息网

nutch中如何实现索引去重

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章主要介绍nutch中如何实现索引去重,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!一、主程序调用SolrDeleteDuplicates dedup = new Sol
千家信息网最后更新 2025年02月04日nutch中如何实现索引去重

这篇文章主要介绍nutch中如何实现索引去重,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!


一、主程序调用


SolrDeleteDuplicates dedup = new SolrDeleteDuplicates();

dedup.setConf(getConf());

dedup.dedup(solrUrl);



二、job任务配置


JobConf job = new NutchJob(getConf());


job.setInputFormat(SolrInputFormat.class);

job.setMapperClass(IdentityMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(SolrRecord.class);


job.setReducerClass(SolrDeleteDuplicates.class);

job.setOutputFormat(NullOutputFormat.class);


JobClient.runJob(job);



三、Map、reduce任务的输入和输出

Map任务输入、输出

public void map(

K key, V val,

OutputCollector output


reduce任务输入、输出

输入:Text/Iterator

输出:Text/SolrRecord


public void reduce(

Text key, Iterator values,

OutputCollector output




四、job任务输入类SolrInputFormat


getSplits方法将所有的文档按照数量平均分片

getRecordReader方法中利用solrserver查询了当前分片包含的所有doc记录,solrrecord返回了的当前的RecordReader记录(RecordReader是一个全局的变量),并且有获取下一个方法。


(1)、SolrInputFormat的getSplits方法


1、根据job对象的参数,获取solrserver对象。

2、构建并执行查询(查询参数:[*:*、id、setRow(1)] ),获取响应对象

3、根据响应对象获取索引总数,除以分片数,得到每一片分配多少个索引

4、根据分片数创建 SolrInputSplit数组对象,

5、根据solr输入分片的开始和结束位置,实例化SolrInputSplit对象




public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job);


final SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);

solrQuery.setFields(SolrConstants.ID_FIELD);

solrQuery.setRows(1);


QueryResponse response;

try {

response = solr.query(solrQuery);

} catch (final SolrServerException e) {

throw new IOException(e);

}


int numResults = (int)response.getResults().getNumFound();

int numDocsPerSplit = (numResults / numSplits);

int currentDoc = 0;

SolrInputSplit[] splits = new SolrInputSplit[numSplits];

for (int i = 0; i < numSplits - 1; i++) {

splits[i] = new SolrInputSplit(currentDoc, numDocsPerSplit);

currentDoc += numDocsPerSplit;

}

splits[splits.length - 1] = new SolrInputSplit(currentDoc, numResults - currentDoc);


return splits;

}



(2)、SolrInputFormat的getRecordReader()方法


1、获取solrserver对象

2、将传入的split参数,强转成SolrInputSplit对象,并获取这个分片的文档总数

3、构建查询对象,执行查询(参数[*:*,id,boost,tstamp,digest, SolrInputSplit中的开始位置,文档总数 ])。

4、根据响应对象,获取结果集

5、对匿名内部内RecordReader做了实现,并且返回


public RecordReader getRecordReader(final InputSplit split,

final JobConf job,

Reporter reporter)

throws IOException {


//1、获取solrserver对象

SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job);


//2、将传入的split参数,强转成SolrInputSplit对象,并获取这个分片的文档总数

SolrInputSplit solrSplit = (SolrInputSplit) split;

final int numDocs = solrSplit.getNumDocs();

//3、构建查询对象,执行查询(参数[*:*,id,boost,tstamp,digest,

SolrInputSplit中的开始位置,文档总数

])

SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);

solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD,

SolrConstants.TIMESTAMP_FIELD,

SolrConstants.DIGEST_FIELD);

solrQuery.setStart(solrSplit.getDocBegin());

solrQuery.setRows(numDocs);


QueryResponse response;


try {

response = solr.query(solrQuery);

} catch (final SolrServerException e) {

throw new IOException(e);

}


//4、根据响应对象,获取结果集

final SolrDocumentList solrDocs = response.getResults();



return new RecordReader() {

//当前的文档

private int currentDoc = 0;


public void close() throws IOException { }


public Text createKey() {

return new Text();

}


public SolrRecord createValue() {

return new SolrRecord();

}

//获取当前的指针

public long getPos() throws IOException {

return currentDoc;

}

//获取进度

public float getProgress() throws IOException {

return currentDoc / (float) numDocs;

}

//获取下一个

public boolean next(Text key, SolrRecord value) throws IOException {

if (currentDoc >= numDocs) {

return false;

}

//

SolrDocument doc = solrDocs.get(currentDoc);


//获取摘要

String digest = (String) doc.getFieldValue(SolrConstants.DIGEST_FIELD);

//把摘要作为key

key.set(digest);

//value(SolrRecord)

//赋值:通过doc给solrrecord的id,tstamp,boost 3个字段赋值

value.readSolrDocument(doc);

//指针加自增1

currentDoc++;

return true;

}

};

}



五、map()方法和reduce()方法中的实现


(1)、map任务

(2)、reduce任务


去重逻辑:


reduce任务会遍历每一个record,并执行reduce()方法中的代码

reduce()方法中,会遍历处于当前文档之后的所有文档,如果分值和时间都比当前的小,会调用solrj删除这个文档,如果比当前的大,会删除当前的,并把当前的替换成这个大的。



public void reduce(Text key, Iterator values,

OutputCollector output, Reporter reporter)

throws IOException {

//1、下一个SolrRecord对象

SolrRecord recordToKeep = new SolrRecord(values.next());


//2、遍历了SolrRecord

while (values.hasNext()) {

//

SolrRecord solrRecord = values.next();


//boost、tstamp参与比较

//如果当前的分值, 比保持的分支高,并且时间比保持的新,就根据id删除这条索引,

if (solrRecord.getBoost() > recordToKeep.getBoost() ||

(solrRecord.getBoost() == recordToKeep.getBoost() &&

solrRecord.getTstamp() > recordToKeep.getTstamp())) {

updateRequest.deleteById(recordToKeep.id);

recordToKeep = new SolrRecord(solrRecord);

} else {

updateRequest.deleteById(solrRecord.id);

}

numDeletes++;

reporter.incrCounter("SolrDedupStatus", "Deleted documents", 1);

if (numDeletes >= NUM_MAX_DELETE_REQUEST) {

try {

LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates");

updateRequest.process(solr);

} catch (SolrServerException e) {

throw new IOException(e);

}

updateRequest = new UpdateRequest();

numDeletes = 0;

}

}

}



六、关于digest


doc中的digest字段,是在IndexerMapReduce类中的reduce方法中加入的

// add digest, used by dedup

doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));


Metadata中包含了一个HashMap

final Metadata metadata = parseData.getContentMeta();

以上是"nutch中如何实现索引去重"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!

0