使用BulkLoad从HDFS批量导入数据到HBase
发表于:2024-11-19 作者:千家信息网编辑
千家信息网最后更新 2024年11月19日,在向Hbase中写入数据时,常见的写入方法有使用HBase API,Mapreduce批量导入数据,使用这些方式带入数据时,一条数据写入到HBase数据库中的大致流程如图。数据发出后首先写入到雨鞋日志
千家信息网最后更新 2024年11月19日使用BulkLoad从HDFS批量导入数据到HBase
在向Hbase中写入数据时,常见的写入方法有使用HBase API,Mapreduce批量导入数据,使用这些方式带入数据时,一条数据写入到HBase数据库中的大致流程如图。
数据发出后首先写入到雨鞋日志WAl中,写入到预写日志中之后,随后写入到内存MemStore中,最后在Flush到Hfile中。这样写数据的方式不会导致数据的丢失,并且道正数据的有序性,但是当遇到大量的数据写入时,写入的速度就难以保证。所以,介绍一种性能更高的写入方式BulkLoad。
使用BulkLoad批量写入数据主要分为两部分:
一、使用HFileOutputFormat2通过自己编写的MapReduce作业将HFile写入到HDFS目录,由于写入到HBase中的数据是按照顺序排序的,HFileOutputFormat2中的configureIncrementalLoad()可以完成所需的配置。
二、将Hfile从HDFS移动到HBase表中,大致过程如图
实例代码pom依赖:
org.apache.hbase hbase-server 1.4.0 org.apache.hadoop hadoop-client 2.6.4 org.apache.hbase hbase-client 0.99.2
package com.yangshou;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class BulkLoadMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取文件中的每一条数据,以序号作为行键 String line = value.toString(); //将数据进行切分 //切分后数组中的元素分别为:序号,用户id,商品id,用户行为,商品分类,时间,地址 String[] str = line.split(" "); String id = str[0]; String user_id = str[1]; String item_id = str[2]; String behavior = str[3]; String item_type = str[4]; String time = str[5]; String address = "156"; //拼接rowkey和put ImmutableBytesWritable rowkry = new ImmutableBytesWritable(id.getBytes()); Put put = new Put(id.getBytes()); put.add("info".getBytes(),"user_id".getBytes(),user_id.getBytes()); put.add("info".getBytes(),"item_id".getBytes(),item_id.getBytes()); put.add("info".getBytes(),"behavior".getBytes(),behavior.getBytes()); put.add("info".getBytes(),"item_type".getBytes(),item_type.getBytes()); put.add("info".getBytes(),"time".getBytes(),time.getBytes()); put.add("info".getBytes(),"address".getBytes(),address.getBytes()); //将数据写出 context.write(rowkry,put); }}
package com.yangshou;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class BulkLoadDriver { public static void main(String[] args) throws Exception { //获取Hbase配置 Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TableName.valueOf("BulkLoadDemo")); Admin admin = conn.getAdmin(); //设置job Job job = Job.getInstance(conf,"BulkLoad"); job.setJarByClass(BulkLoadDriver.class); job.setMapperClass(BulkLoadMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); //设置文件的输入输出路径 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(HFileOutputFormat2.class); FileInputFormat.setInputPaths(job,new Path("hdfs://hadoopalone:9000/tmp/000000_0")); FileOutputFormat.setOutputPath(job,new Path("hdfs://hadoopalone:9000/demo1")); //将数据加载到Hbase表中 HFileOutputFormat2.configureIncrementalLoad(job,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo"))); if(job.waitForCompletion(true)){ LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf); load.doBulkLoad(new Path("hdfs://hadoopalone:9000/demo1"),admin,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo"))); } }}
实例数据
44979 100640791 134060896 1 5271 2014-12-09 天津市44980 100640791 96243605 1 13729 2014-12-02 新疆
在Hbase shell 中创建表
create 'BulkLoadDemo','info'
打包后执行
```hadoop jar BulkLoadDemo-1.0-SNAPSHOT.jar com.yangshou.BulkLoadDriver
注意:在执行hadoop jar之前应该先将Hbase中的相关包加载过来
export HADOOP_CLASSPATH=$HBASE_HOME/lib/*
数据
方式
商品
实例
序号
文件
日志
用户
如图
配置
有序
代码
元素
内存
地址
常见
性能
数据库
数组
方法
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器e5支持win7吗
1月服务器
洮北分局网络安全宣传
数据库 灾备技术方案
上海乾创软件开发有限公司
银行数据库软件开发
数据库每个表需要一个主码吗
数据库mysql调优
rsc数据库图片版权申请
银川地名数据库
怎样做一个优秀的网络安全员
网络技术模拟模式
上海橙聚网络技术有限公司
cft网络安全大赛
无锡dell服务器维修中心电话
网络安全防止沉迷手抄报
创建数据库性别
镇海嵌入式软件开发系统
软件开发 重复代码
软通软件开发
马鞍山民宿软件开发
负责网络安全机构
惠州尚城软件开发方案
广州软件开发哪家便宜
网络安全预防电信诈骗手抄报
更改数据库属于sql的语言
台湾网络安全厂商排名
拉流推流服务器设计
网络安全课程设计 百度文库
军队网络安全工作规定