千家信息网

hbase0.98.9中如何实现endpoints

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇文章为大家展示了hbase0.98.9中如何实现endpoints,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。定制一个endpoint的过程。下面是实现
千家信息网最后更新 2025年02月03日hbase0.98.9中如何实现endpoints

本篇文章为大家展示了hbase0.98.9中如何实现endpoints,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

定制一个endpoint的过程。

下面是实现过程:

1、定义接口描述文件(该功能由protobuf提供)

// Interface description for the row-counting endpoint (proto2 syntax).

// Java package and outer class that hold the generated code.
option java_package = "coprocessor.endpoints.generated";
option java_outer_classname = "RowCounterEndpointProtos";
// Generate the abstract service class in addition to the message classes.
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

// Request message; it carries no fields.
message CountRequest {
}

// Response message carrying a single counter value.
message CountResponse {
  required int64 count = 1 [default = 0];
}

// The two RPCs exposed by the endpoint.
service RowCountService {
  rpc getRowCount(CountRequest)
    returns (CountResponse);
  rpc getKeyValueCount(CountRequest)
    returns (CountResponse);
}

这个文件我直接拿的hbase提供的example中的例子。其中的语法应该有过类似经验的一看就清楚了,实在不清楚就请查查protobuf的帮助手册吧。

2、根据接口描述文件生成java接口类(该功能由protobuf提供)

有了接口描述文件,还需要生成java语言的接口类。这个需要借助protobuf提供的工具protoc。

$protoc --java_out=./ Examples.proto

简单解释下,protoc这个命令在你装了protobuf后就有了。Examples.proto这个是文件名,也就是刚才编写的那个接口描述文件。"--java_out"这个用来指定生成后的java类放的地方。

所以,如果你还没有安装protobuf,需要先装一个,Windows和Linux版本都有。多说一句,如果你要搭建hadoop 64位的编译环境,同样需要安装protobuf。

3、实现接口

package coprocessor;import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.CellUtil;import org.apache.hadoop.hbase.Coprocessor;import org.apache.hadoop.hbase.CoprocessorEnvironment;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.coprocessor.CoprocessorException;import org.apache.hadoop.hbase.coprocessor.CoprocessorService;import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;import org.apache.hadoop.hbase.protobuf.ResponseConverter;import org.apache.hadoop.hbase.regionserver.InternalScanner;import org.apache.hadoop.hbase.util.Bytes;import com.google.protobuf.RpcCallback;import com.google.protobuf.RpcController;import com.google.protobuf.Service;import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;public class RowCounterEndpointExample extends RowCountService implements                Coprocessor, CoprocessorService {        private RegionCoprocessorEnvironment env;        public RowCounterEndpointExample() {        }        @Override        public Service getService() {                return this;        }        @Override        public void getRowCount(RpcController controller, CountRequest request,                        RpcCallback done) {                Scan scan = new Scan();                scan.setFilter(new FirstKeyOnlyFilter());                CountResponse response = null;                InternalScanner scanner = null;                try {                        scanner = env.getRegion().getScanner(scan);                        List results = new ArrayList();                        boolean hasMore = false;                        byte[] lastRow = null;                        long count = 0;    
                    do {                                hasMore = scanner.next(results);                                for (Cell kv : results) {                                        byte[] currentRow = CellUtil.cloneRow(kv);                                        if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {                                                lastRow = currentRow;                                                count++;                                        }                                }                                results.clear();                        } while (hasMore);                        response = CountResponse.newBuilder().setCount(count).build();                } catch (IOException ioe) {                        ResponseConverter.setControllerException(controller, ioe);                } finally {                        if (scanner != null) {                                try {                                        scanner.close();                                } catch (IOException ignored) {                                }                        }                }                done.run(response);        }        @Override        public void getKeyValueCount(RpcController controller,                        CountRequest request, RpcCallback done) {                CountResponse response = null;                InternalScanner scanner = null;                try {                        scanner = env.getRegion().getScanner(new Scan());                        List results = new ArrayList();                        boolean hasMore = false;                        long count = 0;                        do {                                hasMore = scanner.next(results);                                for (Cell kv : results) {                                        count++;                                }                                results.clear();                        } while (hasMore);                        
response = CountResponse.newBuilder().setCount(count).build();                } catch (IOException ioe) {                        ResponseConverter.setControllerException(controller, ioe);                } finally {                        if (scanner != null) {                                try {                                        scanner.close();                                } catch (IOException ignored) {                                }                        }                }                done.run(response);        }        @Override        public void start(CoprocessorEnvironment env) throws IOException {                if (env instanceof RegionCoprocessorEnvironment) {                        this.env = (RegionCoprocessorEnvironment) env;                } else {                        throw new CoprocessorException("Must be loaded on a table region!");                }        }        @Override        public void stop(CoprocessorEnvironment env) throws IOException {                // TODO Auto-generated method stub        }}

4、注册接口(HBase功能,通过配置文件或者表模式方式注册)

这部分可以参考《HBase权威指南》,我就是照着书里这部分内容做的。

5、测试调用

package coprocessor;import java.io.IOException;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.coprocessor.Batch;import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;import org.apache.hadoop.hbase.ipc.ServerRpcController;import org.apache.hadoop.hbase.util.Bytes;import com.google.protobuf.ServiceException;import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;import util.HBaseHelper;public class RowCounterEndpointClientExample {        public static void main(String[] args) throws ServiceException, Throwable {                Configuration conf = HBaseConfiguration.create();                HBaseHelper helper = HBaseHelper.getHelper(conf);                //helper.dropTable("testtable");                //helper.createTable("testtable", "colfam1", "colfam2");                System.out.println("Adding rows to table...");                helper.fillTable("testtable", 1, 10, 10, "colfam1", "colfam2");                HTable table = new HTable(conf, "testtable");                final CountRequest request = CountRequest.getDefaultInstance();                                final Batch.Call call =new Batch.Call() {                        public Long call(RowCountService counter)                                        throws IOException {                                ServerRpcController controller = new ServerRpcController();                                BlockingRpcCallback rpcCallback = new BlockingRpcCallback();                                counter.getRowCount(controller, request, rpcCallback);                                CountResponse response = rpcCallback.get();                                if (controller.failedOnException()) {           
                             throw controller.getFailedOn();                                }                                return (response != null && response.hasCount()) ? response                                                .getCount() : 0;                        }                };                                Map results = table.coprocessorService(                                RowCountService.class, null, null, call);                                for(byte[] b : results.keySet()){                        System.err.println(Bytes.toString(b) + ":" + results.get(b));                }         }}

上述内容就是hbase0.98.9中如何实现endpoints,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

0