千家信息网

PostgreSQL Source Code Walkthrough (150) - PG Tools #2 (the BaseBackup function)

Published: 2025-02-09  Author: 千家信息网 editors
千家信息网 last updated: 2025-02-09

This installment briefly introduces BaseBackup, the function in the source code of PostgreSQL's backup tool pg_basebackup that actually carries out the backup.

I. Data Structures

option
The data structure that describes the command-line options a tool accepts (consumed by getopt_long).

#ifndef HAVE_STRUCT_OPTION
//Command-line option descriptor used by the tools
struct option
{
    const char *name;       //long option name
    int         has_arg;    //whether it takes an argument: no_argument/required_argument/optional_argument
    int        *flag;       //if non-NULL, getopt_long stores val in *flag and returns 0
    int         val;        //value returned by getopt_long (or stored in *flag)
};

#define no_argument 0
#define required_argument 1
#define optional_argument 2
#endif

/*
 * On OpenBSD and some versions of Solaris, opterr and friends are defined in
 * core libc rather than in a separate getopt module.  Define these variables
 * only if configure found they aren't there by default; otherwise, this
 * module and its callers will just use libc's variables.  (We assume that
 * testing opterr is sufficient for all of these.)
 */
#ifndef HAVE_INT_OPTERR

int         opterr = 1,         /* if error message should be printed */
            optind = 1,         /* index into parent argv vector */
            optopt;             /* character checked for validity */
char       *optarg;             /* argument associated with option */

#endif

#define BADCH   (int)'?'
#define BADARG  (int)':'
#define EMSG    ""
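An array of `struct option` entries is what `getopt_long()` consumes. As a minimal sketch, assuming a platform whose `<getopt.h>` provides `getopt_long` (glibc, the BSDs), the following declares a hypothetical subset of options modeled on pg_basebackup's `-D/--pgdata`, `-P/--progress` and `-X/--wal-method`, and parses arguments like the `-D /data/backup -P -Xs` used later in this article. `BackupOpts` and `parse_backup_opts` are illustrative names, not pg_basebackup's own code.

```c
#include <assert.h>
#include <getopt.h>
#include <string.h>

/* Hypothetical option set, for illustration only */
typedef struct
{
    const char *pgdata;         /* -D / --pgdata */
    int         progress;       /* -P / --progress */
    char        walmethod;      /* -X / --wal-method: 'n', 'f' or 's' */
} BackupOpts;

/* Returns 0 on success, -1 on an unknown option or a bad -X value. */
static int
parse_backup_opts(int argc, char *argv[], BackupOpts *opts)
{
    /* Each entry: name, has_arg, flag (NULL = return val), val */
    static const struct option long_options[] = {
        {"pgdata", required_argument, NULL, 'D'},
        {"progress", no_argument, NULL, 'P'},
        {"wal-method", required_argument, NULL, 'X'},
        {NULL, 0, NULL, 0}      /* terminating entry */
    };
    int         c;

    assert(argv != NULL && opts != NULL);
    opts->pgdata = NULL;
    opts->progress = 0;
    opts->walmethod = 's';      /* assumed default: stream */

    while ((c = getopt_long(argc, argv, "D:PX:", long_options, NULL)) != -1)
    {
        switch (c)
        {
            case 'D':
                opts->pgdata = optarg;
                break;
            case 'P':
                opts->progress = 1;
                break;
            case 'X':           /* "-Xs" delivers optarg = "s" */
                if (strcmp(optarg, "n") == 0 || strcmp(optarg, "none") == 0)
                    opts->walmethod = 'n';
                else if (strcmp(optarg, "f") == 0 || strcmp(optarg, "fetch") == 0)
                    opts->walmethod = 'f';
                else if (strcmp(optarg, "s") == 0 || strcmp(optarg, "stream") == 0)
                    opts->walmethod = 's';
                else
                    return -1;
                break;
            default:            /* '?' for unknown options */
                return -1;
        }
    }
    return 0;
}
```

Because every entry leaves `flag` as NULL, `getopt_long` returns the short-option character in `val`, so the long and short spellings share one `switch`.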

pg_result
Holds a result returned by PQgetResult.

struct pg_result
{
    //number of tuples (rows)
    int         ntups;
    //number of attributes (columns)
    int         numAttributes;
    PGresAttDesc *attDescs;
    //array of PGresTuple
    PGresAttValue **tuples;     /* each PGresTuple is an array of
                                 * PGresAttValue's */
    int         tupArrSize;     /* allocated size of tuples array */
    //number of parameters
    int         numParameters;
    //parameter descriptors
    PGresParamDesc *paramDescs;
    //execution status (an enum)
    ExecStatusType resultStatus;
    char        cmdStatus[CMDSTATUS_LEN];   /* cmd status from the query */
    int         binary;         /* binary tuple values if binary == 1,
                                 * otherwise text */

    /*
     * These fields are copied from the originating PGconn, so that operations
     * on the PGresult don't have to reference the PGconn.
     */
    //notice hook functions
    PGNoticeHooks noticeHooks;
    PGEvent    *events;
    int         nEvents;
    int         client_encoding;    /* encoding id */

    /*
     * Error information (all NULL if not an error result).  errMsg is the
     * "overall" error message returned by PQresultErrorMessage.  If we have
     * per-field info then it is stored in a linked list.
     */
    char       *errMsg;         /* error message, or NULL if no error */
    PGMessageField *errFields;  /* message broken into fields */
    char       *errQuery;       /* text of triggering query, if available */

    /* All NULL attributes in the query result point to this null string */
    char        null_field[1];

    /*
     * Space management information.  Note that attDescs and error stuff, if
     * not null, point into allocated blocks.  But tuples points to a
     * separately malloc'd block, so that we can realloc it.
     */
    PGresult_data *curBlock;    /* most recently allocated block */
    int         curOffset;      /* start offset of free space in block */
    int         spaceLeft;      /* number of free bytes remaining in block */

    size_t      memorySize;     /* total space allocated for this PGresult */
};

/* Data about a single parameter of a prepared statement */
typedef struct pgresParamDesc
{
    Oid         typid;          /* type id */
} PGresParamDesc;

typedef enum
{
    PGRES_EMPTY_QUERY = 0,      /* empty query string was executed */
    PGRES_COMMAND_OK,           /* a query command that doesn't return
                                 * anything was executed properly by the
                                 * backend */
    PGRES_TUPLES_OK,            /* a query command that returns tuples was
                                 * executed properly by the backend, PGresult
                                 * contains the result tuples */
    PGRES_COPY_OUT,             /* Copy Out data transfer in progress */
    PGRES_COPY_IN,              /* Copy In data transfer in progress */
    PGRES_BAD_RESPONSE,         /* an unexpected response was recv'd from the
                                 * backend */
    PGRES_NONFATAL_ERROR,       /* notice or warning message */
    PGRES_FATAL_ERROR,          /* query failed */
    PGRES_COPY_BOTH,            /* Copy In/Out data transfer in progress */
    PGRES_SINGLE_TUPLE          /* single tuple from larger resultset */
} ExecStatusType;

typedef union pgresult_data PGresult_data;

union pgresult_data
{
    PGresult_data *next;        /* link to next block, or NULL */
    char        space[1];       /* dummy for accessing block as bytes */
};
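To make the `tuples[row][column]` layout concrete, here is a minimal, self-contained sketch built on simplified, hypothetical stand-ins (`AttValue`, `MiniResult`, `mini_getvalue`) rather than libpq's real types. It follows the idea behind `PQgetvalue`: validate the row and column numbers first, then index `tuples[tup_num][field_num]`. Only the first `ntups` rows hold data even when `tupArrSize` is larger, which is why dereferencing `res->tuples[1]` of a one-row result fails in the gdb trace later in this article.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for libpq's PGresAttValue */
typedef struct
{
    int         len;            /* length in bytes, or -1 for a NULL value */
    char       *value;          /* NUL-terminated text of the value */
} AttValue;

/* Hypothetical, simplified stand-in for struct pg_result */
typedef struct
{
    int         ntups;          /* number of rows actually stored */
    int         numAttributes;  /* number of columns */
    AttValue  **tuples;         /* tuples[row] is an array of numAttributes values */
} MiniResult;

/*
 * Return the text of column field_num in row tup_num, or NULL when either
 * index is out of range, instead of reading an unfilled slot.
 */
static const char *
mini_getvalue(const MiniResult *res, int tup_num, int field_num)
{
    if (res == NULL)
        return NULL;
    if (tup_num < 0 || tup_num >= res->ntups)
        return NULL;
    if (field_num < 0 || field_num >= res->numAttributes)
        return NULL;
    assert(res->tuples != NULL);
    return res->tuples[tup_num][field_num].value;
}
```

The row of the first BASE_BACKUP result in the trace (`"1/57000028"`, `"1"`) maps to `tuples[0][0]` and `tuples[0][1]`; asking for row 1 or column 2 returns NULL rather than touching memory.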

II. Source Code Walkthrough

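BaseBackup is the function that actually performs the backup.
Its main logic is to send a backup request (the BASE_BACKUP replication command) to the server through the libpq interface and then receive the data the server streams back.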

static void
BaseBackup(void)
{
    PGresult   *res;
    char       *sysidentifier;
    TimeLineID  latesttli;
    TimeLineID  starttli;
    char       *basebkp;
    char        escaped_label[MAXPGPATH];
    char       *maxrate_clause = NULL;
    int         i;
    char        xlogstart[64];
    char        xlogend[64];
    int         minServerMajor,
                maxServerMajor;
    int         serverVersion,
                serverMajor;

    //the database connection must already be established
    Assert(conn != NULL);

    /*
     * Check server version. BASE_BACKUP command was introduced in 9.1, so we
     * can't work with servers older than 9.1.
     */
    minServerMajor = 901;
    maxServerMajor = PG_VERSION_NUM / 100;
    serverVersion = PQserverVersion(conn);
    serverMajor = serverVersion / 100;
    if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
    {
        const char *serverver = PQparameterStatus(conn, "server_version");

        fprintf(stderr, _("%s: incompatible server version %s\n"),
                progname, serverver ? serverver : "'unknown'");
        exit(1);
    }

    /*
     * If WAL streaming was requested, also check that the server is new
     * enough for that.
     */
    if (includewal == STREAM_WAL && !CheckServerVersionForStreaming(conn))
    {
        /*
         * Error message already written in CheckServerVersionForStreaming(),
         * but add a hint about using -X none.
         */
        fprintf(stderr, _("HINT: use -X none or -X fetch to disable log streaming\n"));
        exit(1);
    }

    /*
     * Build contents of configuration file if requested
     */
    if (writerecoveryconf)
        GenerateRecoveryConf(conn);

    /*
     * Run IDENTIFY_SYSTEM so we can get the timeline
     */
    if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
        exit(1);

    /*
     * Start the actual backup
     */
    PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);

    if (maxrate > 0)
        maxrate_clause = psprintf("MAX_RATE %u", maxrate);

    if (verbose)
        //informational message
        fprintf(stderr,
                _("%s: initiating base backup, waiting for checkpoint to complete\n"),
                progname);

    if (showprogress && !verbose)
    {
        //progress message
        fprintf(stderr, "waiting for checkpoint");
        if (isatty(fileno(stderr)))
            fprintf(stderr, "\r");
        else
            fprintf(stderr, "\n");
    }

    //build the BASE_BACKUP command
    basebkp =
        psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
                 escaped_label,
                 showprogress ? "PROGRESS" : "",
                 includewal == FETCH_WAL ? "WAL" : "",
                 fastcheckpoint ? "FAST" : "",
                 includewal == NO_WAL ? "" : "NOWAIT",
                 maxrate_clause ? maxrate_clause : "",
                 format == 't' ? "TABLESPACE_MAP" : "",
                 verify_checksums ? "" : "NOVERIFY_CHECKSUMS");

    //send it through the libpq API
    if (PQsendQuery(conn, basebkp) == 0)
    {
        fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                progname, "BASE_BACKUP", PQerrorMessage(conn));
        exit(1);
    }

    /*
     * Get the starting WAL location
     */
    //fetch the first result set
    res = PQgetResult(conn);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        fprintf(stderr, _("%s: could not initiate base backup: %s"),
                progname, PQerrorMessage(conn));
        exit(1);
    }
    //exactly one row is expected
    if (PQntuples(res) != 1)
    {
        fprintf(stderr,
                _("%s: server returned unexpected response to BASE_BACKUP command; got %d rows and %d fields, expected %d rows and %d fields\n"),
                progname, PQntuples(res), PQnfields(res), 1, 2);
        exit(1);
    }

    //copy the WAL start position
    strlcpy(xlogstart, PQgetvalue(res, 0, 0), sizeof(xlogstart));

    if (verbose)
        fprintf(stderr, _("%s: checkpoint completed\n"), progname);

    /*
     * 9.3 and later sends the TLI of the starting point. With older servers,
     * assume it's the same as the latest timeline reported by
     * IDENTIFY_SYSTEM.
     */
    if (PQnfields(res) >= 2)
        starttli = atoi(PQgetvalue(res, 0, 1));
    else
        starttli = latesttli;
    PQclear(res);
    MemSet(xlogend, 0, sizeof(xlogend));

    if (verbose && includewal != NO_WAL)
        fprintf(stderr, _("%s: write-ahead log start point: %s on timeline %u\n"),
                progname, xlogstart, starttli);

    /*
     * Get the header
     */
    res = PQgetResult(conn);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        fprintf(stderr, _("%s: could not get backup header: %s"),
                progname, PQerrorMessage(conn));
        exit(1);
    }
    if (PQntuples(res) < 1)
    {
        fprintf(stderr, _("%s: no data returned from server\n"), progname);
        exit(1);
    }

    /*
     * Sum up the total size, for progress reporting
     */
    totalsize = totaldone = 0;
    tablespacecount = PQntuples(res);
    for (i = 0; i < PQntuples(res); i++)
    {
        totalsize += atol(PQgetvalue(res, i, 2));

        /*
         * Verify tablespace directories are empty. Don't bother with the
         * first once since it can be relocated, and it will be checked before
         * we do anything anyway.
         */
        if (format == 'p' && !PQgetisnull(res, i, 1))
        {
            char       *path = unconstify(char *, get_tablespace_mapping(PQgetvalue(res, i, 1)));

            verify_dir_is_empty_or_create(path, &made_tablespace_dirs, &found_tablespace_dirs);
        }
    }

    /*
     * When writing to stdout, require a single tablespace
     */
    if (format == 't' && strcmp(basedir, "-") == 0 && PQntuples(res) > 1)
    {
        fprintf(stderr,
                _("%s: can only write single tablespace to stdout, database has %d\n"),
                progname, PQntuples(res));
        exit(1);
    }

    /*
     * If we're streaming WAL, start the streaming session before we start
     * receiving the actual data chunks.
     */
    if (includewal == STREAM_WAL)
    {
        if (verbose)
            fprintf(stderr, _("%s: starting background WAL receiver\n"),
                    progname);
        StartLogStreamer(xlogstart, starttli, sysidentifier);
    }

    /*
     * Start receiving chunks
     */
    for (i = 0; i < PQntuples(res); i++)    //one iteration per tablespace
    {
        if (format == 't')
            //tar format
            ReceiveTarFile(conn, res, i);
        else
            //plain format: unpack the tar stream into files
            ReceiveAndUnpackTarFile(conn, res, i);
    }                           /* Loop over all tablespaces */

    if (showprogress)
    {
        progress_report(PQntuples(res), NULL, true);
        if (isatty(fileno(stderr)))
            fprintf(stderr, "\n");  /* Need to move to next line */
    }

    PQclear(res);

    /*
     * Get the stop position
     */
    res = PQgetResult(conn);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        fprintf(stderr,
                _("%s: could not get write-ahead log end position from server: %s"),
                progname, PQerrorMessage(conn));
        exit(1);
    }
    if (PQntuples(res) != 1)
    {
        fprintf(stderr,
                _("%s: no write-ahead log end position returned from server\n"),
                progname);
        exit(1);
    }
    strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
    if (verbose && includewal != NO_WAL)
        fprintf(stderr, _("%s: write-ahead log end point: %s\n"), progname, xlogend);
    PQclear(res);

    //the final result must report that the command completed
    res = PQgetResult(conn);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);

        if (sqlstate &&
            strcmp(sqlstate, ERRCODE_DATA_CORRUPTED) == 0)
        {
            fprintf(stderr, _("%s: checksum error occurred\n"),
                    progname);
            checksum_failure = true;
        }
        else
        {
            fprintf(stderr, _("%s: final receive failed: %s"),
                    progname, PQerrorMessage(conn));
        }
        exit(1);
    }

    if (bgchild > 0)
    {
#ifndef WIN32
        int         status;
        pid_t       r;
#else
        DWORD       status;

        /*
         * get a pointer sized version of bgchild to avoid warnings about
         * casting to a different size on WIN64.
         */
        intptr_t    bgchild_handle = bgchild;
        uint32      hi,
                    lo;
#endif

        if (verbose)
            fprintf(stderr,
                    _("%s: waiting for background process to finish streaming ...\n"), progname);

#ifndef WIN32
        //non-Windows: pass the end position to the child process through the pipe
        if (write(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
        {
            fprintf(stderr,
                    _("%s: could not send command to background pipe: %s\n"),
                    progname, strerror(errno));
            exit(1);
        }

        /* Just wait for the background process to exit */
        r = waitpid(bgchild, &status, 0);
        if (r == (pid_t) -1)
        {
            fprintf(stderr, _("%s: could not wait for child process: %s\n"),
                    progname, strerror(errno));
            exit(1);
        }
        if (r != bgchild)
        {
            fprintf(stderr, _("%s: child %d died, expected %d\n"),
                    progname, (int) r, (int) bgchild);
            exit(1);
        }
        if (status != 0)
        {
            fprintf(stderr, "%s: %s\n",
                    progname, wait_result_to_str(status));
            exit(1);
        }
        /* Exited normally, we're happy! */
#else                           /* WIN32 */

        /*
         * On Windows, since we are in the same process, we can just store the
         * value directly in the variable, and then set the flag that says
         * it's there.
         */
        if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
        {
            fprintf(stderr,
                    _("%s: could not parse write-ahead log location \"%s\"\n"),
                    progname, xlogend);
            exit(1);
        }
        xlogendptr = ((uint64) hi) << 32 | lo;
        InterlockedIncrement(&has_xlogendptr);

        /* First wait for the thread to exit */
        if (WaitForSingleObjectEx((HANDLE) bgchild_handle, INFINITE, FALSE) !=
            WAIT_OBJECT_0)
        {
            _dosmaperr(GetLastError());
            fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
                    progname, strerror(errno));
            exit(1);
        }
        if (GetExitCodeThread((HANDLE) bgchild_handle, &status) == 0)
        {
            _dosmaperr(GetLastError());
            fprintf(stderr, _("%s: could not get child thread exit status: %s\n"),
                    progname, strerror(errno));
            exit(1);
        }
        if (status != 0)
        {
            fprintf(stderr, _("%s: child thread exited with error %u\n"),
                    progname, (unsigned int) status);
            exit(1);
        }
        /* Exited normally, we're happy */
#endif
    }

    /* Free the configuration file contents */
    destroyPQExpBuffer(recoveryconfcontents);

    /*
     * End of copy data. Final result is already checked inside the loop.
     */
    PQclear(res);
    PQfinish(conn);
    conn = NULL;

    /*
     * Make data persistent on disk once backup is completed. For tar format
     * once syncing the parent directory is fine, each tar file created per
     * tablespace has been already synced. In plain format, all the data of
     * the base directory is synced, taking into account all the tablespaces.
     * Errors are not considered fatal.
     */
    if (do_sync)
    {
        if (verbose)
            fprintf(stderr,
                    _("%s: syncing data to disk ...\n"), progname);
        if (format == 't')
        {
            if (strcmp(basedir, "-") != 0)
                (void) fsync_fname(basedir, true, progname);
        }
        else
        {
            (void) fsync_pgdata(basedir, progname, serverVersion);
        }
    }

    if (verbose)
        fprintf(stderr, _("%s: base backup completed\n"), progname);
}

/*
 * PQgetvalue:
 *  return the value of field 'field_num' of row 'tup_num'
 */
char *
PQgetvalue(const PGresult *res, int tup_num, int field_num)
{
    if (!check_tuple_field_number(res, tup_num, field_num))
        return NULL;
    return res->tuples[tup_num][field_num].value;
}

III. Tracing and Analysis

Backup command

pg_basebackup -h localhost -U xdb -p 5432 -D /data/backup -P -Xs -R

Start tracing with gdb

[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-110.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
...
Reading symbols from /appdb/atlasdb/pg11.2/bin/pg_basebackup...done.
(gdb) b BaseBackup
(gdb) set args -h localhost -U xdb -p 5432 -D /data/backup -P -Xs -R
(gdb) r
Starting program: /appdb/atlasdb/pg11.2/bin/pg_basebackup -h localhost -U xdb -p 5432 -D /data/backup -P -Xs -R
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".

Breakpoint 1, BaseBackup () at pg_basebackup.c:1740
1740        char       *maxrate_clause = NULL;
(gdb)

The pg_conn connection structure

(gdb) n
1749        Assert(conn != NULL);
(gdb) p *conn
$1 = {pghost = 0x6282d0 "localhost", pghostaddr = 0x0, pgport = 0x6282f0 "5432", pgtty = 0x628310 "",
  connect_timeout = 0x0, client_encoding_initial = 0x0, pgoptions = 0x628330 "", appname = 0x0,
  fbappname = 0x628350 "pg_basebackup", dbName = 0x6282b0 "replication", replication = 0x6283d0 "true",
  pguser = 0x628290 "xdb", pgpass = 0x0, pgpassfile = 0x628d30 "/home/xdb/.pgpass", keepalives = 0x0,
  keepalives_idle = 0x0, keepalives_interval = 0x0, keepalives_count = 0x0, sslmode = 0x628370 "prefer",
  sslcompression = 0x628390 "0", sslkey = 0x0, sslcert = 0x0, sslrootcert = 0x0, sslcrl = 0x0, requirepeer = 0x0,
  krbsrvname = 0x6283b0 "postgres", target_session_attrs = 0x6283f0 "any", Pfdebug = 0x0, noticeHooks = {
    noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0,
    noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0,
  eventArraySize = 0, status = CONNECTION_OK, asyncStatus = PGASYNC_IDLE, xactStatus = PQTRANS_IDLE,
  queryclass = PGQUERY_SIMPLE, last_query = 0x61f1c0 "SHOW wal_segment_size", last_sqlstate = "\000\000\000\000\000",
  options_valid = true, nonblocking = false, singleRowMode = false, copy_is_binary = 0 '\000', copy_already_done = 0,
  notifyHead = 0x0, notifyTail = 0x0, nconnhost = 1, whichhost = 0, connhost = 0x627a50, sock = 7, laddr = {addr = {
      ss_family = 10, __ss_padding = "\307\326", '\000' , "\001", '\000' ,
      __ss_align = 0}, salen = 28}, raddr = {addr = {ss_family = 10,
      __ss_padding = "\025\070", '\000' , "\001", '\000' , __ss_align = 0},
    salen = 28}, pversion = 196608, sversion = 110002, auth_req_received = true, password_needed = false,
  sigpipe_so = false, sigpipe_flag = true, try_next_addr = false, try_next_host = false, addr_cur = 0x0,
  setenv_state = SETENV_STATE_IDLE, next_eo = 0x0, send_appname = true, be_pid = 1435, be_key = -828773845,
  pstatus = 0x629570, client_encoding = 0, std_strings = true, verbosity = PQERRORS_DEFAULT,
  show_context = PQSHOW_CONTEXT_ERRORS, lobjfuncs = 0x0, inBuffer = 0x61f600 "T", inBufSize = 16384, inStart = 75,
  inCursor = 75, inEnd = 75, outBuffer = 0x623610 "Q", outBufSize = 16384, outCount = 0, outMsgStart = 1, outMsgEnd = 27,
  rowBuf = 0x627620, rowBufLen = 32, result = 0x0, next_result = 0x0, sasl_state = 0x0, ssl_in_use = false,
  allow_ssl_try = false, wait_ssl_try = false, ssl = 0x0, peer = 0x0, engine = 0x0, gctx = 0x0, gtarg_nam = 0x0,
  errorMessage = {data = 0x627830 "", len = 0, maxlen = 256}, workBuffer = {data = 0x627940 "SELECT", len = 6,
    maxlen = 256}, addrlist = 0x0, addrlist_family = 0}
(gdb)

Check the server version to see whether BASE_BACKUP is supported

(gdb) n
1755        minServerMajor = 901;
(gdb)
1756        maxServerMajor = PG_VERSION_NUM / 100;
(gdb) p PG_VERSION_NUM
$2 = 110002
(gdb) n
1757        serverVersion = PQserverVersion(conn);
(gdb)
1758        serverMajor = serverVersion / 100;
(gdb)
1759        if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
(gdb) p serverVersion
$3 = 110002
(gdb) n
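The trace confirms the arithmetic: PG_VERSION_NUM and serverVersion are both 110002, so serverMajor is 1100 and lies within the accepted range [901, 1100]. A minimal sketch of the same range check as a standalone function (`server_version_compatible` is a hypothetical name) is:

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Version numbers use the PG_VERSION_NUM layout: 110002 for 11.2, 90603 for
 * 9.6.3.  Dividing by 100 gives the "major" number compared below (1100 for
 * 11.x, 906 for 9.6.x).  BASE_BACKUP exists only from 9.1 (major 901), and
 * the client refuses servers newer than itself.
 */
static bool
server_version_compatible(int server_version, int client_version_num)
{
    const int   min_server_major = 901;
    int         max_server_major = client_version_num / 100;
    int         server_major = server_version / 100;

    return server_major >= min_server_major && server_major <= max_server_major;
}
```

A 9.0 server (90024 / 100 = 900) is rejected as too old, and a 12.x server (1200) is rejected by an 11.2 client because it is newer than the client.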

Check whether the server supports WAL streaming

(gdb) n
1772        if (includewal == STREAM_WAL && !CheckServerVersionForStreaming(conn))
(gdb)

Generate the recovery.conf contents if requested (-R)

1785        if (writerecoveryconf)
(gdb) p includewal
$4 = STREAM_WAL
(gdb) p writerecoveryconf
$5 = true
(gdb) n
1786            GenerateRecoveryConf(conn);
(gdb)

Get the system identifier and the timeline (IDENTIFY_SYSTEM)

1791        if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
(gdb)
1797        PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
(gdb) p sysidentifier
$6 = 0x6292d0 "6662151435832250464"
(gdb) p *sysidentifier
$7 = 54 '6'
(gdb) p latesttli
$8 = 1
(gdb)

Start the actual backup: escape the backup label

(gdb) p escaped_label
$9 = "pg_basebackup base backup\000\000\000\001", '\000' , "\"`-\360\377\177\000\000\000\000\000\000\377\177\000\000` u\367\377\177\000\000\000\001\000\000\000\000\000\000\001\000\000\000\002\000\000\000]VA\000\000\000\000\000@\335\377\377\377\177\000\000b\343\377\377\377\177", '\000' , "\343\377\377\377\177\000\000B\335\377\377\377\177\000\000\370Ǹ\367\377\177\000\000\220\325\227\367\377\177\000\000\000\341\377\377\377\177\000\000\000\000\000\000\000\000\000\000\371D"...
(gdb) p label
$10 = 0x412610 "pg_basebackup base backup"
(gdb) p i
$11 = 0
(gdb)
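`PQescapeStringConn` makes the label safe to embed in the single-quoted `LABEL '...'` literal. Below is a simplified, hypothetical sketch of its core rule: double every single quote, and also every backslash when `standard_conforming_strings` is off. Unlike the real libpq function it ignores multibyte encodings and has no error output parameter. The default label contains neither character, so it passes through unchanged, consistent with the trace (and with `i = 0`, meaning no escaping error).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical simplified escaper (encoding-unaware).  'to' must have room
 * for at least 2 * strlen(from) + 1 bytes.  Returns the escaped length.
 */
static size_t
escape_literal(char *to, const char *from, bool std_strings)
{
    char       *start = to;

    assert(to != NULL && from != NULL);
    for (; *from != '\0'; from++)
    {
        /* quotes always, backslashes only without standard_conforming_strings */
        if (*from == '\'' || (*from == '\\' && !std_strings))
            *to++ = *from;      /* emit the character twice */
        *to++ = *from;
    }
    *to = '\0';
    return (size_t) (to - start);
}
```

For example, a label `it's` becomes `it''s`, which the server reads back as the original text inside `LABEL '...'`.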

Build the BASE_BACKUP command

(gdb) n
1802        if (verbose)
(gdb)
1807        if (showprogress && !verbose)
(gdb)
1809            fprintf(stderr, "waiting for checkpoint");
(gdb)
waiting for checkpoint1810          if (isatty(fileno(stderr)))
(gdb)
1811                fprintf(stderr, "\r");
(gdb)
1817            psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb)
1824                     format == 't' ? "TABLESPACE_MAP" : "",
(gdb)
1817            psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb)
1822                     includewal == NO_WAL ? "" : "NOWAIT",
(gdb)
1817            psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb)
1820                     includewal == FETCH_WAL ? "WAL" : "",
(gdb)
1817            psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb)
1816        basebkp =
(gdb)
1827        if (PQsendQuery(conn, basebkp) == 0)
(gdb)
(gdb) p basebkp
$12 = 0x6291f0 "BASE_BACKUP LABEL 'pg_basebackup base backup' PROGRESS   NOWAIT   "
(gdb)
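The command text is simply the `psprintf` format with an empty string in every slot whose option is off, which explains the runs of spaces in `basebkp`. The sketch below rebuilds it with `snprintf`; the `IncludeWal` enum and `build_base_backup_cmd` are illustrative copies, not imported from pg_basebackup. With this trace's options (`-P -Xs`, plain format, checksums verified) it produces exactly the string gdb printed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Illustrative copy of the WAL modes selected by -X none / fetch / stream */
typedef enum
{
    NO_WAL,
    FETCH_WAL,
    STREAM_WAL
} IncludeWal;

/*
 * Build the BASE_BACKUP command the same way BaseBackup does: every option
 * slot is always printed, as "" when the option is off.  Returns snprintf's
 * result (the length the full command needs).
 */
static int
build_base_backup_cmd(char *buf, size_t bufsize, const char *escaped_label,
                      bool showprogress, IncludeWal includewal,
                      bool fastcheckpoint, const char *maxrate_clause,
                      char format, bool verify_checksums)
{
    return snprintf(buf, bufsize,
                    "BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
                    escaped_label,
                    showprogress ? "PROGRESS" : "",
                    includewal == FETCH_WAL ? "WAL" : "",
                    fastcheckpoint ? "FAST" : "",
                    includewal == NO_WAL ? "" : "NOWAIT",
                    maxrate_clause ? maxrate_clause : "",
                    format == 't' ? "TABLESPACE_MAP" : "",
                    verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
}
```

Note that any WAL mode other than `-X none` adds `NOWAIT`, while only `-X fetch` adds `WAL`; with `-X stream` the WAL travels over the separate streaming connection instead.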

Send the command to the server and fetch the first result

(gdb) n
1837        res = PQgetResult(conn);
(gdb)
1838        if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb)

The returned result

(gdb) p *res
$13 = {ntups = 1, numAttributes = 2, attDescs = 0x629688, tuples = 0x629e90, tupArrSize = 128, numParameters = 0,
  paramDescs = 0x0, resultStatus = PGRES_TUPLES_OK,
  cmdStatus = "SELECT\000\000\000\000\000\000\000\000\000\000\027\000\000\000\004\000\000\000\377\377\377\377:\226b", '\000' , "\031\000\000\000\377\377\377\377\377\377\377\377B\226b", binary = 0, noticeHooks = {
    noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0,
    noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0,
  client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x629680,
  curOffset = 133, spaceLeft = 1915}
(gdb)
(gdb) p *res->attDescs
$14 = {name = 0x6296c8 "recptr", tableid = 0, columnid = 0, format = 0, typid = 25, typlen = -1, atttypmod = 0}
(gdb) p *res->tuples
$15 = (PGresAttValue *) 0x6296d8
(gdb) p **res->tuples
$16 = {len = 10, value = 0x6296f8 "1/57000028"}
(gdb) p *res->tuples[2]
Cannot access memory at address 0x0
(gdb) p *res->tuples[0]
$17 = {len = 10, value = 0x6296f8 "1/57000028"}
(gdb) p *res->tuples[1]
Cannot access memory at address 0x15171
(gdb)

Check ntuples and fetch the WAL start position

(gdb) n
1844        if (PQntuples(res) != 1)
(gdb)
1852        strlcpy(xlogstart, PQgetvalue(res, 0, 0), sizeof(xlogstart));
(gdb) p PQgetvalue(res, 0, 0)
$18 = 0x6296f8 "1/57000028"
(gdb) p xlogstart
$19 = " `-\360\377\177\000\000\353\340\377\377\377\177\000\000\360\340a", '\000' , "\360\337\377\377\377\177", '\000'
(gdb) n
1854        if (verbose)
(gdb) p xlogstart
$20 = "1/57000028\000\377\377\177\000\000\360\340a", '\000' , "\360\337\377\377\377\177", '\000'
(gdb)
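The WAL location arrives as text of the form `high/low`: two 32-bit halves in hexadecimal. The WIN32 branch of BaseBackup turns such a string into a 64-bit position with `sscanf(xlogend, "%X/%X", &hi, &lo)`. A standalone sketch of that conversion (`parse_lsn` is a hypothetical name) is:

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Convert a textual WAL location such as "1/57000028" into a 64-bit
 * position.  Returns 0 on success, -1 if the text does not match "%X/%X".
 */
static int
parse_lsn(const char *str, uint64_t *lsn)
{
    unsigned int hi,
                lo;

    assert(str != NULL && lsn != NULL);
    if (sscanf(str, "%X/%X", &hi, &lo) != 2)
        return -1;
    *lsn = ((uint64_t) hi) << 32 | lo;  /* high half, then low half */
    return 0;
}
```

So the start point `1/57000028` seen above corresponds to the 64-bit position 0x157000028.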

Get the starting timeline

(gdb) n
1862        if (PQnfields(res) >= 2)
(gdb) p PQnfields(res)
$21 = 2
(gdb) n
1863            starttli = atoi(PQgetvalue(res, 0, 1));
(gdb)
1866        PQclear(res);
(gdb) p atoi(PQgetvalue(res, 0, 1))
$22 = 1
(gdb)  p res->tuples[1]
$23 = (PGresAttValue *) 0x15171
(gdb)  p res->tuples[0]
$24 = (PGresAttValue *) 0x6296d8
(gdb)
(gdb) n
1867        MemSet(xlogend, 0, sizeof(xlogend));
(gdb)
1869        if (verbose && includewal != NO_WAL)
(gdb) p xlogend
$25 = '\000'

Get the header

(gdb) n
1876        res = PQgetResult(conn);
(gdb) n
1877        if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) p *res
$26 = {ntups = 1, numAttributes = 3, attDescs = 0x629688, tuples = 0x629e90, tupArrSize = 128, numParameters = 0,
  paramDescs = 0x0, resultStatus = PGRES_TUPLES_OK,
  cmdStatus = "SELECT\000\000\000\000\000\000\000\000\000\000\027\000\000\000\004\000\000\000\377\377\377\377:\226b", '\000' , "\031\000\000\000\377\377\377\377\377\377\377\377B\226b", binary = 0, noticeHooks = {
    noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0,
    noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0,
  client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x629680,
  curOffset = 183, spaceLeft = 1865}

Sum up the total size for progress reporting

(gdb)
1892        totalsize = totaldone = 0;
(gdb) n
1893        tablespacecount = PQntuples(res);
(gdb)
1894        for (i = 0; i < PQntuples(res); i++)
(gdb) p tablespacecount
$29 = 1
(gdb) n
1896            totalsize += atol(PQgetvalue(res, i, 2));
(gdb) p PQgetvalue(res, i, 2)
$30 = 0x629730 "445480"
(gdb) p atol(PQgetvalue(res, i, 2))
$31 = 445480
(gdb) n
1903            if (format == 'p' && !PQgetisnull(res, i, 1))
(gdb)
1894        for (i = 0; i < PQntuples(res); i++)
(gdb) p res->tuples[0][0]
$33 = {len = -1, value = 0x629658 ""}
(gdb) p res->tuples[0][1]
$34 = {len = -1, value = 0x629658 ""}
(gdb) p res->tuples[0][2]
$35 = {len = 6, value = 0x629730 "445480"}
(gdb)
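The header result has one row per tablespace with three columns (spcoid, spclocation, size). In this trace the only row is the main data directory, so its first two columns are NULL (`len = -1`, pointing at `null_field`) and the third holds the size. The sketch below models the two checks BaseBackup makes on this result, summing the sizes for progress reporting and refusing tar output to stdout (`-`) when there is more than one tablespace, using a hypothetical `TablespaceRow` type and `plan_backup` function instead of a real PGresult.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical header row; NULL pointers stand for SQL NULLs */
typedef struct
{
    const char *spcoid;         /* NULL for the main data directory */
    const char *spclocation;    /* NULL for the main data directory */
    const char *size;           /* estimated size, as text */
} TablespaceRow;

/*
 * Sum the size column into *totalsize and report whether the requested
 * output is possible: tar format written to stdout allows one tablespace.
 */
static bool
plan_backup(const TablespaceRow *rows, int nrows, char format,
            const char *basedir, long *totalsize)
{
    assert(rows != NULL || nrows == 0);
    *totalsize = 0;
    for (int i = 0; i < nrows; i++)
        *totalsize += atol(rows[i].size);

    if (format == 't' && strcmp(basedir, "-") == 0 && nrows > 1)
        return false;
    return true;
}
```

With the single row from the trace the total is 445480; adding a second tablespace makes `-Ft -D -` impossible, while writing tar files into a directory still works.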

Start the WAL streaming session before receiving the actual data chunks

(gdb) n
1914        if (format == 't' && strcmp(basedir, "-") == 0 && PQntuples(res) > 1)
(gdb)
1926        if (includewal == STREAM_WAL)
(gdb)
1928            if (verbose)
(gdb)
1931            StartLogStreamer(xlogstart, starttli, sysidentifier);
(gdb) n
Detaching after fork from child process 1511.
1937        for (i = 0; i < PQntuples(res); i++)
(gdb)

Check the WAL directory in the file system

[xdb@localhost backup]$ ll
total 0
drwx------. 3 xdb xdb 60 Mar 15 15:46 pg_wal
[xdb@localhost backup]$ ll ./pg_wal/
total 16384
-rw-------. 1 xdb xdb 16777216 Mar 15 15:46 000000010000000100000057
drwx------. 2 xdb xdb        6 Mar 15 15:46 archive_status
[xdb@localhost backup]$
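The segment file already present in pg_wal is the one that contains the start location 1/57000028 on timeline 1. Assuming the default 16 MB `wal_segment_size` (the file above is 16777216 bytes), the sketch below derives such a file name from a timeline and a 64-bit WAL position, modeled on PostgreSQL's segment naming scheme: three 8-digit hex fields, the timeline followed by the segment number split per 4 GB "log id". `wal_file_name` is a hypothetical helper, not a PostgreSQL API.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Format the name of the WAL segment file that contains position 'lsn':
 * TTTTTTTT (timeline) + LLLLLLLL (log id) + SSSSSSSS (segment in that log).
 */
static void
wal_file_name(char *buf, size_t bufsize, uint32_t tli, uint64_t lsn,
              uint64_t wal_segsz_bytes)
{
    uint64_t    segno = lsn / wal_segsz_bytes;
    uint64_t    segs_per_xlogid = UINT64_C(0x100000000) / wal_segsz_bytes;

    assert(wal_segsz_bytes > 0);
    snprintf(buf, bufsize, "%08X%08X%08X",
             (unsigned int) tli,
             (unsigned int) (segno / segs_per_xlogid),
             (unsigned int) (segno % segs_per_xlogid));
}
```

For 0x157000028 with 16 MB segments the segment number is 0x157, i.e. log id 1 and segment 0x57, giving 000000010000000100000057 as listed above.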

Start receiving chunks

(gdb) n
Detaching after fork from child process 1511.
1937        for (i = 0; i < PQntuples(res); i++)
(gdb) n
1939            if (format == 't')
(gdb)
1942                ReceiveAndUnpackTarFile(conn, res, i);
(gdb)
193789/445489 kB(gdb) for (i = 0; i < PQntuples(res); i++)

Check the backup directory in the file system

[xdb@localhost backup]$ ll
total 56
-rw-------. 1 xdb xdb   226 Mar 15 15:47 backup_label
drwx------. 6 xdb xdb    58 Mar 15 15:48 base
drwx------. 2 xdb xdb  4096 Mar 15 15:48 global
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_commit_ts
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_dynshmem
-rw-------. 1 xdb xdb  4513 Mar 15 15:48 pg_hba.conf
-rw-------. 1 xdb xdb  1636 Mar 15 15:48 pg_ident.conf
drwx------. 4 xdb xdb    68 Mar 15 15:48 pg_logical
drwx------. 4 xdb xdb    36 Mar 15 15:47 pg_multixact
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_notify
drwx------. 2 xdb xdb     6 Mar 15 15:48 pg_replslot
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_serial
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_snapshots
drwx------. 2 xdb xdb     6 Mar 15 15:48 pg_stat
drwx------. 2 xdb xdb     6 Mar 15 15:48 pg_stat_tmp
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_subtrans
drwx------. 2 xdb xdb     6 Mar 15 15:48 pg_tblspc
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_twophase
-rw-------. 1 xdb xdb     3 Mar 15 15:48 PG_VERSION
drwx------. 3 xdb xdb    92 Mar 15 15:48 pg_wal
drwx------. 2 xdb xdb    18 Mar 15 15:48 pg_xact
-rw-------. 1 xdb xdb    88 Mar 15 15:48 postgresql.auto.conf
-rw-------. 1 xdb xdb 23812 Mar 15 15:48 postgresql.conf
-rw-------. 1 xdb xdb   183 Mar 15 15:48 recovery.conf
[xdb@localhost backup]$

Report progress

(gdb) n
1945        if (showprogress)
(gdb)
1947            progress_report(PQntuples(res), NULL, true);
(gdb)
194889/445489 kB (100%),if (isatty(fileno(stderr)))
(gdb)
(gdb) n
1949                fprintf(stderr, "\n");  /* Need to move to next line */
(gdb)
1952        PQclear(res);
(gdb)

Get the stop position

(gdb)
1957        res = PQgetResult(conn);
(gdb) n
1958        if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) p *res
$36 = {ntups = 1, numAttributes = 2, attDescs = 0x6295a8, tuples = 0x629db0, tupArrSize = 128, numParameters = 0,
  paramDescs = 0x0, resultStatus = PGRES_TUPLES_OK,
  cmdStatus = "SELECT", '\000' , "\300!", , binary = 0, noticeHooks = {
    noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0,
    noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0,
  client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x6295a0,
  curOffset = 133, spaceLeft = 1915}
(gdb)
(gdb) n
1965        if (PQntuples(res) != 1)
(gdb)
1972        strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
(gdb) p xlogend
$37 = '\000'
(gdb) p *xlogend
$38 = 0 '\000'
(gdb) n
1973        if (verbose && includewal != NO_WAL)
(gdb)
1975        PQclear(res);
(gdb)

The final result: PGRES_COMMAND_OK

(gdb)
1977        res = PQgetResult(conn);
(gdb)
1978        if (PQresultStatus(res) != PGRES_COMMAND_OK)
(gdb) p *res
$39 = {ntups = 0, numAttributes = 0, attDescs = 0x0, tuples = 0x0, tupArrSize = 0, numParameters = 0, paramDescs = 0x0,
  resultStatus = PGRES_COMMAND_OK, cmdStatus = "SELECT", '\000' , "\300!", ,
  binary = 0, noticeHooks = {noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0,
    noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0,
  client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x0, curOffset = 0,
  spaceLeft = 0}
(gdb)
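When this last result is not `PGRES_COMMAND_OK`, BaseBackup separates checksum failures (SQLSTATE `XX001`, the code behind `ERRCODE_DATA_CORRUPTED`) from every other failure. A tiny standalone sketch of that decision, with a hypothetical enum and plain arguments standing in for `PQresultStatus` and `PQresultErrorField(res, PG_DIAG_SQLSTATE)`:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical outcome codes for the last PQgetResult() of BASE_BACKUP */
typedef enum
{
    FINAL_OK,
    FINAL_CHECKSUM_FAILURE,
    FINAL_OTHER_ERROR
} FinalOutcome;

/*
 * command_ok: whether the result status was PGRES_COMMAND_OK.
 * sqlstate:   the error's SQLSTATE, or NULL if the server sent none.
 */
static FinalOutcome
classify_final_result(int command_ok, const char *sqlstate)
{
    if (command_ok)
        return FINAL_OK;
    if (sqlstate != NULL && strcmp(sqlstate, "XX001") == 0)
        return FINAL_CHECKSUM_FAILURE;  /* BaseBackup sets checksum_failure */
    return FINAL_OTHER_ERROR;           /* "final receive failed" */
}
```

In both failure cases BaseBackup still exits with status 1; the distinction only changes the message and the `checksum_failure` flag.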

Cleanup work, such as flushing the backed-up data to disk once the backup has finished

(gdb) n
1997        if (bgchild > 0)
(gdb) 
2014            if (verbose)
(gdb) 
2019            if (write(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
(gdb) 
2028            r = waitpid(bgchild, &status, 0);
(gdb) p bgchild
$40 = 1511
(gdb) n
2029            if (r == -1)
(gdb) 
2035            if (r != bgchild)
(gdb) p r
$41 = 1511
(gdb) n
2041            if (!WIFEXITED(status))
(gdb) 
2047            if (WEXITSTATUS(status) != 0)
(gdb) 
2098        destroyPQExpBuffer(recoveryconfcontents);
(gdb) 
2103        PQclear(res);
(gdb) 
2104        PQfinish(conn);
(gdb) 
2113        if (do_sync)
(gdb) 
2115            if (format == 't')
(gdb) 
2122                (void) fsync_pgdata(basedir, progname, serverVersion);
(gdb) 
2126        if (verbose)
(gdb) 
2128    }
(gdb) 
main (argc=12, argv=0x7fffffffe4b8) at pg_basebackup.c:2534
2534        success = true;
(gdb)
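Lines 2019 to 2047 show the parent's half of the hand-off with the background WAL process. It writes `xlogend` into `bgpipe[1]` so the child knows where to stop, then reaps the child with `waitpid`. Anything other than a normal exit with status 0 counts as a failure. In the trace both `bgchild` and `r` are 1511, so the wait returned the expected process. Below is a simplified sketch of that pattern using plain POSIX calls. `finish_child` is a hypothetical name, and this is not the pg_basebackup code itself.

```c
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Sketch of the parent-side hand-off: send the end position to the
 * background child over a pipe, then reap it. Returns 0 only if the
 * write succeeded and the child exited normally with status 0.
 */
static int
finish_child(int write_fd, pid_t child, const char *endpos)
{
    size_t len = strlen(endpos);
    int status;
    pid_t r;

    if (write(write_fd, endpos, len) != (ssize_t) len)
        return -1;              /* could not tell the child where to stop */

    r = waitpid(child, &status, 0);
    if (r == -1 || r != child)
        return -1;              /* wait failed or reaped another process */
    if (!WIFEXITED(status))
        return -1;              /* child terminated abnormally (e.g. signal) */
    if (WEXITSTATUS(status) != 0)
        return -1;              /* child exited but reported an error */
    return 0;
}
```

Checking `WIFEXITED` before `WEXITSTATUS` matters because the exit status is only meaningful for a process that exited normally.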

Start the trace again, this time also monitoring activity on the database server.
Enter the BaseBackup function

Breakpoint 1, BaseBackup () at pg_basebackup.c:1740
1740        char       *maxrate_clause = NULL;
(gdb)
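The breakpoint stops on the declaration of `maxrate_clause`. This variable holds the optional `MAX_RATE` option of the `BASE_BACKUP` replication command and stays `NULL` when no rate limit (in kB/s) is requested. The sketch below shows how such a command string can be assembled. It covers only a few options (`LABEL`, `PROGRESS`, `FAST`, `MAX_RATE`). `build_base_backup_cmd` is a hypothetical name. The label is assumed to contain no quotes, whereas real code must escape it.

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/*
 * Sketch: assemble a BASE_BACKUP replication command. MAX_RATE is only
 * emitted when a rate limit was requested (maxrate > 0, in kB/s).
 * Unused options become empty strings and just leave extra spaces.
 * Assumes label contains no single quotes (no escaping is done here).
 */
static int
build_base_backup_cmd(char *buf, size_t len, const char *label,
                      bool progress, bool fast, unsigned int maxrate)
{
    char maxrate_clause[32] = "";   /* stays empty when no limit is set */

    if (maxrate > 0)
        snprintf(maxrate_clause, sizeof(maxrate_clause),
                 "MAX_RATE %u", maxrate);

    return snprintf(buf, len, "BASE_BACKUP LABEL '%s' %s %s %s",
                    label,
                    progress ? "PROGRESS" : "",
                    fast ? "FAST" : "",
                    maxrate_clause);
}
```

Building the option list with `%s` placeholders that may expand to empty strings keeps the format string fixed, at the cost of some harmless extra whitespace in the command.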

Database activity

15:56:25 (xdb@[local]:5432)testdb=# select * from pg_stat_activity
[local] xdb@testdb-# where backend_type not in ('walwriter', 'checkpointer', 'background writer', 'logical replication launcher', 'autovacuum launcher') and query not like '%pg_stat_activity%';
-[ RECORD 1 ]----+------------------------------
datid            |
datname          |
pid              | 1566
usesysid         | 10
usename          | xdb
application_name | pg_basebackup
client_addr      | ::1
client_hostname  |
client_port      | 51162
backend_start    | 2019-03-15 15:56:13.82013+08
xact_start       |
query_start      |
state_change     | 2019-03-15 15:56:13.821507+08
wait_event_type  | Client
wait_event       | ClientRead
state            | idle
backend_xid      |
backend_xmin     |
query            |
backend_type     | walsender

Start WAL streaming

1931            StartLogStreamer(xlogstart, starttli, sysidentifier);
(gdb) 
Detaching after fork from child process 1602.
1937        for (i = 0; i < PQntuples(res); i++)
(gdb)
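`StartLogStreamer` forks a child (process 1602 in the gdb output) that streams WAL over a second replication connection while the parent copies data files. That connection is the second walsender, pid 1601 with wait event `WalSenderMain`, in the activity listing that follows. The child cannot know the stop position in advance. It has to notice when the parent writes `xlogend` into the pipe and then stop once it has streamed that far. The sketch below shows one way to do that non-blocking check with `poll()`. `reached_end` is a hypothetical helper, and pg_basebackup's own implementation may differ in its details.

```c
#include <poll.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

/*
 * Sketch of the streaming child's stop check. Polls the pipe without
 * blocking; once the parent has written the end LSN ("hi/lo" hex),
 * remembers it and returns true when streamed >= end position.
 */
static bool
reached_end(int read_fd, uint64_t streamed, bool *have_end, uint64_t *endpos)
{
    if (!*have_end)
    {
        struct pollfd pfd = {.fd = read_fd, .events = POLLIN};
        char buf[64];
        ssize_t n;
        unsigned int hi, lo;

        /* timeout 0: only look, never wait for the parent */
        if (poll(&pfd, 1, 0) <= 0 || !(pfd.revents & POLLIN))
            return false;       /* nothing from the parent yet */

        n = read(read_fd, buf, sizeof(buf) - 1);
        if (n <= 0)
            return false;
        buf[n] = '\0';
        if (sscanf(buf, "%X/%X", &hi, &lo) != 2)
            return false;       /* malformed position: keep streaming */
        *endpos = ((uint64_t) hi << 32) | lo;
        *have_end = true;
    }
    return streamed >= *endpos;
}
```

Polling with a zero timeout lets the child keep receiving WAL at full speed and only glance at the pipe between messages.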

Database activity

16:01:25 (xdb@[local]:5432)testdb=# select * from pg_stat_activity
where backend_type not in ('walwriter', 'checkpointer', 'background writer', 'logical replication launcher', 'autovacuum launcher') and query not like '%pg_stat_activity%';
-[ RECORD 1 ]----+------------------------------
datid            |
datname          |
pid              | 1566
usesysid         | 10
usename          | xdb
application_name | pg_basebackup
client_addr      | ::1
client_hostname  |
client_port      | 51162
backend_start    | 2019-03-15 15:56:13.82013+08
xact_start       |
query_start      |
state_change     | 2019-03-15 16:00:47.345326+08
wait_event_type  | Client
wait_event       | ClientWrite
state            | active
backend_xid      |
backend_xmin     |
query            |
backend_type     | walsender
-[ RECORD 2 ]----+------------------------------
datid            |
datname          |
pid              | 1601
usesysid         | 10
usename          | xdb
application_name | pg_basebackup
client_addr      | ::1
client_hostname  |
client_port      | 51164
backend_start    | 2019-03-15 16:01:47.150434+08
xact_start       |
query_start      |
state_change     | 2019-03-15 16:01:47.159234+08
wait_event_type  | Activity
wait_event       | WalSenderMain
state            | active
backend_xid      |
backend_xmin     |
query            |
backend_type     | walsender

16:01:56 (xdb@[local]:5432)testdb=#

Copy data

(gdb) 
1942                ReceiveAndUnpackTarFile(conn, res, i);
(gdb) 
193789/445489 kBfor (i = 0; i < PQntuples(res); i++)
(gdb) 
1945        if (showprogress)
(gdb)
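`ReceiveAndUnpackTarFile` receives a tablespace as a tar stream and writes its members out as files, which produces the plain directory layout shown in the `ll` listing. Unpacking a tar stream means reading each 512-byte header to learn how many data bytes follow, then skipping the zero padding up to the next 512-byte boundary. The sketch below shows only that header arithmetic for a ustar header: the size is a 12-byte octal field at offset 124. The helper names are hypothetical, and the sketch ignores the base-256 size encoding some tar writers use for very large files.

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define TAR_BLOCK 512

/*
 * Sketch: read a member's size from a ustar header. The size field is
 * 12 bytes at offset 124, stored as NUL- or space-terminated octal.
 */
static uint64_t
tar_member_size(const char header[TAR_BLOCK])
{
    char field[13];

    memcpy(field, header + 124, 12);
    field[12] = '\0';                   /* guarantee termination */
    return strtoull(field, NULL, 8);    /* parse as octal */
}

/* Member data is padded with zeros up to the next 512-byte boundary. */
static uint64_t
tar_padded_size(uint64_t size)
{
    return (size + TAR_BLOCK - 1) / TAR_BLOCK * TAR_BLOCK;
}
```

For example, the 226-byte `backup_label` in the listing would be stored with the size field "00000000342" (octal 342 = 226), and its data would occupy one 512-byte block.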

Database activity

...
-[ RECORD 3 ]----+------------------------------
datid            |
datname          |
pid              | 1352
usesysid         |
usename          |
application_name |
client_addr      |
client_hostname  |
client_port      |
backend_start    | 2019-03-15 15:03:01.923092+08
xact_start       |
query_start      |
state_change     |
wait_event_type  | Activity
wait_event       | WalWriterMain
state            |
backend_xid      |
backend_xmin     |
query            |
backend_type     | walwriter

Perform the cleanup work

2113        if (do_sync)
(gdb) 
2115            if (format == 't')
(gdb) 
2122                (void) fsync_pgdata(basedir, progname, serverVersion);
(gdb) 
2126        if (verbose)
(gdb) 
2128    }
(gdb)
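Because `do_sync` is set and the format is not tar (`'t'`), execution reaches `fsync_pgdata(basedir, ...)`, which flushes the copied data directory to disk. The basic durability step is to `fsync` a file and then `fsync` the directory containing it, so that the directory entry is also persisted. The sketch below does this for a single path only, whereas `fsync_pgdata` covers the whole directory tree. `fsync_path` and `fsync_file_and_parent` are hypothetical helpers, and the fixed 4096-byte path buffer is an assumption of the sketch.

```c
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Open path read-only, fsync it, close it. Returns 0 or -1. */
static int
fsync_path(const char *path)
{
    int fd = open(path, O_RDONLY);
    int rc;

    if (fd < 0)
        return -1;
    rc = fsync(fd);
    if (close(fd) != 0)
        rc = -1;
    return rc == 0 ? 0 : -1;
}

/*
 * Sketch: make one file durable by fsyncing the file itself and then
 * its parent directory, so the directory entry also reaches disk.
 */
static int
fsync_file_and_parent(const char *path)
{
    char dir[4096];             /* assumed large enough for this sketch */
    char *slash;

    if (fsync_path(path) != 0)
        return -1;

    if (strlen(path) >= sizeof(dir))
        return -1;
    strcpy(dir, path);
    slash = strrchr(dir, '/');
    if (slash == NULL)
        strcpy(dir, ".");       /* bare name: parent is the cwd */
    else if (slash == dir)
        slash[1] = '\0';        /* "/name": parent is the root */
    else
        *slash = '\0';          /* strip the last path component */
    return fsync_path(dir);
}
```

Syncing only the file is not enough for a newly created file: until its directory is synced, a crash can lose the entry that makes the file reachable.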

Database activity

16:05:01 (xdb@[local]:5432)testdb=# select * from pg_stat_activity
where backend_type not in ('checkpointer', 'background writer', 'logical replication launcher', 'autovacuum launcher') and query not like '%pg_stat_activity%';
-[ RECORD 1 ]----+------------------------------
datid            |
datname          |
pid              | 1352
usesysid         |
usename          |
application_name |
client_addr      |
client_hostname  |
client_port      |
backend_start    | 2019-03-15 15:03:01.923092+08
xact_start       |
query_start      |
state_change     |
wait_event_type  | Activity
wait_event       | WalWriterMain
state            |
backend_xid      |
backend_xmin     |
query            |
backend_type     | walwriter

DONE!

IV. References

PG Source Code
