hbase callQueue的示例分析
小编给大家分享一下hbase callQueue的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
1. Hbase RPC Server
先简单描述一下Server端(Master/RegionServer)Rpc请求的主要过程:
Server启动的时候就会按配置起对应数量的Rpc请求处理线程handler监听CallQueues,当有任务到来的时候就会添加到CallQueue,Handler去消费处理请求
//Server端接到Client端请求后,会调用RpcScheduler去分发这个请求,默认RpcScheduler就是SimpleRpcSchedulerorg.apache.hadoop.hbase.ipc.ServerRpcConnection#processRequest this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call)) #703行//SimpleRpcScheduler会将这个请求添加到CallQueueorg.apache.hadoop.hbase.ipc.SimpleRpcScheduler#dispatch //每个handler会从自己的CallQueue中取任务处理,org.apache.hadoop.hbase.ipc.RpcExecutor.Handler#run()
那么这些监听线程是何时启动的呢?
//在master或regionserver启动的时候,创建rpcServicesorg.apache.hadoop.hbase.master.HMaster#createRpcServices 719org.apache.hadoop.hbase.regionserver.HRegionServer#createRpcServices 781 org.apache.hadoop.hbase.master.MasterRpcServices#MasterRpcServicesorg.apache.hadoop.hbase.regionserver.HRegionServer#createRpcServices org.apache.hadoop.hbase.regionserver.RSRpcServices#RSRpcServices(org.apache.hadoop.hbase.regionserver.HRegionServer)//创建处理RPC请求的Server端org.apache.hadoop.hbase.regionserver.RSRpcServices#createRpcServer 1294L//创建默认的RpcServer(SimpleRpcScheduler)org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory#create(org.apache.hadoop.conf.Configuration, org.apache.hadoop.hbase.ipc.PriorityFunction, org.apache.hadoop.hbase.Abortable) //为不同的RpcExecutor启动handlers org.apache.hadoop.hbase.regionserver.RSRpcServices#start org.apache.hadoop.hbase.ipc.SimpleRpcServer#start org.apache.hadoop.hbase.ipc.SimpleRpcScheduler#start
2. RpcScheduler分类
hbase中RpScheduler主要就两种:SimpleRpcScheduler和FifoRpcScheduler
其使用类型由hbase.region.server.rpc.scheduler.factory.class决定,默认为SimpleRpcSchedulerFactory.class
我们安装phoenix后还可以使用org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory
3. SimpleRpcScheduler
SimpleRpcScheduler中有四种CallQueue,不同的条件每种CallQueue对应的rpcExecutor可能也是不一样的
其实我们比较关心的,每种callExecuotr的以下四个值:callQ数量numCallQueues,handler的数量,rpcExecutor的实现类,maxQueueLength
我们在日志中或者hbase ui中可以看到如上图关于handler及queue的描述
1. Call Queue 普通队列
handlerCount=hbase.regionserver.handler.count 默认30numCallQueues=hbase.regionserver.handler.count*0.1callqReadShare=hbase.ipc.server.callqueue.read.ratio 默认0callQueueType=hbase.ipc.server.callqueue.type 默认fifomaxQueueLength=hbase.ipc.server.max.callqueue.length 默认10*handlerCount
如果 callqReadShare >0
表示读写分离,假设为0.7,则表示读队列占70%,写队列占30%
假设handlerCount=100,则numCallQueues=handlerCount*0.1=10,读队列有7个,写队列有3个
此时的rpcExecutor的实现类为RWQueueRpcExecutor,handlers及callQueue的对应关系如下图
name=default.RWQ
如果 callqReadShare<=0,callQueueType=fifo
此时的rpcExecutor的实现类为FastPathBalancedQueueRpcExecutor
name=default.FPBQ
如果 callqReadShare<=0,callQueueType=其它
此时的rpcExecutor的实现类为BalancedQueueRpcExecutor
name=default.BQ
2. Priority Queue 优先级队列
priorityHandlerCount=hbase.regionserver.metahandler.count 默认20metaCallqReadShare=hbase.ipc.server.metacallqueue.read.ratio 默认0.9fmaxQueueLength=hbase.ipc.server.max.callqueue.length 默认10*handlerCountmaxPriorityQueueLength=hbase.ipc.server.priority.max.callqueue.length 默认=maxQueueLength
如果 metaCallqReadShare > 0
different read/write handler for meta, at least 1 read handler and 1 write handler
此时的rpcExecutor的实现类为MetaRWQueueRpcExecutor
name=priority.RWQ
如果 metaCallqReadShare <=0,priorityHandlerCount > 0
此时的rpcExecutor的实现类为FastPathBalancedQueueRpcExecutor
name=priority.FPBQ
3. Replication Queue 复本队列
replicationHandlerCount=hbase.regionserver.replication.handler.count 默认为3maxQueueLength=hbase.ipc.server.max.callqueue.length 默认10*handlerCount
replicationHandlerCount > 0 才有这个队列
此时的rpcExecutor的实现类为FastPathBalancedQueueRpcExecutor
name=replication.FPBQ
4. Meta Transition Queue meta迁移队列
metaTransitionHandler=hbase.master.meta.transition.handler.count 默认1maxQueueLength=hbase.ipc.server.max.callqueue.length 默认10*handlerCountmaxPriorityQueueLength=hbase.ipc.server.priority.max.callqueue.length 默认=maxQueueLength
metaTransitionHandler>0 才有这个队列
此时的rpcExecutor的实现类为FastPathBalancedQueueRpcExecutor
name=metaPriority.FPBQ
下面这张图详细画出了4种rpcExecutor的选择和条件。
4. hanlder与queues的关系
Master或RegionServer启动的时候会启对应数量的handlers,这些handlers是干啥的呢,简单说就是处理各客户端对该regionserver的各种rpc请求的,一个handler就是一个线程,handler是怎么处理请求的呢?他不是直接处理请求,它是到指定队列里去取,也就是所有请求来了要先进某个队列,然后有handler去消费它。队列的存在可以认为是请求非常多,handler不能及时消费的时候的一个缓冲。
那么下面几个问题我们比较关心:
在程序中handler和queue的对应关系是什么样的呢?
有多少handler?
又有多少队列呢?
每个队列的长度又是多少呢?
**下面通过一幅图回答这几个问题:**hbase的callQueue有4种,这里以普通callQueue为例,原理是一样的
先介绍两个参数,再回答上面的4个问题:
hbase.regionserver.handler.count 默认30
在RegionServers上启动RPC Listener实例的数量,也就是处理rpc请求的线程数。Master handlers数量也是使用这个参数配置。太多的handler可能会适得其反。可以设置成为CPU数量的倍数。如果主要是只读的,handler和cpu数量相同就可以了。可以从CPU数量的两倍开始,再做调整
hbase.ipc.server.callqueue.handler.factor 默认0.1
确定callQueue数量的因素。值为0表示在所有handlers之间共享单个队列。值为1意味着每个handler都有自己的队列。
hbase.ipc.server.max.callqueue.length 默认为10倍hander大小
每个callQueue的队列长度,也就是可以容纳多少个请求
回答问题:
所以handler的数量默认是30个,我们可以通过
hbase.regionserver.handler.count
去修改它,对应回答问题2callQueue的数量=
hbase.regionserver.handler.count * hbase.ipc.server.callqueue.handler.factor
,也就是默认30个handler,factor为0.1的话,会有3个callQueue,对应回答问题3每个callQueue都是一个
,它的capacity就是hbase.ipc.server.max.callqueue.length
,对应回答问题4handler和callQueues的关系:对应回答问题1
这个在handler启动的时候就决定了,比如要起10个handlers,就遍历每个handler通过下标计算
index = qindex + (i % qsize);
qindex是callQueue队列的索引,qindex的取值,跟所有队列是否根据读写分离配置有关系,没有就是0开始,参考SimpleRpcScheduler图。
for (int i = 0; i < numHandlers; i++) { final int index = qindex + (i % qsize); String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index + ",port=" + port; Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index), activeHandlerCount); handler.start(); handlers.add(handler);}
5. Call queue is full问题
在hbase集群日志中经常会看到Call queue is full的问题,我们先找到报这个错误的源头:
/* 队列长度参数:hbase.ipc.server.max.callqueue.length 默认为10倍hander大小队列满的大小 = 请求1*请求1大小 + 请求2*请求2大小 + ...+ 请求length*请求length大小在队列中的大小:callQueueSizeInBytes.sum=队列中的所有请求大小之和从队列中已经读取出来的大小:totalRequestSize=buf.limit() 也就是ByteBuff中的大小是否报Call queue is full的条件就是:1. 在队列中的大小+从队列中已经读取出来的大小> maxQueueSizeInBytesmaxQueueSizeInBytes参数:hbase.ipc.server.max.callqueue.size 默认为1024*1024*1024=1G*/// Enforcing the call queue size, this triggers a retry in the client // This is a bit late to be doing this check - we have already read in the // total request. if ((totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) { final ServerCall> callTooBig = createCall(id, this.service, null, null, null, null, totalRequestSize, null, 0, this.callCleanup); this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, "Call queue is full on " + this.rpcServer.server.getServerName() + ", is hbase.ipc.server.max.callqueue.size too small?"); callTooBig.sendResponseIfReady(); return; }// 或//2. 这种情况在相应的rpcScheduler dispatch的时候会判断当前队列大小是否超过maxQueueLength,如果超过了会直接返回false if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, "Call queue is full on " + this.rpcServer.server.getServerName() + ", too many items queued ?"); call.sendResponseIfReady(); }
从代码中我们知道Call queue is full会有两种情况:CALL_QUEUE_TOO_BIG_EXCEPTION
一种 hbase.ipc.server.max.callqueue.size too small?
一种 too many items queued?
尝试办法:
当报hbase.ipc.server.max.callqueue.size too small的时候,可以尝试增大
hbase.ipc.server.max.callqueue.size
默认是102410241024=1073741824=1G
当报 too many items queued的时候,可以尝试增大
hbase.ipc.server.max.callqueue.length
默认=`handlerCount*10
以上是"hbase callQueue的示例分析"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!