千家信息网

HBase和HDFS数据互导程序

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,下面说说JAVA API 提供的这些类的功能和他们之间有什么样的联系。1.HBaseConfiguration关系:org.apache.hadoop.hbase.HBaseConfiguration
千家信息网最后更新 2025年01月24日HBase和HDFS数据互导程序




下面说说JAVA API 提供的这些类的功能和他们之间有什么样的联系。


1.HBaseConfiguration

关系:org.apache.hadoop.hbase.HBaseConfiguration

作用:通过此类可以对HBase进行配置

用法实例: Configuration config = HBaseConfiguration.create();

说明: HBaseConfiguration.create() 默认会从classpath 中查找 hbase-site.xml 中的配置信息,初始化 Configuration

2.HBaseAdmin 类

关系:org.apache.hadoop.hbase.client.HBaseAdmin

作用:提供接口关系HBase 数据库中的表信息

用法:HBaseAdmin admin = new HBaseAdmin(config);

3.Descriptor类

关系:org.apache.hadoop.hbase.HTableDescriptor

作用:HTableDescriptor 类包含了表的名字以及表的列族信息

用法:HTableDescriptor htd =new HTableDescriptor(tablename);

构造一个表描述符指定TableName对象。

Htd.addFamily(new HColumnDescriptor("myFamily"));

将列家族给定的描述符

4.HTable

关系:org.apache.hadoop.hbase.client.HTable

作用:HTable HBase 的表通信

用法:HTable tab = new HTable(config,Bytes.toBytes(tablename));

ResultScanner sc = tab.getScanner(Bytes.toBytes("familyName"));

说明:获取表内列族 familyNme 的所有数据。

5.Put

关系:org.apache.hadoop.hbase.client.Put

作用:获取单个行的数据

用法:HTable table = new HTable(config,Bytes.toBytes(tablename));

Put put = new Put(row);

p.add(family,qualifier,value);

说明:向表 tablename 添加 "family,qualifier,value"指定的值。

6.Get

关系:org.apache.hadoop.hbase.client.Get

作用:获取单个行的数据

用法:HTable table = new HTable(config,Bytes.toBytes(tablename));

Get get = new Get(Bytes.toBytes(row));

Result result = table.get(get);

说明:获取 tablename 表中 row 行的对应数据

7.ResultScanner

关系:Interface

作用:获取值的接口

用法:ResultScanner scanner = table.getScanner(Bytes.toBytes(family));

For(Result rowResult : scanner){

Bytes[] str = rowResult.getValue(family,column);

}

说明:循环获取行中列值。



例1 HBase之读取HDFS数据写入HBase

package org.hadoop.hbase;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HColumnDescriptor;import org.apache.hadoop.hbase.HTableDescriptor;import org.apache.hadoop.hbase.client.HBaseAdmin;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class WordCountHbaseWriter { public static class WordCountHbaseMapper extends   Mapper {  private final static IntWritable one = new IntWritable(1);  private Text word = new Text();  public void map(Object key, Text value, Context context)    throws IOException, InterruptedException {   StringTokenizer itr = new StringTokenizer(value.toString());   while (itr.hasMoreTokens()) {    word.set(itr.nextToken());    context.write(word, one);// 输出   }  } } public static class WordCountHbaseReducer extends   TableReducer {  public void reduce(Text key, Iterable values,    Context context) throws IOException, InterruptedException {   int sum = 0;   for (IntWritableval : values) {// 遍历求和    sum += val.get();   }   Put put = new Put(key.getBytes());//put实例化,每一个词存一行   //列族为content,列修饰符为count,列值为数目   put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));   context.write(new ImmutableBytesWritable(key.getBytes()), put);// 输出求和后的  } }  public static void main(String[] args){  String tablename = "wordcount";  Configuration conf = HBaseConfiguration.create();    conf.set("hbase.zookeeper.quorum", "192.168.1.139");    conf.set("hbase.zookeeper.property.clientPort", "2191");  HBaseAdmin admin = null;  try {   admin = new HBaseAdmin(conf);   if(admin.tableExists(tablename)){    System.out.println("table exists!recreating.......");    admin.disableTable(tablename);    admin.deleteTable(tablename);   }   HTableDescriptor htd = new HTableDescriptor(tablename);   HColumnDescriptor tcd = new HColumnDescriptor("content");   htd.addFamily(tcd);//创建列族   admin.createTable(htd);//创建表   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();      if (otherArgs.length != 1) {        System.err.println("Usage: WordCountHbaseWriter ");        System.exit(2);      }      Job job = new Job(conf, "WordCountHbaseWriter");  job.setNumReduceTasks(2);      job.setJarByClass(WordCountHbaseWriter.class);   //使用WordCountHbaseMapper类完成Map过程;      job.setMapperClass(WordCountHbaseMapper.class);      TableMapReduceUtil.initTableReducerJob(tablename, WordCountHbaseReducer.class, job);      //设置任务数据的输入路径;      FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   //设置了Map过程的输出类型,其中设置key的输出类型为Text;      job.setOutputKeyClass(Text.class);   //设置了Map过程的输出类型,其中设置value的输出类型为IntWritable;      job.setOutputValueClass(IntWritable.class);   //调用job.waitForCompletion(true) 执行任务,执行成功后退出;      System.exit(job.waitForCompletion(true) ? 0 : 1);  } catch (Exception e) {   e.printStackTrace();  } finally{   if(admin!=null)    try {     admin.close();    } catch (IOException e) {     e.printStackTrace();    }  }   }}


例2 HBase之读取HBase数据写入HDFS

package org.hadoop.hbase;import java.io.IOException;import java.util.Map.Entry;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class WordCountHbaseReader {  public static class WordCountHbaseReaderMapper extends     TableMapper{    @Override    protected void map(ImmutableBytesWritable key,Result value,Context context)            throws IOException, InterruptedException {        StringBuffer sb = new StringBuffer("");        for(Entry entry:value.getFamilyMap("content".getBytes()).entrySet()){            String str =  new String(entry.getValue());            //将字节数组转换为String类型            if(str != null){                sb.append(new String(entry.getKey()));                sb.append(":");                sb.append(str);            }            context.write(new Text(key.get()), new Text(new String(sb)));        }    }} public static class WordCountHbaseReaderReduce extends Reducer{     private Text result = new Text();     @Override     protected void reduce(Text key, Iterable values,Context context)             throws IOException, InterruptedException {         for(Text val:values){             result.set(val);             context.write(key, result);         }     } }  public static void main(String[] args) throws Exception {     String tablename = "wordcount";     Configuration conf = HBaseConfiguration.create();     conf.set("hbase.zookeeper.quorum", "192.168.1.139");     conf.set("hbase.zookeeper.property.clientPort", "2191");          String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();     if (otherArgs.length != 1) {       System.err.println("Usage: WordCountHbaseReader ");       System.exit(2);     }     Job job = new Job(conf, "WordCountHbaseReader");     job.setJarByClass(WordCountHbaseReader.class);     //设置任务数据的输出路径;     FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));     job.setReducerClass(WordCountHbaseReaderReduce.class);     Scan scan = new Scan();     TableMapReduceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);     //调用job.waitForCompletion(true) 执行任务,执行成功后退出;     System.exit(job.waitForCompletion(true) ? 0 : 1); }}


程序中用到hadoop的相关JAR包(如下图)及hbase所有jar包

如果上面的API还不能满足你的要求,可以到下面这个网站里面Hbase全部API介绍

http://www.yiibai.com/hbase/


0