
HFileOutputFormat in HBase


If the output of a Hadoop MapReduce job needs to go into HBase, it is best to write it out in HFile format first and then import it into HBase. HFile is HBase's internal storage format, so the import is very efficient. Below is an example.
1. Create the HBase table t1

    hbase(main):157:0* create 't1','f1'
    0 row(s) in 1.3280 seconds
    hbase(main):158:0> scan 't1'
    ROW                          COLUMN+CELL
    0 row(s) in 1.2770 seconds
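
The table can also be created from Java rather than the shell. Below is a minimal sketch against the HBase 0.90-era client API (it assumes hbase-site.xml, or equivalent ZooKeeper settings, is on the classpath):

    package com.test.hfile;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class CreateTable {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            HBaseAdmin admin = new HBaseAdmin(conf);
            // Table t1 with a single column family f1, matching the shell step above
            HTableDescriptor desc = new HTableDescriptor("t1");
            desc.addFamily(new HColumnDescriptor("f1"));
            admin.createTable(desc);
        }
    }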

2. Write the MR job
HBaseHFileMapper.java

    package com.test.hfile;

    import java.io.IOException;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class HBaseHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {
        private ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Use the byte offset of the line as the intermediate key and
            // pass the line through unchanged.
            immutableBytesWritable.set(Bytes.toBytes(key.get()));
            context.write(immutableBytesWritable, value);
        }
    }
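
Note that this mapper uses the byte offset of each input line as the intermediate key; the actual HBase row key is only parsed out later, in the reducer. HFileOutputFormat requires cells to be written in row-key order, which the small, already-sorted sample file below happens to satisfy. For real data, a variant that keys the shuffle on the parsed row is safer; here is a hypothetical sketch (not part of the original job):

    // Hypothetical variant of map(): key the shuffle on the row parsed
    // from the line itself, so records reach the reducer grouped and
    // sorted by the real HBase row key.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(":");
        if (parts.length >= 4) {
            immutableBytesWritable.set(Bytes.toBytes(parts[0])); // row key
            context.write(immutableBytesWritable, value);
        }
    }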

HBaseHFileReducer.java

    package com.test.hfile;

    import java.io.IOException;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class HBaseHFileReducer extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Iterate with a single iterator; the original code called
            // values.iterator() on every pass, which is unreliable.
            for (Text text : values) {
                String value = text.toString();
                if (value != null && !"".equals(value)) {
                    KeyValue kv = createKeyValue(value);
                    if (kv != null)
                        context.write(key, kv);
                }
            }
        }

        // str format: row:family:qualifier:value -- a simple simulation
        private KeyValue createKeyValue(String str) {
            String[] strs = str.split(":");
            if (strs.length < 4)
                return null;
            String row = strs[0];
            String family = strs[1];
            String qualifier = strs[2];
            String value = strs[3];
            return new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qualifier),
                    System.currentTimeMillis(), Bytes.toBytes(value));
        }
    }

HbaseHFileDriver.java

    package com.test.hfile;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class HbaseHFileDriver {
        public static void main(String[] args) throws IOException,
                InterruptedException, ClassNotFoundException {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

            Job job = new Job(conf, "testhbasehfile");
            job.setJarByClass(HbaseHFileDriver.class);
            job.setMapperClass(com.test.hfile.HBaseHFileMapper.class);
            job.setReducerClass(com.test.hfile.HBaseHFileReducer.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Text.class);

            // Lazy shortcut: the paths are hard-coded here. In a real
            // application they should come from the command line.
            FileInputFormat.addInputPath(job, new Path("/home/yinjie/input"));
            FileOutputFormat.setOutputPath(job, new Path("/home/yinjie/output"));

            Configuration HBASE_CONFIG = new Configuration();
            HBASE_CONFIG.set("hbase.zookeeper.quorum", "localhost");
            HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
            HBaseConfiguration cfg = new HBaseConfiguration(HBASE_CONFIG);

            String tableName = "t1";
            HTable htable = new HTable(cfg, tableName);
            // Sets the job's output format, output key/value classes and a
            // total-order partitioner matching the table's region boundaries.
            HFileOutputFormat.configureIncrementalLoad(job, htable);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
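
As the comment in the driver notes, the hard-coded paths are only a shortcut. The otherArgs array parsed by GenericOptionsParser is where the real paths would come from; a drop-in replacement for the two hard-coded lines, assuming args[0] is the input directory and args[1] the output directory, might look like this:

    // Hypothetical: take the I/O paths from the command line instead.
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));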

There is a file hbasedata.txt in the /home/yinjie/input directory with the following content:

    [root@localhost input]# cat hbasedata.txt
    r1:f1:c1:value1
    r2:f1:c2:value2
    r3:f1:c3:value3

Package the job into a jar; my exported jar path is /home/yinjie/job/hbasetest.jar.
Submit the job to Hadoop:

    [root@localhost job]# hadoop jar /home/yinjie/job/hbasetest.jar com.test.hfile.HbaseHFileDriver -libjars /home/yinjie/hbase-0.90.3/hbase-0.90.3.jar

After the job finishes, check the output directory:

    [root@localhost input]# hadoop fs -ls /home/yinjie/output
    Found 2 items
    drwxr-xr-x - root supergroup 0 2011-08-28 21:02 /home/yinjie/output/_logs
    drwxr-xr-x - root supergroup 0 2011-08-28 21:03 /home/yinjie/output/f1

OK, a directory named after the column family f1 has been generated.
Next, use Bulk Load to import the data into HBase:

    [root@localhost job]# hadoop jar /home/yinjie/hbase-0.90.3/hbase-0.90.3.jar completebulkload /home/yinjie/output t1
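
The same import can also be triggered from Java: the completebulkload tool is a thin wrapper around LoadIncrementalHFiles. A minimal sketch, with the output path and table name assumed to match the job above:

    package com.test.hfile;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadRunner {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            // Moves the generated HFiles under /home/yinjie/output into table t1.
            loader.doBulkLoad(new Path("/home/yinjie/output"), new HTable(conf, "t1"));
        }
    }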

When the import is done, scan the HBase table t1 to verify:

    hbase(main):166:0> scan 't1'
    ROW                COLUMN+CELL
    r1                 column=f1:c1, timestamp=1314591150788, value=value1
    r2                 column=f1:c2, timestamp=1314591150814, value=value2
    r3                 column=f1:c3, timestamp=1314591150815, value=value3
    3 row(s) in 0.0210 seconds

The data has been imported!
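
The result can also be spot-checked programmatically; here is a small sketch reading back one row with the client API (same configuration assumptions as above):

    package com.test.hfile;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class VerifyRow {
        public static void main(String[] args) throws Exception {
            HTable table = new HTable(HBaseConfiguration.create(), "t1");
            Result result = table.get(new Get(Bytes.toBytes("r1")));
            byte[] v = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("c1"));
            System.out.println(Bytes.toString(v)); // expected: value1
        }
    }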
