千家信息网

PostgreSQL中StartLogStreamer分析

发表于:2025-01-21 作者:千家信息网编辑
千家信息网最后更新 2025年01月21日,本篇内容主要讲解"PostgreSQL中StartLogStreamer分析",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"PostgreSQL中StartLogStreamer分析"吧!
千家信息网最后更新 2025年01月21日PostgreSQL中StartLogStreamer分析

本篇内容主要讲解"PostgreSQL中StartLogStreamer分析",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"PostgreSQL中StartLogStreamer分析"吧!

本节简单介绍PostgreSQL备份工具pg_basebackup的源码中,实际执行备份逻辑的函数BaseBackup里负责备份WAL数据的实现函数StartLogStreamer。

一、数据结构

logstreamer_param
WAL data streamer参数.

/*
 * Parameters handed to the background WAL streamer (LogStreamerMain).
 * Allocated with pg_malloc0() and filled in by StartLogStreamer().
 */
typedef struct
{
	PGconn	   *bgconn;			/* second, dedicated connection used for
								 * streaming WAL */
	XLogRecPtr	startptr;		/* WAL position to start streaming from */
	char		xlog[MAXPGPATH];	/* directory or tarfile depending on mode */
	char	   *sysidentifier;	/* system identifier, copied into
								 * StreamCtl.sysidentifier */
	int			timeline;		/* timeline to stream from */
} logstreamer_param;

StreamCtl
接收xlog流数据时的全局参数

/*
 * Global parameters when receiving xlog stream. For details about the
 * individual fields, see the function comment for ReceiveXlogStream().
 *
 * NOTE: field order is part of the layout; add new fields without
 * reordering existing ones.
 */
typedef struct StreamCtl
{
	XLogRecPtr	startpos;		/* Start position for streaming */
	TimeLineID	timeline;		/* Timeline to stream data from */
	char	   *sysidentifier;	/* Validate this system identifier and
								 * timeline */
	int			standby_message_timeout;	/* Send status messages this often */
	bool		synchronous;	/* Flush WAL data immediately on write */
	bool		mark_done;		/* Mark segment as done in generated archive */
	bool		do_sync;		/* Flush to disk to ensure consistent state of
								 * data */

	stream_stop_callback stream_stop;	/* Stop streaming when returns true */

	pgsocket	stop_socket;	/* if valid, watch for input on this socket
								 * and check stream_stop() when there is any */

	WalWriteMethod *walmethod;	/* How to write the WAL */
	char	   *partial_suffix; /* Suffix appended to partially received files */
	char	   *replication_slot;	/* Replication slot to use, or NULL */
} StreamCtl;

二、源码解读

StartLogStreamer
StartLogStreamer在备份期间启动一个后台进程(Windows平台上为线程)用于接收WAL。该后台进程使用自己独立的数据库连接,因此可以与基础备份并行地以流复制方式接收WAL文件。

/*
 * Initiate background process for receiving xlog during the backup.
 * The background stream will use its own database connection so we can
 * stream the logfile in parallel with the backups.
 *
 * startpos:      WAL start location in textual "%X/%X" form (for example
 *                "0/57000060"); it is rounded down to a segment boundary.
 * timeline:      timeline to stream from.
 * sysidentifier: system identifier handed to the streamer for validation.
 *
 * On non-Windows platforms the streamer runs in a fork()ed child process;
 * on Windows it runs in a thread.  Every failure in here is reported on
 * stderr and terminates the program with exit(1).
 */
static void
StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
{
	logstreamer_param *param;
	uint32		hi,
				lo;
	char		statusdir[MAXPGPATH];
	const char *waldir;
	int			len;

	param = pg_malloc0(sizeof(logstreamer_param));
	param->timeline = timeline;
	param->sysidentifier = sysidentifier;

	/* Convert the starting position (high/low 32-bit halves) */
	if (sscanf(startpos, "%X/%X", &hi, &lo) != 2)
	{
		fprintf(stderr,
				_("%s: could not parse write-ahead log location \"%s\"\n"),
				progname, startpos);
		exit(1);
	}
	param->startptr = ((uint64) hi) << 32 | lo;
	/* Round off to even segment position */
	param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz);

#ifndef WIN32
	/*
	 * Non-Windows only: create the pipe whose read end (bgpipe[0]) the
	 * child's streamer watches as its stop_socket (see LogStreamerMain).
	 */
	if (pipe(bgpipe) < 0)
	{
		fprintf(stderr,
				_("%s: could not create pipe for background process: %s\n"),
				progname, strerror(errno));
		exit(1);
	}
#endif

	/* Get a second connection */
	param->bgconn = GetConnection();
	if (!param->bgconn)
		/* Error message already written in GetConnection() */
		exit(1);

	/*
	 * In post-10 cluster, pg_xlog has been renamed to pg_wal.  Decide the
	 * name once so the streaming target and the archive_status directory
	 * below always agree.
	 */
	waldir = PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
		"pg_xlog" : "pg_wal";

	/* A truncated path would silently send WAL to the wrong place. */
	len = snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
				   basedir, waldir);
	if (len < 0 || (size_t) len >= sizeof(param->xlog))
	{
		fprintf(stderr,
				_("%s: directory path too long: \"%s/%s\"\n"),
				progname, basedir, waldir);
		exit(1);
	}

	/* Temporary replication slots are only supported in 10 and newer */
	if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
		temp_replication_slot = false;

	/*
	 * Create replication slot if requested.  A temporary slot without a
	 * user-supplied name is named after the backend PID of the streaming
	 * connection.
	 */
	if (temp_replication_slot && !replication_slot)
		replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
	if (temp_replication_slot || create_slot)
	{
		if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
								   temp_replication_slot, true, true, false))
			exit(1);

		if (verbose)
		{
			if (temp_replication_slot)
				fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
						progname, replication_slot);
			else
				fprintf(stderr, _("%s: created replication slot \"%s\"\n"),
						progname, replication_slot);
		}
	}

	if (format == 'p')
	{
		/*
		 * Create pg_wal/archive_status or pg_xlog/archive_status (and thus
		 * pg_wal or pg_xlog) depending on the target server so we can write
		 * to basedir/pg_wal or basedir/pg_xlog as the directory entry in the
		 * tar file may arrive later.
		 */
		len = snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
					   basedir, waldir);
		if (len < 0 || (size_t) len >= sizeof(statusdir))
		{
			fprintf(stderr,
					_("%s: directory path too long: \"%s/%s/archive_status\"\n"),
					progname, basedir, waldir);
			exit(1);
		}

		/* An already-existing directory is fine. */
		if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST)
		{
			fprintf(stderr,
					_("%s: could not create directory \"%s\": %s\n"),
					progname, statusdir, strerror(errno));
			exit(1);
		}
	}

	/*
	 * Start a child process and tell it to start streaming. On Unix, this is
	 * a fork(). On Windows, we create a thread.
	 */
#ifndef WIN32
	bgchild = fork();
	if (bgchild == 0)
	{
		/* in child process: run the streamer, exit with its status */
		exit(LogStreamerMain(param));
	}
	else if (bgchild < 0)
	{
		fprintf(stderr, _("%s: could not create background process: %s\n"),
				progname, strerror(errno));
		exit(1);
	}

	/*
	 * Else we are in the parent process and all is well; bgchild holds the
	 * child's PID.  The atexit hook is registered only in the parent.
	 */
	atexit(kill_bgchild_atexit);
#else							/* WIN32 */

	/*
	 * NOTE(review): LogStreamerMain is cast through (void *) to satisfy
	 * _beginthreadex's start-routine parameter; its real signature is
	 * int (*)(logstreamer_param *).  Confirm this is acceptable for every
	 * supported Windows calling convention.
	 */
	bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
	if (bgchild == 0)
	{
		fprintf(stderr, _("%s: could not create background thread: %s\n"),
				progname, strerror(errno));
		exit(1);
	}
#endif
}

LogStreamerMain
WAL流复制的主函数:在UNIX平台上由fork出的子进程执行,在Windows平台上作为新建线程的入口函数执行。

/*
 * Entry point of the background WAL streamer started by StartLogStreamer().
 * On non-Windows platforms it runs in the fork()ed child (whose exit status
 * is this function's return value); on Windows it is the thread function.
 *
 * Builds a StreamCtl from 'param' plus the global options, streams WAL over
 * param->bgconn until the stop callback (reached_end_position) returns true,
 * then finalizes and frees the WAL write method.
 *
 * Returns 0 on success, 1 on failure.
 */
static int
LogStreamerMain(logstreamer_param *param)
{
	StreamCtl	stream;

	in_log_streamer = true;

	MemSet(&stream, 0, sizeof(stream));
	stream.startpos = param->startptr;
	stream.timeline = param->timeline;
	stream.sysidentifier = param->sysidentifier;
	stream.stream_stop = reached_end_position;
#ifndef WIN32
	/* read end of the pipe created by StartLogStreamer() */
	stream.stop_socket = bgpipe[0];
#else
	stream.stop_socket = PGINVALID_SOCKET;
#endif
	stream.standby_message_timeout = standby_message_timeout;
	stream.synchronous = false;
	stream.do_sync = do_sync;
	stream.mark_done = true;
	stream.partial_suffix = NULL;
	stream.replication_slot = replication_slot;

	/* plain format writes into a directory, tar format into a tar file */
	if (format == 'p')
		stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
	else
		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);

	if (!ReceiveXlogStream(param->bgconn, &stream))

		/*
		 * Any errors will already have been reported in the function process,
		 * but we need to tell the parent that we didn't shutdown in a nice
		 * way.
		 */
		return 1;

	if (!stream.walmethod->finish())
	{
		/*
		 * Report the write method's own error text instead of
		 * strerror(errno): a failing method records its error state for
		 * getlasterror(), and errno alone may be stale or unset (for example
		 * for compression failures in tar mode).
		 */
		fprintf(stderr,
				_("%s: could not finish writing WAL files: %s\n"),
				progname, stream.walmethod->getlasterror());
		return 1;
	}

	PQfinish(param->bgconn);

	/* release the method's private state, then the method struct itself */
	if (format == 'p')
		FreeWalDirectoryMethod();
	else
		FreeWalTarMethod();
	pg_free(stream.walmethod);

	return 0;
}

三、跟踪分析

备份命令

pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

启动gdb跟踪

[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-110.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
...
Reading symbols from /appdb/atlasdb/pg11.2/bin/pg_basebackup...done.
(gdb) b StartLogStreamer
Breakpoint 1 at 0x403e6b: file pg_basebackup.c, line 555.
(gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
(gdb) r
Starting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Password: 
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/57000060 on timeline 16
pg_basebackup: starting background WAL receiver

Breakpoint 1, StartLogStreamer (startpos=0x7fffffffdf60 "0/57000060", timeline=16, 
    sysidentifier=0x61f1a0 "6666964067616600474") at pg_basebackup.c:555
555     param = pg_malloc0(sizeof(logstreamer_param));
(gdb)

输入参数
startpos=0x7fffffffdf60 "0/57000060",
timeline=16,
sysidentifier=0x61f1a0 "6666964067616600474"
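构造参数。可以看到lo=1459617888即0x57000060;按WAL段大小(16MB,即下文pg_wal中每个WAL文件的大小16777216字节)取整后,startptr=1459617792即0x57000000。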

(gdb) n
556     param->timeline = timeline;
(gdb) 
557     param->sysidentifier = sysidentifier;
(gdb) 
560     if (sscanf(startpos, "%X/%X", &hi, &lo) != 2)
(gdb) 
567     param->startptr = ((uint64) hi) << 32 | lo;
(gdb) p hi
$1 = 0
(gdb) p lo
$2 = 1459617888
(gdb) n
569     param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz);
(gdb) n
573     if (pipe(bgpipe) < 0)
(gdb) p *param
$3 = {bgconn = 0x0, startptr = 1459617792, xlog = '\000' , 
  sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb)

建立连接,创建replication slot

(gdb) n
583     param->bgconn = GetConnection();
(gdb) 
584     if (!param->bgconn)
(gdb) 
591              PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
(gdb) 
589     snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
(gdb) 
595     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
(gdb) 
601     if (temp_replication_slot && !replication_slot)
(gdb) 
602         replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
(gdb) 
603     if (temp_replication_slot || create_slot)
(gdb) 
605         if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
(gdb) 
609         if (verbose)
(gdb) 
611             if (temp_replication_slot)
(gdb) 
612                 fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
(gdb) 
pg_basebackup: created temporary replication slot "pg_basebackup_59378"
620     if (format == 'p')
(gdb) 
(gdb) n
630                  PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
(gdb) 
628         snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",

创建WAL归档状态目录(pg_wal/archive_status,同时也创建了pg_wal)

(gdb) 
633         if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST)
(gdb) p *param
$4 = {bgconn = 0x62a280, startptr = 1459617792, xlog = "/data/backup/pg_wal", '\000' , 
  sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb) n
647     bgchild = fork();
(gdb) 
#############
[xdb@localhost backup]$ ls
pg_wal

调用fork创建子进程;在父进程中,fork()返回子进程的PID

(gdb) n
647     bgchild = fork();
(gdb) n
Detaching after fork from child process 43001.
648     if (bgchild == 0)
(gdb) p bgchild
$5 = 43001
(gdb)

子进程(PID=43001)

[xdb@localhost backup]$ ps -ef|grep 43001
xdb      43001 42820  1 11:54 pts/1    00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[xdb@localhost backup]$ ps -ef|grep 192.168.26.25
xdb      42820 42756  0 11:48 pts/1    00:00:00 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
xdb      43001 42820  0 11:54 pts/1    00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

完成调用

(gdb) n
653     else if (bgchild < 0)
(gdb) 
672 }
(gdb) 
BaseBackup () at pg_basebackup.c:1937
1937        for (i = 0; i < PQntuples(res); i++)
(gdb)

pg_wal目录中的数据

[xdb@localhost backup]$ ls -l ./pg_wal/
total 16388
-rw-------. 1 xdb xdb 16777216 Mar 18 11:54 000000100000000000000057
-rw-------. 1 xdb xdb      217 Mar 18 11:54 00000010.history
drwx------. 2 xdb xdb       35 Mar 18 11:54 archive_status
[xdb@localhost backup]$

到此,相信大家对"PostgreSQL中StartLogStreamer分析"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0