使用BulkLoad从HDFS批量导入数据到HBase
发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,在向Hbase中写入数据时,常见的写入方法有使用HBase API,Mapreduce批量导入数据,使用这些方式带入数据时,一条数据写入到HBase数据库中的大致流程如图。数据发出后首先写入到雨鞋日志
千家信息网最后更新 2025年02月05日使用BulkLoad从HDFS批量导入数据到HBase
在向Hbase中写入数据时,常见的写入方法有使用HBase API,Mapreduce批量导入数据,使用这些方式带入数据时,一条数据写入到HBase数据库中的大致流程如图。
数据发出后首先写入到雨鞋日志WAl中,写入到预写日志中之后,随后写入到内存MemStore中,最后在Flush到Hfile中。这样写数据的方式不会导致数据的丢失,并且道正数据的有序性,但是当遇到大量的数据写入时,写入的速度就难以保证。所以,介绍一种性能更高的写入方式BulkLoad。
使用BulkLoad批量写入数据主要分为两部分:
一、使用HFileOutputFormat2通过自己编写的MapReduce作业将HFile写入到HDFS目录,由于写入到HBase中的数据是按照顺序排序的,HFileOutputFormat2中的configureIncrementalLoad()可以完成所需的配置。
二、将Hfile从HDFS移动到HBase表中,大致过程如图
实例代码pom依赖:
org.apache.hbase hbase-server 1.4.0 org.apache.hadoop hadoop-client 2.6.4 org.apache.hbase hbase-client 0.99.2
package com.yangshou;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class BulkLoadMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取文件中的每一条数据,以序号作为行键 String line = value.toString(); //将数据进行切分 //切分后数组中的元素分别为:序号,用户id,商品id,用户行为,商品分类,时间,地址 String[] str = line.split(" "); String id = str[0]; String user_id = str[1]; String item_id = str[2]; String behavior = str[3]; String item_type = str[4]; String time = str[5]; String address = "156"; //拼接rowkey和put ImmutableBytesWritable rowkry = new ImmutableBytesWritable(id.getBytes()); Put put = new Put(id.getBytes()); put.add("info".getBytes(),"user_id".getBytes(),user_id.getBytes()); put.add("info".getBytes(),"item_id".getBytes(),item_id.getBytes()); put.add("info".getBytes(),"behavior".getBytes(),behavior.getBytes()); put.add("info".getBytes(),"item_type".getBytes(),item_type.getBytes()); put.add("info".getBytes(),"time".getBytes(),time.getBytes()); put.add("info".getBytes(),"address".getBytes(),address.getBytes()); //将数据写出 context.write(rowkry,put); }}
package com.yangshou;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class BulkLoadDriver { public static void main(String[] args) throws Exception { //获取Hbase配置 Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TableName.valueOf("BulkLoadDemo")); Admin admin = conn.getAdmin(); //设置job Job job = Job.getInstance(conf,"BulkLoad"); job.setJarByClass(BulkLoadDriver.class); job.setMapperClass(BulkLoadMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); //设置文件的输入输出路径 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(HFileOutputFormat2.class); FileInputFormat.setInputPaths(job,new Path("hdfs://hadoopalone:9000/tmp/000000_0")); FileOutputFormat.setOutputPath(job,new Path("hdfs://hadoopalone:9000/demo1")); //将数据加载到Hbase表中 HFileOutputFormat2.configureIncrementalLoad(job,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo"))); if(job.waitForCompletion(true)){ LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf); load.doBulkLoad(new Path("hdfs://hadoopalone:9000/demo1"),admin,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo"))); } }}
实例数据
44979 100640791 134060896 1 5271 2014-12-09 天津市44980 100640791 96243605 1 13729 2014-12-02 新疆
在Hbase shell 中创建表
create 'BulkLoadDemo','info'
打包后执行
```hadoop jar BulkLoadDemo-1.0-SNAPSHOT.jar com.yangshou.BulkLoadDriver
注意:在执行hadoop jar之前应该先将Hbase中的相关包加载过来
export HADOOP_CLASSPATH=$HBASE_HOME/lib/*
数据
方式
商品
实例
序号
文件
日志
用户
如图
配置
有序
代码
元素
内存
地址
常见
性能
数据库
数组
方法
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
全市网络安全员培训班
智能城市网络安全
数据库系统中包含用户吗
莱西安卓软件开发系统
一台电脑做内部服务器
数据库网上购物数据库设计
福建精英网络技术咨询口碑推荐
如何践行网络安全的号召
网络技术软件包
moxa串口服务器说明书 端口
肖新光 网络安全小组
网络安全活动周领导小组
软件开发自学步骤视频
亚马逊和腾讯云服务器区别
软件开发中的框架指的是
远程控制服务器
联想服务器远程管理卡重启
sap数据库管理
数据库中表的删除有顺序吗
阴阳师账号忘记在哪个服务器
开展《网络安全法》
腾讯云服务器200m共享
装u8和数据库冲突吗
网络技术网上培训班
武汉互联网金融科技公司
数据库assertion
服务器配线架制作
饥荒进洞穴服务器无应答
网络安全活动周领导小组
信息网络安全杂志 阜成路