Mapreduce如何扫描hbase表建立solr索引
发表于:2024-09-25 作者:千家信息网编辑
千家信息网最后更新 2024年09月25日,小编给大家分享一下Mapreduce如何扫描hbase表建立solr索引,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!package com.hbase.index;import j
千家信息网最后更新 2024年09月25日Mapreduce如何扫描hbase表建立solr索引
小编给大家分享一下Mapreduce如何扫描hbase表建立solr索引,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
package com.hbase.index;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Job;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class RebuildHbaseIndex { public static final Logger LOG = LoggerFactory .getLogger(RebuildHbaseIndex.class); public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException { Configuration conf = HBaseConfiguration.create(); conf.setBoolean("mapred.map.tasks.speculative.execution", false); //每次读取100条数据 conf.setInt("hbase.client.scanner.caching", 100); String[] tbNames={"Suggest"}; for(int i=0;i package com.hbase.index;import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;import org.apache.solr.client.solrj.SolrServer;import org.apache.solr.client.solrj.SolrServerException;import org.apache.solr.client.solrj.impl.HttpSolrServer;import org.apache.solr.common.SolrInputDocument;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class SolrIndexerMapper extends TableMapper{ public static final Logger LOG = LoggerFactory.getLogger(SolrIndexerMapper.class); //计数器 public static enum Counters {ROWS}; //只创建一个SolrServer实例 private SolrServer solr; public String solrURL="http://192.168.1.79:8983/solr/IK_shard1_replica1"; private int commitSize; private final List docs=new ArrayList (); //任务开始调用 protected void setup(Context context){ Configuration conf=context.getConfiguration(); solr=new HttpSolrServer(solrURL); //一次性添加文档数 commitSize=conf.getInt("solr.commit.size", 1000); } @Override protected void map(ImmutableBytesWritable row, Result values,Context context)throws IOException, InterruptedException { SolrInputDocument solrDoc = new SolrInputDocument(); String rowkey=Bytes.toString(values.getRow()); String id=Bytes.toString(values.getRow()); String tableName="Suggest"; solrDoc.addField("id", id); solrDoc.addField("rowkey", rowkey); //hbase里面需要增加tableName字段 solrDoc.addField("tableName", tableName); for (KeyValue kv : values.list()) { String fieldName = Bytes.toString(kv.getQualifier()); String fieldValue = Bytes.toString(kv.getValue()); solrDoc.addField(fieldName, fieldValue); } docs.add(solrDoc); if (docs.size() >= commitSize) { try { LOG.info("添加文档:Adding " + Integer.toString(docs.size()) + " documents"); solr.add(docs); // 索引文档 } catch (final SolrServerException e) { final IOException ioe = new IOException(); ioe.initCause(e); throw ioe; } docs.clear(); } context.getCounter(Counters.ROWS).increment(1); } //任务结束时候调用 @Override protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException { try { if(!docs.isEmpty()){ LOG.info("清空队列:Adding " + Integer.toString(docs.size()) + " documents"); solr.add(docs); docs.clear(); } } catch (final SolrServerException e) { final IOException ioe=new IOException(); ioe.initCause(e); throw ioe; } } public static Job createSubmittableJob(Configuration conf, String tableName) throws IOException { Job job=Job.getInstance(conf,"SolrIndex_" + tableName); job.setJarByClass(SolrIndexerMapper.class); Scan scan=new Scan(); //scan的数据不放在缓存中,一次性的 scan.setCacheBlocks(false); job.setOutputFormatClass(NullOutputFormat.class); TableMapReduceUtil.initTableMapperJob(tableName, scan, SolrIndexerMapper.class, null, null, job); // 不需要输出,键、值类型为null job.setNumReduceTasks(0); // 无reduce任务 return job; }} 看完了这篇文章,相信你对"Mapreduce如何扫描hbase表建立solr索引"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!
索引
任务
文档
一次性
数据
篇文章
字段
完了
实例
日志
时候
更多
知识
类型
缓存
行业
计数器
资讯
资讯频道
队列
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
db2数据库主备切换
手机总无服务器
小学护苗网络安全课简报
盛世网络技术
浙江物优网络技术
南京参考软件开发概况
《数据库安全》
网络安全攻击量化工具
广电网络技术改造项目申报
服务器卡顿什么意思
数据库调用内存的
vb提示使用新的数据库
326汽车网络技术试卷
不用经过服务器的xss
计算机应用跟网络技术哪个好
以数据库思维理解区块链
宁波打标机软件开发
手游诛仙服务器
excel二列数据库
服务器自己可以远程管理吗
语音聊天软件开发哪家比较专业
science 数据库
是目前较新的服务器操作系统
实体独立服务器图片
北京数字认证服务器默认地址多少
软件开发 版权 著作权
数据库系统的开发案例
网络安全设计原则与依据
电子时代网络技术的好处
存续机电网络技术有限公司