千家信息网

Redis命令处理过程实例源码分析

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,这篇文章主要介绍"Redis命令处理过程实例源码分析"的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇"Redis命令处理过程实例源码分析"文章能帮助大家解决问题。
千家信息网最后更新 2025年02月01日Redis命令处理过程实例源码分析

这篇文章主要介绍"Redis命令处理过程实例源码分析"的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇"Redis命令处理过程实例源码分析"文章能帮助大家解决问题。

本文基于社区版Redis 4.0.8

1、命令解析

Redis服务器接收到的命令请求首先存储在客户端对象的querybuf输入缓冲区,然后解析命令请求的各个参数,并存储在客户端对象的argv和argc字段。

客户端解析命令请求的入口函数为readQueryFromClient,会读取socket数据存储到客户端对象的输入缓冲区,并调用函数processInputBuffer解析命令请求。

注:内联命令:使用telnet会话输入命令的方式

void processInputBuffer(client *c) {    ......    //循环遍历输入缓冲区,获取命令参数,调用processMultibulkBuffer解析命令参数和长度    while(sdslen(c->querybuf)) {        if (c->reqtype == PROTO_REQ_INLINE) {            if (processInlineBuffer(c) != C_OK) break;//处理telnet方式的内联命令        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {            if (processMultibulkBuffer(c) != C_OK) break; //解析命令参数和长度暂存到客户端结构体中        } else {            serverPanic("Unknown request type");        }    }    }//解析命令参数和长度暂存到客户端结构体中int processMultibulkBuffer(client *c) {    //定位到行尾    newline = strchr(c->querybuf,'\r');    //解析命令请求参数数目,并存储在客户端对象的c->multibulklen字段    serverAssertWithInfo(c,NULL,c->querybuf[0] == '*');    ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);    c->multibulklen = ll;    pos = (newline-c->querybuf)+2;//记录已解析命令的请求长度resp的长度    /* Setup argv array on client structure */    //分配请求参数存储空间    c->argv = zmalloc(sizeof(robj*)*c->multibulklen);        // 开始循环解析每个请求参数    while(c->multibulklen) {        ......        newline = strchr(c->querybuf+pos,'\r');        if (c->querybuf[pos] != '$') {            return C_ERR;        ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);        pos += newline-(c->querybuf+pos)+2;        c->bulklen = ll;//字符串参数长度暂存在客户端对象的bulklen字段                //读取该长度的参数内容,并创建字符串对象,同时更新待解析参数multibulklen        c->argv[c->argc++] =createStringObject(c->querybuf+pos,c->bulklen);        pos += c->bulklen+2;        c->multibulklen--;    }

2、命令调用

当multibulklen的值更新为0时,表示参数解析完成,开始调用processCommand来处理命令,处理命令前有很多校验逻辑,如下:

void processInputBuffer(client *c) {        ......     //调用processCommand来处理命令     if (processCommand(c) == C_OK) {         ......     }}//处理命令函数int processCommand(client *c) {    //校验是否是quit命令    if (!strcasecmp(c->argv[0]->ptr,"quit")) {        addReply(c,shared.ok);        c->flags |= CLIENT_CLOSE_AFTER_REPLY;        return C_ERR;    }    //调用lookupCommand,查看该命令是否存在    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);    if (!c->cmd) {        flagTransaction(c);        addReplyErrorFormat(c,"unknown command '%s'",            (char*)c->argv[0]->ptr);        return C_OK;    //检查用户权限    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)    {        addReply(c,shared.noautherr);    //还有很多检查,不一一列举,比如集群/持久化/复制等    /* 真正执行命令 */    if (c->flags & CLIENT_MULTI &&        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)        queueMultiCommand(c);        //将结果写入outbuffer        addReply(c,shared.queued);    } // 调用execCommand执行命令void execCommand(client *c) {    call(c,CMD_CALL_FULL);//调用call执行命令//调用execCommand调用call执行命令void call(client *c, int flags) {    start = ustime();    c->cmd->proc(c);//执行命令    duration = ustime()-start;    //如果是慢查询,记录慢查询    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {        char *latency_event = (c->cmd->flags & CMD_FAST) ?                              "fast-command" : "command";        latencyAddSampleIfNeeded(latency_event,duration/1000);        //记录到慢日志中        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);    //更新统计信息:当前命令执行时间和调用次数    if (flags & CMD_CALL_STATS) {        c->lastcmd->microseconds += duration;        c->lastcmd->calls++;

3、返回结果

Redis返回结果并不是直接返回给客户端,而是先写入到输出缓冲区(buf字段)或者输出链表(reply字段)

int processCommand(client *c) {    ......    //将结果写入outbuffer    addReply(c,shared.queued);    ......    }//将结果写入outbuffervoid addReply(client *c, robj *obj) {    //调用listAddNodeHead将客户端添加到服务端结构体的client_pending_write链表,以便后续能快速查找出哪些客户端有数据需要发送    if (prepareClientToWrite(c) != C_OK) return;        //然后添加字符串到输出缓冲区    if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)        //如果添加失败,则添加到输出链表中        _addReplyObjectToList(c,obj); }

addReply函数只是将待发送给客户端的数据暂存在输出链表或者输出缓冲区,那么什么时候将这些数据发送给客户端呢?答案是开启事件循环时,调用的beforesleep函数,该函数专门执行一些不是很费时的操作,如过期键删除,向客户端返回命令回复等

void beforeSleep(struct aeEventLoop *eventLoop) {    ......     /* Handle writes with pending output buffers. */    handleClientsWithPendingWrites();}//回复客户端命令函数int handleClientsWithPendingWrites(void) {    listIter li;    listNode *ln;    int processed = listLength(server.clients_pending_write);    listRewind(server.clients_pending_write,&li);    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        c->flags &= ~CLIENT_PENDING_WRITE;        listDelNode(server.clients_pending_write,ln);        /* 发送客户端数据 */        if (writeToClient(c->fd,c,0) == C_ERR) continue;        /* If there is nothing left, do nothing. Otherwise install         * the write handler. */         //如果数据量很大,一次性没有发送完成,则进行添加文件事件,监听当前客户端socket文件描述符的可写事件即可        if (clientHasPendingReplies(c) &&            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,                sendReplyToClient, c) == AE_ERR)        {            freeClientAsync(c);        }    }    return processed;

关于"Redis命令处理过程实例源码分析"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注行业资讯频道,小编每天都会为大家更新不同的知识点。

0