Mapreduce如何扫描hbase表建立solr索引
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,小编给大家分享一下Mapreduce如何扫描hbase表建立solr索引,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!package com.hbase.index;import j
千家信息网最后更新 2025年02月02日Mapreduce如何扫描hbase表建立solr索引
小编给大家分享一下Mapreduce如何扫描hbase表建立solr索引,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
package com.hbase.index;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Job;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class RebuildHbaseIndex { public static final Logger LOG = LoggerFactory .getLogger(RebuildHbaseIndex.class); public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException { Configuration conf = HBaseConfiguration.create(); conf.setBoolean("mapred.map.tasks.speculative.execution", false); //每次读取100条数据 conf.setInt("hbase.client.scanner.caching", 100); String[] tbNames={"Suggest"}; for(int i=0;i package com.hbase.index;import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;import org.apache.solr.client.solrj.SolrServer;import org.apache.solr.client.solrj.SolrServerException;import org.apache.solr.client.solrj.impl.HttpSolrServer;import org.apache.solr.common.SolrInputDocument;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class SolrIndexerMapper extends TableMapper{ public static final Logger LOG = LoggerFactory.getLogger(SolrIndexerMapper.class); //计数器 public static enum Counters {ROWS}; //只创建一个SolrServer实例 private SolrServer solr; public String solrURL="http://192.168.1.79:8983/solr/IK_shard1_replica1"; private int commitSize; private final List docs=new ArrayList (); //任务开始调用 protected void setup(Context context){ Configuration conf=context.getConfiguration(); solr=new HttpSolrServer(solrURL); //一次性添加文档数 commitSize=conf.getInt("solr.commit.size", 1000); } @Override protected void map(ImmutableBytesWritable row, Result values,Context context)throws IOException, InterruptedException { SolrInputDocument solrDoc = new SolrInputDocument(); String rowkey=Bytes.toString(values.getRow()); String id=Bytes.toString(values.getRow()); String tableName="Suggest"; solrDoc.addField("id", id); solrDoc.addField("rowkey", rowkey); //hbase里面需要增加tableName字段 solrDoc.addField("tableName", tableName); for (KeyValue kv : values.list()) { String fieldName = Bytes.toString(kv.getQualifier()); String fieldValue = Bytes.toString(kv.getValue()); solrDoc.addField(fieldName, fieldValue); } docs.add(solrDoc); if (docs.size() >= commitSize) { try { LOG.info("添加文档:Adding " + Integer.toString(docs.size()) + " documents"); solr.add(docs); // 索引文档 } catch (final SolrServerException e) { final IOException ioe = new IOException(); ioe.initCause(e); throw ioe; } docs.clear(); } context.getCounter(Counters.ROWS).increment(1); } //任务结束时候调用 @Override protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException { try { if(!docs.isEmpty()){ LOG.info("清空队列:Adding " + Integer.toString(docs.size()) + " documents"); solr.add(docs); docs.clear(); } } catch (final SolrServerException e) { final IOException ioe=new IOException(); ioe.initCause(e); throw ioe; } } public static Job createSubmittableJob(Configuration conf, String tableName) throws IOException { Job job=Job.getInstance(conf,"SolrIndex_" + tableName); job.setJarByClass(SolrIndexerMapper.class); Scan scan=new Scan(); //scan的数据不放在缓存中,一次性的 scan.setCacheBlocks(false); job.setOutputFormatClass(NullOutputFormat.class); TableMapReduceUtil.initTableMapperJob(tableName, scan, SolrIndexerMapper.class, null, null, job); // 不需要输出,键、值类型为null job.setNumReduceTasks(0); // 无reduce任务 return job; }} 看完了这篇文章,相信你对"Mapreduce如何扫描hbase表建立solr索引"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!
索引
任务
文档
一次性
数据
篇文章
字段
完了
实例
日志
时候
更多
知识
类型
缓存
行业
计数器
资讯
资讯频道
队列
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库 安全 措施
到无锡找软件开发工作好找吗
服务器的显卡能挖矿吗
服务器找不到网卡
房贷还款逾期基础数据库
常用pc软件开发
怎么提高孩子网络安全意识
把手机变成服务器
安徽8核服务器云服务器
上海海航计算机网络技术服务系统
网络安全大会新闻
p2p 直播 服务器
软件开发梦想秀蓝图
思科网络技术学院青海
国产化软件开发成本高
学校信息网络技术
网络安全 移动 高亮
缓存服务器是什么
支持分布式软件开发
parse数据库
lcraft服务器
软件开发的实践报告
滨湖区互联网智能科技产品软件
网瘾是网络安全吗
龙岩网络安全支撑公示
网络安全宣传日主题党日活动
360 网络安全攻防实验室
网络安全书籍有哪些
成都软件开发人力外派
国家网络安全边界