千家信息网

hbase0.98 coprocessor Endpoint如何实现HelloWorld

发表于:2024-11-14 作者:千家信息网编辑
千家信息网最后更新 2024年11月14日,这篇文章主要为大家展示了"hbase0.98 coprocessor Endpoint如何实现HelloWorld",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习
千家信息网最后更新 2024年11月14日hbase0.98 coprocessor Endpoint如何实现HelloWorld

这篇文章主要为大家展示了"hbase0.98 coprocessor Endpoint如何实现HelloWorld",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"hbase0.98 coprocessor Endpoint如何实现HelloWorld"这篇文章吧。

HBase作为列族数据库最经常被人诟病的特性包括:无法轻易建立"二级索引",难以执行求和、计数、排序等操作。比如,在旧版本的(<0.92)Hbase中,统计数据表的总行数,需要使用Counter方法,执行一次MapReduce Job才能得到。虽然HBase在数据存储层中集成了MapReduce,能够有效用于数据表的分布式计算。然而在很多情况下,做一些简单的相加或者聚合计算的时候,如果直接将计算过程放置在server端,能够减少通讯开销,从而获得很好的性能提升。于是,HBase在0.92之后引入了协处理器(coprocessors),实现一些激动人心的新特性:能够轻易建立二次索引、复杂过滤器(谓词下推)以及访问控制等。 HBase协处理器的灵感来自于Jeff Dean 09年的演讲( P66-67)。

####hbase coprocessor 大类分为两种coprocessor分别是:

  1. RegionObserver :它是一种类似于传统数据库的触发器,提供了钩子函数:Get、Put、Delete、Scan等。

  1. Endpoint:是一个远程rpc调用,类似于webservice形式调用,但他不适用xml,而是使用的序列化框架是protobuf(序列化后数据更小),本文将介绍此种Coprocessor.

Endpoint 允许您定义自己的动态RPC协议,用于客户端与region servers通讯。Coprocessor 与region server在相同的进程空间中,因此您可以在region端定义自己的方法(endpoint),将计算放到region端,减少网络开销,常用于提升hbase的功能,如:count,sum等。

###我的环境

  • hadoop : 2.2

  • hbase-hadoop2 :0.98+

  • JDK:1.6 ##这里必须要1.6 要不然会出现不能加载jar包的现象。

  • 操作系统:CentOS 6.4

###编写代码

  1. 首先你需要利用protobuf(网上自己搜google维护的目前发展到2.5版本) 工具成一个HelloWorld 序列化对象。

    ####HelloWorld.proto

    option java_package = "com.gzhdi.coprocessor.generated";option java_outer_classname = "ServerHelloworld";option java_generic_services = true;option java_generate_equals_and_hash = true;option optimize_for = SPEED;message HelloRequest {  required bytes askWord = 10;}message HelloResponse {  required bytes retWord = 10;}message AskRequest {  required bytes ask = 100;}message AnsResponse {  required bytes ans = 100;}service HelloWorld {  rpc sendHello(HelloRequest)    returns (HelloResponse);  rpc question(AskRequest)    returns (AnsResponse);}


  2. 使用命令生成代码,并拷贝到你的工程里边去,我的文件在工程下面放着呢,直接生成到工程里边。 这段代码就会生成一个HelloWorld.java文件.

    protoc.exe  --java_out=../src HelloWorld.proto


  3. 编写主要代码

    ####server端代码

    package com.gzhdi.copocessor;import java.io.IOException;import org.apache.hadoop.hbase.Coprocessor;import org.apache.hadoop.hbase.CoprocessorEnvironment;import org.apache.hadoop.hbase.coprocessor.CoprocessorException;import org.apache.hadoop.hbase.coprocessor.CoprocessorService;import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;import com.google.protobuf.ByteString;import com.google.protobuf.RpcCallback;import com.google.protobuf.RpcController;import com.google.protobuf.Service;import com.gzhdi.coprocessor.generated.ServerHelloworld;import com.gzhdi.coprocessor.generated.ServerHelloworld.AnsResponse;import com.gzhdi.coprocessor.generated.ServerHelloworld.AskRequest;import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloRequest;import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloResponse;public class HelloWorldEndPoint  extends ServerHelloworld.HelloWorld implements Coprocessor,CoprocessorService{        private RegionCoprocessorEnvironment env;         @Override        public void sendHello(RpcController controller, HelloRequest request,                        RpcCallback done) {                System.out.println("request HelloRequest:"+request.getAskWord());                HelloResponse resp=HelloResponse.newBuilder().setRetWord(ByteString.copyFromUtf8("hello world!!!")).build();                done.run(resp);        }        @Override        public void question(RpcController controller, AskRequest request,                        RpcCallback done) {                System.out.println("request question:"+request.getAsk());                AnsResponse resp=AnsResponse.newBuilder().setAns(ByteString.copyFromUtf8("helloworld,"+request.getAsk().toStringUtf8())).build();                done.run(resp);        }        @Override        public Service getService() {                return this;        }        @Override        public void start(CoprocessorEnvironment env) throws IOException {                if (env instanceof RegionCoprocessorEnvironment) {                        this.env = (RegionCoprocessorEnvironment)env;                      } else {                        throw new CoprocessorException("Must be loaded on a table region!");                      }          }        @Override        public void stop(CoprocessorEnvironment env) throws IOException {        }}


    ####client 端代码

    package com.gzhdi.copocessor;import java.io.IOException;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.coprocessor.Batch;import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;import org.apache.hadoop.hbase.ipc.ServerRpcController;import com.google.protobuf.ByteString;import com.google.protobuf.ServiceException;import com.gzhdi.coprocessor.generated.ServerHelloworld.AnsResponse;import com.gzhdi.coprocessor.generated.ServerHelloworld.AskRequest;import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloWorld;public class HelloWorldClient {        public static void main(String[] args) throws ServiceException, Throwable {                myclient();        }//如果你没有写好自己的例子可以跑跑hbase自带的小例子//      private static void example1() throws IOException, ServiceException,//                      Throwable {//              System.out.println("begin.....");  //        long begin_time=System.currentTimeMillis();  //       Configuration config=HBaseConfiguration.create();  ////     String master_ip="192.168.150.128";  //       String master_ip="10.10.113.211";  //       String zk_ip="10.10.113.211";  //       String table_name="t1";  //       config.set("hbase.zookeeper.property.clientPort", "2181");   //       config.set("hbase.zookeeper.quorum", zk_ip);   //       config.set("hbase.master", master_ip+":600000");  //       //       HTable table = new HTable(config, table_name);//       final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();//       Map results = table.coprocessorService(//           ExampleProtos.RowCountService.class, // the protocol interface we're invoking//           null, null,                          // start and end row keys//           //           new Batch.Call() {//                 //               public Long call(Object counter) throws IOException {//                 BlockingRpcCallback rpcCallback =//                     new BlockingRpcCallback();//                 ((ExampleProtos.RowCountService)counter).getRowCount(null, request, rpcCallback);//                 ExampleProtos.CountResponse response = rpcCallback.get();//                 System.out.println("count :::::"+response.getCount());//                 return response.hasCount() ? response.getCount() : 0;//               }//                      //           });//      }        public static void myclient(){                // TODO Auto-generated method stub                                System.out.println("begin.....");                          long begin_time=System.currentTimeMillis();                         Configuration config=HBaseConfiguration.create();  //                   String master_ip="192.168.150.128";                         String master_ip="10.10.113.211";                         String zk_ip="10.10.113.211";                         String table_name="t1";                         config.set("hbase.zookeeper.property.clientPort", "2181");                          config.set("hbase.zookeeper.quorum", zk_ip);                          config.set("hbase.master", master_ip+":600000");                         final AskRequest req=AskRequest.newBuilder().setAsk(ByteString.copyFromUtf8("hello")).build();                       AnsResponse resp=null;                       try {                                HTable table=new HTable(config,table_name);                                Map re=table.coprocessorService(HelloWorld.class, null, null, new Batch.Call() {                                        @Override                                        public ByteString call(HelloWorld instance) throws IOException {                                                ServerRpcController controller = new ServerRpcController();                                                BlockingRpcCallback rpccall=new BlockingRpcCallback();                                                instance.question(controller, req, rpccall);                                                AnsResponse resp=rpccall.get();                                                //result                                                System.out.println("resp:"+ resp.getAns().toStringUtf8());                                                return resp.getAns();                                        }                                });                        } catch (IOException e) {                                e.printStackTrace();                        } catch (ServiceException e) {                                e.printStackTrace();                        } catch (Throwable e) {                                e.printStackTrace();                        }          }}


  4. 利用jdk 1.6打包(切记jdk1.6,因为hbase用1.6打包的) 导出hellworld.jar 包名随便起。

###部署

  1. 将包helloworld.jar 放在 %HBASE_HOME/lib/ 下就可以了。

  2. 重新启动hbase

  3. 验证

     [root@hdp22 ~ Desktop]# hbase shellhbase(main):001:0> import com.gzhdi.copocessor.HelloWorldEndPoint=> Java::ComGzhdiCopocessor::HelloWorldEndPoint    //如果打印出这句话就说明包已经加载完毕


  4. 向指定表添加endpoint

    hbase(main):002:0> create 't1','f1'0 row(s) in 6.5290 seconds=> Hbase::Table - t1   //创建表t1hbase(main):003:0> alter 't1','coprocessor'=>'|com.gzhdi.copocessor.HelloWorldEndPoint|1001|'Updating all regions with the new schema...0/1 regions updated.1/1 regions updated.Done.0 row(s) in 2.5960 secondshbase(main):005:0> describe 't1'DESCRIPTION                                                                                                              ENABLED                                                           't1', {TABLE_ATTRIBUTES => {coprocessor$1 => '|com.gzhdi.copocessor.HelloWorldEndPoint|1001|'}, {NAME => 'f1', DATA_BLO true                                                              CK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERS                                                                   IONS => '0', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE                                                                    => 'true'}                                                                                                                                                                              1 row(s) in 0.0940 seconds//OK 成功了


###调用 现在就可以使用你的客户端代码调用该服务了,需要制定zookeeper地址和表名(因为服务是针对表的)。

以上是"hbase0.98 coprocessor Endpoint如何实现HelloWorld"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0