
How to Scan an HBase Table with MapReduce to Build a Solr Index

Published: 2024-09-25

In this post I'd like to share how to scan an HBase table with MapReduce and build a Solr index from it. I hope you get something out of it; let's dig in!

package com.hbase.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RebuildHbaseIndex {
    public static final Logger LOG = LoggerFactory.getLogger(RebuildHbaseIndex.class);

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        // Disable speculative execution so the same region is never indexed twice
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        // Read 100 rows per scanner round trip
        conf.setInt("hbase.client.scanner.caching", 100);
        String[] tbNames = {"Suggest"};
        // Submit one indexing job per table
        for (int i = 0; i < tbNames.length; i++) {
            Job job = SolrIndexerMapper.createSubmittableJob(conf, tbNames[i]);
            job.waitForCompletion(true);
            Counter counter = job.getCounters().findCounter(SolrIndexerMapper.Counters.ROWS);
            LOG.info("tableName: " + tbNames[i] + ", indexed rows: " + counter.getValue());
        }
    }
}
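For a quick test, the table being scanned needs some data in it. Below is a minimal sketch that inserts one test row into the Suggest table with the same old-style HBase client API used in this article; the column family name "info" and the qualifier "title" are assumptions for illustration, so substitute your table's actual family and columns.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutSampleRow {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "Suggest");
        Put put = new Put(Bytes.toBytes("row-001"));
        // "info" is a hypothetical column family; each qualifier becomes a Solr field name
        put.add(Bytes.toBytes("info"), Bytes.toBytes("title"), Bytes.toBytes("test suggestion"));
        table.put(put);
        table.close();
    }
}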
package com.hbase.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolrIndexerMapper extends TableMapper<Text, Text> {

    public static final Logger LOG = LoggerFactory.getLogger(SolrIndexerMapper.class);

    // Counter for the number of rows processed
    public static enum Counters { ROWS }

    // Create only one SolrServer instance per mapper
    private SolrServer solr;
    public String solrURL = "http://192.168.1.79:8983/solr/IK_shard1_replica1";
    private int commitSize;
    private final List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();

    // Called once when the task starts
    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        solr = new HttpSolrServer(solrURL);
        // Number of documents to send to Solr in one batch
        commitSize = conf.getInt("solr.commit.size", 1000);
    }

    @Override
    protected void map(ImmutableBytesWritable row, Result values, Context context)
            throws IOException, InterruptedException {
        SolrInputDocument solrDoc = new SolrInputDocument();
        String rowkey = Bytes.toString(values.getRow());
        String id = Bytes.toString(values.getRow());
        String tableName = "Suggest";

        solrDoc.addField("id", id);
        solrDoc.addField("rowkey", rowkey);
        // The Solr schema needs an extra tableName field
        solrDoc.addField("tableName", tableName);

        for (KeyValue kv : values.list()) {
            String fieldName = Bytes.toString(kv.getQualifier());
            String fieldValue = Bytes.toString(kv.getValue());
            solrDoc.addField(fieldName, fieldValue);
        }

        docs.add(solrDoc);
        if (docs.size() >= commitSize) {
            try {
                LOG.info("Adding " + Integer.toString(docs.size()) + " documents");
                solr.add(docs); // index the batch
            } catch (final SolrServerException e) {
                final IOException ioe = new IOException();
                ioe.initCause(e);
                throw ioe;
            }
            docs.clear();
        }
        context.getCounter(Counters.ROWS).increment(1);
    }

    // Called once when the task ends; flush any remaining documents
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            if (!docs.isEmpty()) {
                LOG.info("Flushing the queue: adding " + Integer.toString(docs.size()) + " documents");
                solr.add(docs);
                docs.clear();
            }
        } catch (final SolrServerException e) {
            final IOException ioe = new IOException();
            ioe.initCause(e);
            throw ioe;
        }
    }

    public static Job createSubmittableJob(Configuration conf, String tableName) throws IOException {
        Job job = Job.getInstance(conf, "SolrIndex_" + tableName);
        job.setJarByClass(SolrIndexerMapper.class);
        Scan scan = new Scan();
        // Do not cache scanned blocks; this is a one-off full scan
        scan.setCacheBlocks(false);
        job.setOutputFormatClass(NullOutputFormat.class);
        TableMapReduceUtil.initTableMapperJob(tableName, scan,
                SolrIndexerMapper.class, null, null, job); // no map output, so key/value classes are null
        job.setNumReduceTasks(0); // no reduce phase needed
        return job;
    }
}
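Note that the mapper batches documents with solr.add() but never calls commit, so the documents only become searchable once the core's server-side autoCommit fires (or you commit manually, e.g. by adding solr.commit() at the end of cleanup). To check the result after the job finishes, here is a minimal verification sketch using the same SolrJ 4.x API and the URL hard-coded in the mapper; the query on the tableName field matches the field the mapper adds.

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

public class CheckIndex {
    public static void main(String[] args) throws Exception {
        SolrServer solr = new HttpSolrServer("http://192.168.1.79:8983/solr/IK_shard1_replica1");
        // Look for documents the job tagged with tableName=Suggest
        SolrQuery query = new SolrQuery("tableName:Suggest");
        query.setRows(10);
        QueryResponse rsp = solr.query(query);
        System.out.println("Found " + rsp.getResults().getNumFound() + " documents");
        for (SolrDocument doc : rsp.getResults()) {
            System.out.println(doc.getFieldValue("rowkey"));
        }
        solr.shutdown();
    }
}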

Having read this article, you should now have a good grasp of how to scan an HBase table with MapReduce to build a Solr index. To learn more, follow our industry news channel. Thanks for reading!
