千家信息网

hbase分页查询实现

发表于:2024-11-27 作者:千家信息网编辑
千家信息网最后更新 2024年11月27日,HBase本身不提供分页查询功能,我在网上查阅了很多资料后实现了一个分页功能,在这里做一下记录,分享给大家;如有不足之处,请尽管指出。废话不多说,看代码。import java.io.IOException; ……
千家信息网最后更新 2024年11月27日hbase分页查询实现

HBase本身不提供分页查询功能。我在网上查阅了很多资料后实现了一个分页功能,在这里做一下记录并分享给大家;如有不足之处,请尽管指出。废话不多说,看代码。

import java.io.IOException;

import java.util.LinkedHashMap;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.Get;

import org.apache.hadoop.hbase.client.HTableInterface;

import org.apache.hadoop.hbase.client.HTablePool;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;

import org.apache.hadoop.hbase.filter.Filter;

import org.apache.hadoop.hbase.filter.FilterList;

import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;

import org.apache.hadoop.hbase.util.Bytes;

publicclass HBaseUtils {

privatestatic Configuration config = null;

privatestatic HTablePool tp = null;

static {

// 加载集群配置

config = HBaseConfiguration.create();

config.set("hbase.zookeeper.quorum", "xx.xx.xx");

config.set("hbase.zookeeper.property.clientPort", "2181");

// 创建表池(可伟略提高查询性能,具体说明请百度或官方API)

tp = new HTablePool(config, 10);

}

/*

* 获取hbase的表

*/

publicstatic HTableInterface getTable(StringtableName) {

if (StringUtils.isEmpty(tableName))

returnnull;

returntp.getTable(getBytes(tableName));

}

/* 转换byte数组 */

publicstaticbyte[] getBytes(String str) {

if (str == null)

str= "";

return Bytes.toBytes(str);

}

/**

* 查询数据

* @param tableKey 表标识

* @param queryKey 查询标识

* @param startRow 开始行

* @param paramsMap 参数集合

* @return结果集

*/

publicstatic TBData getDataMap(StringtableName, String startRow,

StringstopRow, Integer currentPage, Integer pageSize)

throws IOException {

List>mapList = null;

mapList = new LinkedList>();

ResultScanner scanner = null;

// 为分页创建的封装类对象,下面有给出具体属性

TBData tbData = null;

try {

// 获取最大返回结果数量

if (pageSize == null || pageSize == 0L)

pageSize = 100;

if (currentPage == null || currentPage == 0)

currentPage = 1;

// 计算起始页和结束页

IntegerfirstPage = (currentPage - 1) * pageSize;

IntegerendPage = firstPage + pageSize;

// 从表池中取出HBASE表对象

HTableInterfacetable = getTable(tableName);

// 获取筛选对象

Scanscan = getScan(startRow, stopRow);

// 给筛选对象放入过滤器(true标识分页,具体方法在下面)

scan.setFilter(packageFilters(true));

// 缓存1000条数据

scan.setCaching(1000);

scan.setCacheBlocks(false);

scanner= table.getScanner(scan);

int i = 0;

List<byte[]> rowList = new LinkedList<byte[]>();

// 遍历扫描器对象, 并将需要查询出来的数据row key取出

for (Result result : scanner) {

String row = toStr(result.getRow());

if (i >= firstPage && i< endPage) {

rowList.add(getBytes(row));

}

i++;

}

// 获取取出的row key的GET对象

ListgetList = getList(rowList);

Result[]results = table.get(getList);

// 遍历结果

for (Result result : results) {

Map<byte[], byte[]> fmap = packFamilyMap(result);

Map rmap = packRowMap(fmap);

mapList.add(rmap);

}

// 封装分页对象

tbData= new TBData();

tbData.setCurrentPage(currentPage);

tbData.setPageSize(pageSize);

tbData.setTotalCount(i);

tbData.setTotalPage(getTotalPage(pageSize, i));

tbData.setResultList(mapList);

} catch (IOException e) {

e.printStackTrace();

} finally {

closeScanner(scanner);

}

return tbData;

}

privatestaticint getTotalPage(int pageSize, int totalCount) {

int n = totalCount / pageSize;

if (totalCount % pageSize == 0) {

return n;

} else {

return ((int) n) + 1;

}

}

// 获取扫描器对象

privatestatic Scan getScan(String startRow,String stopRow) {

Scan scan = new Scan();

scan.setStartRow(getBytes(startRow));

scan.setStopRow(getBytes(stopRow));

return scan;

}

/**

* 封装查询条件

*/

privatestatic FilterList packageFilters(boolean isPage) {

FilterList filterList = null;

// MUST_PASS_ALL(条件 AND) MUST_PASS_ONE(条件OR)

filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);

Filter filter1 = null;

Filter filter2 = null;

filter1 = newFilter(getBytes("family1"), getBytes("column1"),

CompareOp.EQUAL, getBytes("condition1"));

filter2 = newFilter(getBytes("family2"), getBytes("column1"),

CompareOp.LESS, getBytes("condition2"));

filterList.addFilter(filter1);

filterList.addFilter(filter2);

if (isPage) {

filterList.addFilter(new FirstKeyOnlyFilter());

}

return filterList;

}

privatestatic Filter newFilter(byte[] f, byte[] c, CompareOp op, byte[] v) {

returnnew SingleColumnValueFilter(f, c, op,v);

}

privatestaticvoid closeScanner(ResultScannerscanner) {

if (scanner != null)

scanner.close();

}

/**

* 封装每行数据

*/

privatestatic MappackRowMap(Map<byte[], byte[]> dataMap) {

Map map = new LinkedHashMap();

for (byte[] key : dataMap.keySet()) {

byte[] value = dataMap.get(key);

map.put(toStr(key), toStr(value));

}

return map;

}

/* 根据ROW KEY集合获取GET对象集合 */

privatestatic List getList(List<byte[]> rowList) {

List list = new LinkedList();

for (byte[] row : rowList) {

Getget = new Get(row);

get.addColumn(getBytes("family1"), getBytes("column1"));

get.addColumn(getBytes("family1"), getBytes("column2"));

get.addColumn(getBytes("family2"), getBytes("column1"));

list.add(get);

}

return list;

}

/**

* 封装配置的所有字段列族

*/

privatestatic Map<byte[], byte[]> packFamilyMap(Result result){

Map<byte[], byte[]> dataMap = null;

dataMap = new LinkedHashMap<byte[], byte[]>();

dataMap.putAll(result.getFamilyMap(getBytes("family1")));

dataMap.putAll(result.getFamilyMap(getBytes("family2")));

return dataMap;

}

privatestatic String toStr(byte[] bt) {

return Bytes.toString(bt);

}

publicstaticvoid main(String[] args) throws IOException {

// 拿出row key的起始行和结束行

// #<0<9<:

String startRow = "aaaa#";

String stopRow = "aaaa:";

int currentPage = 1;

int pageSize = 20;

// 执行hbase查询

getDataMap("table", startRow, stopRow, currentPage,pageSize);

}

}

/**
 * Paging wrapper returned by the HBase page query: the rows of one page plus
 * the paging metadata that was used to produce it.
 */
class TBData {

    /** Page number this result belongs to. */
    private Integer currentPage;

    /** Number of rows per page. */
    private Integer pageSize;

    /** Total number of rows counted for the query. */
    private Integer totalCount;

    /** Total number of pages for the query. */
    private Integer totalPage;

    /** Rows of the page; each row is a map of string keys to string values. */
    private List<Map<String, String>> resultList;

    /** @return the page number, or {@code null} if not set */
    public Integer getCurrentPage() {
        return currentPage;
    }

    /** @param currentPage the page number */
    public void setCurrentPage(Integer currentPage) {
        this.currentPage = currentPage;
    }

    /** @return the page size, or {@code null} if not set */
    public Integer getPageSize() {
        return pageSize;
    }

    /** @param pageSize the number of rows per page */
    public void setPageSize(Integer pageSize) {
        this.pageSize = pageSize;
    }

    /** @return the total row count, or {@code null} if not set */
    public Integer getTotalCount() {
        return totalCount;
    }

    /** @param totalCount the total number of rows */
    public void setTotalCount(Integer totalCount) {
        this.totalCount = totalCount;
    }

    /** @return the total page count, or {@code null} if not set */
    public Integer getTotalPage() {
        return totalPage;
    }

    /** @param totalPage the total number of pages */
    public void setTotalPage(Integer totalPage) {
        this.totalPage = totalPage;
    }

    /** @return the rows of the page, or {@code null} if not set */
    public List<Map<String, String>> getResultList() {
        return resultList;
    }

    /** @param resultList the rows of the page (stored by reference, not copied) */
    public void setResultList(List<Map<String, String>> resultList) {
        this.resultList = resultList;
    }

}


0