千家信息网

HBase基础操作,包括表的增删改查过滤等

发表于:2024-09-27 作者:千家信息网编辑
千家信息网最后更新 2024年09月27日,package com.snglw.basic;import java.io.IOException;import java.util.ArrayList;import java.util.List;
千家信息网最后更新 2024年09月27日HBase基础操作,包括表的增删改查过滤等
package com.snglw.basic;import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.CellUtil;import org.apache.hadoop.hbase.HColumnDescriptor;import org.apache.hadoop.hbase.HTableDescriptor;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Delete;import org.apache.hadoop.hbase.client.Get;import org.apache.hadoop.hbase.client.HBaseAdmin;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.ResultScanner;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;import org.apache.hadoop.hbase.filter.FilterList;import org.apache.hadoop.hbase.filter.FilterList.Operator;import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;import org.apache.hadoop.hbase.util.Bytes;import com.snglw.util.ConnectInit;public class BasicOperator {        ConnectInit ci = new ConnectInit();        Configuration conf = ci.getConfiguration();                        /*建表与删表*/        public void createTable(){                String tableName = "user";                HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));                HColumnDescriptor hcd = new HColumnDescriptor("info");                htd.addFamily(hcd);                                HBaseAdmin admin = null;                try        {            admin = new HBaseAdmin(conf);            if (admin.tableExists(tableName)){                admin.disableTable(tableName);                admin.deleteTable(tableName);            }else{                    admin.createTable(htd);                    System.out.println("Table Created!");            }                }catch(IOException e){                        e.printStackTrace();                }                finally{                        if(admin != null){                                try{                                        admin.close();                                }catch(IOException e){                                        e.printStackTrace();                                }                        }                }        }                        /*修改表(注意:修改表或者列时,只有表的Enabled属性为false时才能生效)*/        public void modifyTable(){                //指定表名                String tableName = "user";                //指定列族名                byte[] familyName = Bytes.toBytes("education");                                HBaseAdmin admin = null;                try{                        //实例化HBaseAdmin对象                        admin = new HBaseAdmin(conf);                        //获取表描述信息对象                        HTableDescriptor htd = admin.getTableDescriptor(Bytes.toBytes(tableName));                        //修改前,判断表是否有指定列族                        if(!htd.hasFamily(familyName)){                                //创建列描述信息                                HColumnDescriptor hcd = new HColumnDescriptor(familyName);                                htd.addFamily(hcd);                                                                //修改表前,需要disable表,使其下线                                admin.disableTable(tableName);                                //提交modifyTable请求                                admin.modifyTable(tableName, htd);                                //修改完成之后,使表上线                                admin.enableTable(tableName);                        }                }catch(IOException e){                        e.printStackTrace();                }finally{                        if(admin != null){                                try {                                        admin.close();                                } catch (IOException e) {                                        e.printStackTrace();                                }                        }                }        }                        /*插入数据*/        public void put(){                //指定表名                String tableName = "user";                //指定列族名                byte[] familyName = Bytes.toBytes("info");                //指定列名                byte[][] qualifiers = {Bytes.toBytes("name"),Bytes.toBytes("gender"),                                                           Bytes.toBytes("age"),Bytes.toBytes("address")};                HTable table = null;                try{                        //实例化一个HTable对象                        table = new HTable(conf,tableName);                        List puts = new ArrayList();                        //实例化一个Put对象                        Put put = new Put(Bytes.toBytes("012005000201"));                        put.addImmutable(familyName, qualifiers[0],Bytes.toBytes("张三"));                        put.addImmutable(familyName, qualifiers[1],Bytes.toBytes("男"));                        put.addImmutable(familyName, qualifiers[2],Bytes.toBytes(new Long(19)));                        put.addImmutable(familyName, qualifiers[3],Bytes.toBytes("广东省深圳市"));                        puts.add(put);                                                put = new Put(Bytes.toBytes("012005000202"));                        put.addImmutable(familyName, qualifiers[0],Bytes.toBytes("李"));                        put.addImmutable(familyName, qualifiers[1],Bytes.toBytes("女"));                        put.addImmutable(familyName, qualifiers[2],Bytes.toBytes(new Long(23)));                        put.addImmutable(familyName, qualifiers[3],Bytes.toBytes("山西省大同市"));                        puts.add(put);                                                put = new Put(Bytes.toBytes("012005000203"));                        put.addImmutable(familyName, qualifiers[0],Bytes.toBytes("王"));                        put.addImmutable(familyName, qualifiers[1],Bytes.toBytes("男"));                        put.addImmutable(familyName, qualifiers[2],Bytes.toBytes(new Long(26)));                        put.addImmutable(familyName, qualifiers[3],Bytes.toBytes("浙江省宁波市"));                        puts.add(put);                                                //提交put数据请求                        table.put(puts);                }catch(IOException e){                        e.printStackTrace();                }finally{                        if(table != null){                                try{                                        //关闭HTable对象                                        table.close();                                }catch(IOException e){                                        e.printStackTrace();                                }                        }                }        }                        /*删除数据*/        public void delete(){                String tableName = "user";                //指定rowKey值,即编号为012005000201                byte[] rowKey = Bytes.toBytes("012005000201");                                HTable table = null;                try{                        table = new HTable(conf,tableName);                        Delete delete = new Delete(rowKey);                        //提交一次delete数据请求                        table.delete(delete);                }catch(IOException e){                        e.printStackTrace();                }finally{                        if(table != null){                                try {                                        table.close();                                } catch (IOException e) {                                        e.printStackTrace();                                }                        }                }        }                        /*使用Get读取数据*/        public void get(){                String tableName = "user";                //指定列族名                byte[] familyName = Bytes.toBytes("info");                //指定列名                byte[][] qualifier = {Bytes.toBytes("name"),Bytes.toBytes("address")};                //指定rowKey值                byte[] rowKey = Bytes.toBytes("012005000202");                                HTable table = null;                try{                        table = new HTable(conf,tableName);                        //实例化get对象                        Get get = new Get(rowKey);                        //设置列族和列名                        get.addColumn(familyName,qualifier[0]);                        get.addColumn(familyName,qualifier[1]);                        //提交请求                        Result result = table.get(get);                        for(Cell cell:result.rawCells()){                                System.out.println(Bytes.toString(CellUtil.cloneRow(cell))                                                + ":" + Bytes.toString(CellUtil.cloneFamily(cell))                                                + ":" + Bytes.toString(CellUtil.cloneQualifier(cell))                                                + ":" + Bytes.toString(CellUtil.cloneValue(cell)));                        }                }catch(IOException e){                        e.printStackTrace();                }finally{                        if(table != null){                                try {                                        table.close();                                } catch (IOException e) {                                        e.printStackTrace();                                }                        }                }        }                        /*使用scan读取数据*/        public void scan(){                String tableName = "webPage";                HTable table = null;                try{                        table = new HTable(conf,tableName);                        Scan scan = new Scan();                        scan.addColumn(Bytes.toBytes("webPageInfo"),Bytes.toBytes("doc"));                        //设置缓存大小                        scan.setCaching(5000);                        scan.setBatch(2);                        //实例化一个ResultScanner对象                        ResultScanner rScanner = null;                        //提交请求                        rScanner = table.getScanner(scan);                        for(Result r = rScanner.next();r != null;r = rScanner.next()){                                for(Cell cell:r.rawCells()){                                        System.out.println(Bytes.toString(CellUtil.cloneRow(cell))                                                        + ":" + Bytes.toString(CellUtil.cloneFamily(cell))                                                        + ":" + Bytes.toString(CellUtil.cloneQualifier(cell))                                                        + ":" + Bytes.toString(CellUtil.cloneValue(cell)));                                }                        }                }catch(IOException e){                        e.printStackTrace();                }finally{                        if(table != null){                                try {                                        table.close();                                } catch (IOException e) {                                        e.printStackTrace();                                }                        }                }        }                        /*使用过滤器*/        public void singleColumnValueFilter(){                String tableName = "user";                HTable table = null;                try{                        table = new HTable(conf,tableName);                        //实例化一个Scan对象                        Scan scan = new Scan();                        scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"));                        //设置过滤条件                        SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("name"),                                                                CompareOp.EQUAL,Bytes.toBytes("王"));                        scan.setFilter(filter);                        //实例化ResultScanner对象                        ResultScanner rScanner = null;                        rScanner = table.getScanner(scan);                        for(Result r = rScanner.next();r != null;r = rScanner.next()){                                for(Cell cell:r.rawCells()){                                        System.out.println(Bytes.toString(CellUtil.cloneRow(cell))                                                        + ":" + Bytes.toString(CellUtil.cloneFamily(cell))                                                        + ":" + Bytes.toString(CellUtil.cloneQualifier(cell))                                                        + ":" + Bytes.toString(CellUtil.cloneValue(cell)));                                }                        }                }catch(IOException e){                        e.printStackTrace();                }finally{                        if(table != null){                                try {                                        table.close();                                } catch (IOException e) {                                        e.printStackTrace();                                }                        }                }        }                        /*使用FilterList过滤器*/        public void filterList(){                String tableName = "user";                HTable table = null;                try{                        table = new HTable(conf,tableName);                        //实例化一个Scan对象                        Scan scan = new Scan();                        scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"));                        //实例化FilterList对象,里各个filter的是"and"关系                        FilterList list = new FilterList(Operator.MUST_PASS_ALL);                        //设置过滤条件(age>20的数据)                        list.addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("age"),                                                                CompareOp.GREATER_OR_EQUAL,Bytes.toBytes(new Long(20))));                        //获取age<=29的数据                        list.addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("age"),                                        CompareOp.GREATER_OR_EQUAL,Bytes.toBytes(new Long(29))));                        scan.setFilter(list);                        //实例化ResultScanner对象                        ResultScanner rScanner = null;                        rScanner = table.getScanner(scan);                        for(Result r = rScanner.next();r != null;r = rScanner.next()){                                for(Cell cell:r.rawCells()){                                        System.out.println(Bytes.toString(CellUtil.cloneRow(cell))                                                        + ":" + Bytes.toString(CellUtil.cloneFamily(cell))                                                        + ":" + Bytes.toString(CellUtil.cloneQualifier(cell))                                                        + ":" + Bytes.toString(CellUtil.cloneValue(cell)));                                }                        }                }catch(IOException e){                        e.printStackTrace();                }finally{                        if(table != null){                                try {                                        table.close();                                } catch (IOException e) {                                        e.printStackTrace();                                }                        }                }        }                        /*聚合函数Aggregate*/        public void aggregate(){                //指定表名                byte[] tableName = Bytes.toBytes("user");                //指定列族名                byte[] family = Bytes.toBytes("info");                                AggregationClient aggregationClient = new AggregationClient(conf);                //实例化scan对象                Scan scan = new Scan();                scan.addFamily(family);                scan.addColumn(family,Bytes.toBytes("age"));                try{                        //获取行数                        long rowCount = aggregationClient.rowCount(TableName.valueOf(tableName),null,scan);                        System.out.println("row count is "+rowCount);                        //获取最大值                        long max = aggregationClient.max(TableName.valueOf(tableName),new LongColumnInterpreter(),scan);                        System.out.println("max number is "+max);                        //获取最小值                        long min = aggregationClient.min(TableName.valueOf(tableName),new LongColumnInterpreter(),scan);                        System.out.println("min number is "+min);                }catch(Throwable e){                        e.printStackTrace();                }        }                        public static void main(String[] args){                BasicOperator bo = new BasicOperator();                bo.aggregate();        }}


0