通用MapReduce程序复制HBase表数据
发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(选取其中某几列)。原始表te
千家信息网最后更新 2025年01月22日通用MapReduce程序复制HBase表数据
编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(选取其中某几列)。
原始表test1数据如下:
每个row key都有两个版本的数据,这里只显示了row key为1的数据
在hbase shell 中创建数据表:
create 'test2',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、无列导入设置、无列导出设置的数据create 'test3',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、无列导入设置、有列导出设置的数据create 'test4',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、有列导入设置、无列导出设置的数据create 'test5',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、无列导入设置、无列导出设置的数据create 'test6',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、无列导入设置、有列导出设置的数据create 'test7',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列导入设置、无列导出设置的数据create 'test8',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列导入设置、有列导出设置的数据
main函数入口:
package GeneralHBaseToHBase;import org.apache.hadoop.util.ToolRunner;public class DriverTest { public static void main(String[] args) throws Exception { // 无版本设置、无列导入设置,无列导出设置 String[] myArgs1= new String[]{ "test1", // 输入表 "test2", // 输出表 "0", // 版本大小数,如果值为0,则为默认从输入表导出最新的数据到输出表 "-1", // 列导入设置,如果为-1 ,则没有设置列导入 "-1" // 列导出设置,如果为-1,则没有设置列导出 }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs1); // 无版本设置、有列导入设置,无列导出设置 String[] myArgs2= new String[]{ "test1", "test3", "0", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs2); // 无版本设置,无列导入设置,有列导出设置 String[] myArgs3= new String[]{ "test1", "test4", "0", "-1", "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs3); // 有版本设置,无列导入设置,无列导出设置 String[] myArgs4= new String[]{ "test1", "test5", "2", "-1", "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs4); // 有版本设置、有列导入设置,无列导出设置 String[] myArgs5= new String[]{ "test1", "test6", "2", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs5); // 有版本设置、无列导入设置,有列导出设置 String[] myArgs6= new String[]{ "test1", "test7", "2", "-1", "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs6); // 有版本设置、有列导入设置,有列导出设置 String[] myArgs7= new String[]{ "test1", "test8", "2", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs7); } }
driver:
package GeneralHBaseToHBase;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.util.Tool;import util.JarUtil; public class HBaseDriver extends Configured implements Tool{ public static String FROMTABLE=""; //导入表 public static String TOTABLE=""; //导出表 public static String SETVERSION=""; //是否设置版本 // args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable} @Override public int run(String[] args) throws Exception { if(args.length!=5){ System.err.println("Usage:\n demo.job.HBaseDriver" + "
mapper:
package GeneralHBaseToHBase;import java.io.IOException;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.Map.Entry;import java.util.NavigableMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.util.Bytes;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class HBaseToHBaseMapper extends TableMapper{ Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper.class); private static int versionNum = 0; private static String[] columnFromTable = null; private static String[] columnToTable = null; private static String column1 = null; private static String column2 = null; @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); versionNum = Integer.parseInt(conf.get("SETVERSION", "0")); column1 = conf.get("COLUMNFROMTABLE",null); if(!(column1 == null)){ columnFromTable = column1.split(","); } column2 = conf.get("COLUMNTOTABLE",null); if(!(column2 == null)){ columnToTable = column2.split(","); } } @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { context.write(key, resultToPut(key,value)); } /*** * 把key,value转换为Put * @param key * @param value * @return * @throws IOException */ private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException { HashMap fTableMap = new HashMap<>(); HashMap tTableMap = new HashMap<>(); Put put = new Put(key.get()); if(! (columnFromTable == null || columnFromTable.length == 0)){ fTableMap = getFamilyAndColumn(columnFromTable); } if(! (columnToTable == null || columnToTable.length == 0)){ tTableMap = getFamilyAndColumn(columnToTable); } if(versionNum==0){ if(fTableMap.size() == 0){ if(tTableMap.size() == 0){ for (Cell kv : value.rawCells()) { put.add(kv); // 没有设置版本,没有设置列导入,没有设置列导出 } return put; } else{ return getPut(put, value, tTableMap); // 无版本、无列导入、有列导出 } } else { if(tTableMap.size() == 0){ return getPut(put, value, fTableMap);// 无版本、有列导入、无列导出 } else { return getPut(put, value, tTableMap);// 无版本、有列导入、有列导出 } } } else{ if(fTableMap.size() == 0){ if(tTableMap.size() == 0){ return getPut1(put, value); // 有版本,无列导入,无列导出 }else{ return getPut2(put, value, tTableMap); //有版本,无列导入,有列导出 } }else{ if(tTableMap.size() == 0){ return getPut2(put,value,fTableMap);// 有版本,有列导入,无列导出 }else{ return getPut2(put,value,tTableMap); // 有版本,有列导入,有列导出 } } } } /*** * 无版本设置的情况下,对于有列导入或者列导出 * @param put * @param value * @param tableMap * @return * @throws IOException */ private Put getPut(Put put,Result value,HashMap tableMap) throws IOException{ for(Cell kv : value.rawCells()){ byte[] family = kv.getFamily(); if(tableMap.containsKey(new String(family))){ String columnStr = tableMap.get(new String(family)); ArrayList columnBy = toByte(columnStr); if(columnBy.contains(new String(kv.getQualifier()))){ put.add(kv); //没有设置版本,没有设置列导入,有设置列导出 } } } return put; } /*** * (有版本,无列导入,有列导出)或者(有版本,有列导入,无列导出) * @param put * @param value * @param tTableMap * @return */ private Put getPut2(Put put,Result value,HashMap tableMap){ NavigableMap >> map=value.getMap(); for(byte[] family:map.keySet()){ if(tableMap.containsKey(new String(family))){ String columnStr = tableMap.get(new String(family)); log.info("@@@@@@@@@@@"+new String(family)+" "+columnStr); ArrayList columnBy = toByte(columnStr); NavigableMap > familyMap = map.get(family);//列簇作为key获取其中的列相关数据 for(byte[] column:familyMap.keySet()){ //根据列名循坏 log.info("!!!!!!!!!!!"+new String(column)); if(columnBy.contains(new String(column))){ NavigableMap valuesMap = familyMap.get(column); for(Entry s:valuesMap.entrySet()){//获取列对应的不同版本数据,默认最新的一个 System.out.println("***:"+new String(family)+" "+new String(column)+" "+s.getKey()+" "+new String(s.getValue())); put.addColumn(family, column, s.getKey(),s.getValue()); } } } } } return put; } /*** * 有版本、无列导入、无列导出 * @param put * @param value * @return */ private Put getPut1(Put put,Result value){ NavigableMap >> map=value.getMap(); for(byte[] family:map.keySet()){ NavigableMap > familyMap = map.get(family);//列簇作为key获取其中的列相关数据 for(byte[] column:familyMap.keySet()){ //根据列名循坏 NavigableMap valuesMap = familyMap.get(column); for(Entry s:valuesMap.entrySet()){ //获取列对应的不同版本数据,默认最新的一个 put.addColumn(family, column, s.getKey(),s.getValue()); } } } return put; } // str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} /*** * 得到列簇名与列名的k,v形式的map * @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} * @return map => {"cf1" => "c1,c2,c10,c11,c14"} */ private static HashMap getFamilyAndColumn(String[] str){ HashMap map = new HashMap<>(); HashSet set = new HashSet<>(); for(String s : str){ set.add(s.split(":")[0]); } Object[] ob = set.toArray(); for(int i=0; i toByte(String s){ ArrayList b = new ArrayList<>(); String[] sarr = s.split(","); for(int i=0;i 程序运行完之后,在hbase shell中查看每个表,看是否数据导入正确:
test2:(无版本、无列导入设置、无列导出设置)
test3 (无版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)
test4(无版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))
test5(有版本、无列导入设置、无列导出设置)
test6(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)
test7(有版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))
test8(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
版本
数据
输出
输入
配置
程序
不同
类型
原始
两个
代码
任务
位置
入口
其中包括
内容
函数
分配器
大小
大部分
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
默认网关 dhcp服务器
考勤数据库已满
十万师生网络安全知识
网络软件开发合同范本专业版
郑州星云软件开发有限公司
爬虫自动抓取网页数据库
想学习网络技术加盟注意事项
数据库管理技术支持翻译
刚添加一个新用户数据库如何查看
阿里云服务器的稳定性
其他电脑如何连接本地电脑服务器
网络安全专业人才库
网络安全英文黑板报
数据库搜索引擎有哪些
数据库字段有空格是null吗
qq游戏连不上服务器
网络安全教育学校教育
网络安全大纲征文大纲
qq邮箱服务器设置
安徽专业软件开发价位
内外网打印机共享网络安全
网吧无盘服务器的长宽高
tomcat怎么配数据库
银川网络技术是什么
湖南外国语学院网络技术专业
软件开发从哪里学起跟谁学
如何在数据库中下载
计算机网络安全薪水
5g网络技术与电商发展
数据库支持json好处