千家信息网 (Qianjia Information Network)

PostgreSQL Source Code Walkthrough (154) - Background Processes #6 (walsender #2)

Published: 2025-01-24  Author: Qianjia Information Network editor
Last updated: January 24, 2025

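This section continues the discussion of the PostgreSQL background process walsender. It focuses on two functions in the call stack: exec_replication_command and StartReplication.
The call stack is as follows: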

(gdb) bt
#0  0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0, 
    nevents=1) at latch.c:1048
#2  0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1, 
    wait_event_info=83886092) at latch.c:1000
#3  0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999, 
    wait_event_info=83886092) at latch.c:385
#4  0x000000000085405b in WalSndLoop (send_data=0x8547fe ) at walsender.c:2229
#5  0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6  0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
    at walsender.c:1539
#7  0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
    at postgres.c:4178
#8  0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9  0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228

I. Data Structures

StringInfo
The StringInfoData struct holds information about an extensible string.

/*-------------------------
 * StringInfoData holds information about an extensible string.
 *      data    is the current buffer for the string (allocated with palloc).
 *      len     is the current string length.  There is guaranteed to be
 *              a terminating '\0' at data[len], although this is not very
 *              useful when the string holds binary data rather than text.
 *      maxlen  is the allocated size in bytes of 'data', i.e. the maximum
 *              string size (including the terminating '\0' char) that we can
 *              currently store in 'data' without having to reallocate
 *              more space.  We must always have maxlen > len.
 *      cursor  is initialized to zero by makeStringInfo or initStringInfo,
 *              but is not otherwise touched by the stringinfo.c routines.
 *              Some routines use it to scan through a StringInfo.
 *-------------------------
 */
typedef struct StringInfoData
{
    char       *data;
    int         len;
    int         maxlen;
    int         cursor;
} StringInfoData;

typedef StringInfoData *StringInfo;
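The sketch below makes two of the invariants above concrete: data[len] is always '\0', and maxlen > len. It is a minimal extensible string in plain C. It is not PostgreSQL's stringinfo.c, and the names MiniStringInfo, mini_init, mini_enlarge and mini_append_binary are hypothetical. malloc/realloc stand in for palloc/repalloc. The 1024-byte initial size and the doubling growth are assumed to mirror initStringInfo and enlargeStringInfo.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical stand-in for StringInfoData. malloc/realloc replace
 * palloc/repalloc, and allocation failure simply aborts.
 */
typedef struct MiniStringInfo
{
    char       *data;       /* buffer; data[len] is always '\0' */
    int         len;        /* current string length */
    int         maxlen;     /* allocated size of data; invariant maxlen > len */
    int         cursor;     /* scan position; never touched by these helpers */
} MiniStringInfo;

/* Analogue of initStringInfo: start with an empty 1024-byte buffer. */
static void
mini_init(MiniStringInfo *str)
{
    str->maxlen = 1024;
    str->data = malloc(str->maxlen);
    if (str->data == NULL)
        abort();
    str->data[0] = '\0';
    str->len = 0;
    str->cursor = 0;
}

/* Ensure room for 'needed' more bytes plus the terminator, doubling maxlen. */
static void
mini_enlarge(MiniStringInfo *str, int needed)
{
    int         newlen = str->maxlen;

    while (str->len + needed + 1 > newlen)
        newlen *= 2;
    if (newlen != str->maxlen)
    {
        char       *p = realloc(str->data, newlen);

        if (p == NULL)
            abort();
        str->data = p;
        str->maxlen = newlen;
    }
}

/* Append n arbitrary bytes (binary-safe), then restore the trailing '\0'. */
static void
mini_append_binary(MiniStringInfo *str, const char *bytes, int n)
{
    mini_enlarge(str, n);
    memcpy(str->data + str->len, bytes, n);
    str->len += n;
    str->data[str->len] = '\0';
    assert(str->maxlen > str->len);
}
```

Appending 3 bytes and then 2000 more grows the buffer from 1024 to 2048 bytes, because 3 + 2000 + 1 = 2004 fits in 2048. The terminator is kept even for binary content, which matches the note above that it is "not very useful" in that case.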

II. Source Code Walkthrough

exec_replication_command
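exec_replication_command executes a replication command. It returns true if cmd_string is recognized as a WalSender command, and false otherwise.
Its main logic is as follows:
1. Perform initialization and checks
2. Switch memory context
3. Initialize the replication scanner and parse the command
4. Perform transaction-related checks
5. Initialize the outgoing/incoming message buffers
6. Execute the command according to its type
6.1 For T_StartReplicationCmd (physical replication), call StartReplication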

/*
 * Execute an incoming replication command.
 *
 * Returns true if the cmd_string was recognized as WalSender command, false
 * if not.
 */
bool
exec_replication_command(const char *cmd_string)
{
    int         parse_rc;
    Node       *cmd_node;
    MemoryContext cmd_context;
    MemoryContext old_context;

    /*
     * If WAL sender has been told that shutdown is getting close, switch its
     * status accordingly to handle the next replication commands correctly.
     */
    if (got_STOPPING)
        WalSndSetState(WALSNDSTATE_STOPPING);

    /*
     * Throw error if in stopping mode.  We need prevent commands that could
     * generate WAL while the shutdown checkpoint is being written.  To be
     * safe, we just prohibit all new commands.
     */
    if (MyWalSnd->state == WALSNDSTATE_STOPPING)
        ereport(ERROR,
                (errmsg("cannot execute new commands while WAL sender is in stopping mode")));

    /*
     * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
     * command arrives. Clean up the old stuff if there's anything.
     */
    SnapBuildClearExportedSnapshot();

    // check for interrupts
    CHECK_FOR_INTERRUPTS();

    // create a per-command memory context and switch to it
    cmd_context = AllocSetContextCreate(CurrentMemoryContext,
                                        "Replication command context",
                                        ALLOCSET_DEFAULT_SIZES);
    old_context = MemoryContextSwitchTo(cmd_context);

    // initialize the replication scanner, then parse the command
    replication_scanner_init(cmd_string);
    parse_rc = replication_yyparse();
    if (parse_rc != 0)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 (errmsg_internal("replication command parser returned %d",
                                  parse_rc))));

    cmd_node = replication_parse_result;

    /*
     * Log replication command if log_replication_commands is enabled. Even
     * when it's disabled, log the command with DEBUG1 level for backward
     * compatibility. Note that SQL commands are not logged here, and will be
     * logged later if log_statement is enabled.
     */
    if (cmd_node->type != T_SQLCmd)
        ereport(log_replication_commands ? LOG : DEBUG1,
                (errmsg("received replication command: %s", cmd_string)));

    /*
     * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
     * called outside of transaction the snapshot should be cleared here.
     */
    if (!IsTransactionBlock())
        SnapBuildClearExportedSnapshot();

    /*
     * For aborted transactions, don't allow anything except pure SQL, the
     * exec_simple_query() will handle it correctly.
     */
    if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
        ereport(ERROR,
                (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
                 errmsg("current transaction is aborted, "
                        "commands ignored until end of transaction block")));

    CHECK_FOR_INTERRUPTS();

    /*
     * Allocate buffers that will be used for each outgoing and incoming
     * message.  We do this just once per command to reduce palloc overhead.
     */
    initStringInfo(&output_message);
    initStringInfo(&reply_message);
    initStringInfo(&tmpbuf);

    /* Report to pgstat that this process is running */
    pgstat_report_activity(STATE_RUNNING, NULL);

    // dispatch on the command type
    switch (cmd_node->type)
    {
        case T_IdentifySystemCmd:
            // IDENTIFY_SYSTEM
            IdentifySystem();
            break;

        case T_BaseBackupCmd:
            // BASE_BACKUP
            PreventInTransactionBlock(true, "BASE_BACKUP");
            SendBaseBackup((BaseBackupCmd *) cmd_node);
            break;

        case T_CreateReplicationSlotCmd:
            // create a replication slot
            CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
            break;

        case T_DropReplicationSlotCmd:
            // drop a replication slot
            DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
            break;

        case T_StartReplicationCmd:
            // START_REPLICATION
            {
                StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;

                PreventInTransactionBlock(true, "START_REPLICATION");

                if (cmd->kind == REPLICATION_KIND_PHYSICAL)
                    StartReplication(cmd);
                else
                    StartLogicalReplication(cmd);
                break;
            }

        case T_TimeLineHistoryCmd:
            // TIMELINE_HISTORY: send a timeline history file
            PreventInTransactionBlock(true, "TIMELINE_HISTORY");
            SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
            break;

        case T_VariableShowStmt:
            // SHOW: report the value of a configuration parameter
            {
                DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
                VariableShowStmt *n = (VariableShowStmt *) cmd_node;

                GetPGVariable(n->name, dest);
            }
            break;

        case T_SQLCmd:
            // plain SQL command
            if (MyDatabaseId == InvalidOid)
                ereport(ERROR,
                        (errmsg("cannot execute SQL commands in WAL sender for physical replication")));

            /* Report to pgstat that this process is now idle */
            pgstat_report_activity(STATE_IDLE, NULL);

            /* Tell the caller that this wasn't a WalSender command. */
            return false;

        default:
            // any other command
            elog(ERROR, "unrecognized replication command node tag: %u",
                 cmd_node->type);
    }

    /* done */
    // switch back to the original memory context and free the command context
    MemoryContextSwitchTo(old_context);
    MemoryContextDelete(cmd_context);

    /* Send CommandComplete message */
    EndCommand("SELECT", DestRemote);

    /* Report to pgstat that this process is now idle */
    pgstat_report_activity(STATE_IDLE, NULL);

    return true;
}
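The control flow of exec_replication_command reduces to three outcomes:

- In stopping mode, every command is rejected.
- A recognized replication command is handled, and the function returns true.
- A plain SQL command makes the function return false, so the caller runs it as an ordinary query. This is allowed only if the walsender is connected to a database; otherwise it raises an error.

The toy below models only that decision. The enums and toy_dispatch are hypothetical names. The sketch does no parsing, memory-context handling or messaging.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, trimmed-down command tags (the real ones are NodeTags). */
typedef enum ToyCmdTag
{
    TOY_IDENTIFY_SYSTEM,
    TOY_START_REPLICATION,
    TOY_SQL
} ToyCmdTag;

/* Only the two walsender states this check cares about. */
typedef enum ToyWalSndState
{
    TOY_STATE_STARTUP,
    TOY_STATE_STOPPING
} ToyWalSndState;

typedef enum ToyResult
{
    TOY_HANDLED,            /* exec_replication_command returns true */
    TOY_NOT_WALSENDER,      /* returns false: caller runs it as plain SQL */
    TOY_ERROR               /* would ereport(ERROR) */
} ToyResult;

/*
 * has_database models MyDatabaseId != InvalidOid, i.e. a walsender started
 * for logical replication; a physical walsender has no database.
 */
static ToyResult
toy_dispatch(ToyWalSndState state, ToyCmdTag tag, bool has_database)
{
    /* No new commands once shutdown has started */
    if (state == TOY_STATE_STOPPING)
        return TOY_ERROR;

    switch (tag)
    {
        case TOY_IDENTIFY_SYSTEM:
        case TOY_START_REPLICATION:
            return TOY_HANDLED;
        case TOY_SQL:
            return has_database ? TOY_NOT_WALSENDER : TOY_ERROR;
    }
    return TOY_ERROR;       /* unrecognized tag */
}
```

For example, toy_dispatch(TOY_STATE_STARTUP, TOY_SQL, false) yields TOY_ERROR, which corresponds to "cannot execute SQL commands in WAL sender for physical replication".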

StartReplication
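StartReplication handles the START_REPLICATION command.
Its main logic is as follows:
1. Perform initialization and checks
2. Select the timeline
3. Enter COPY mode
3.1 Set the state
3.2 Send a CopyBothResponse message and start streaming
3.3 Initialize related variables, such as the shared-memory state
3.4 Enter the main loop (WalSndLoop)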

/*
 * Handle START_REPLICATION command.
 *
 * At the moment, this never returns, but an ereport(ERROR) will take us back
 * to the main loop.
 */
static void
StartReplication(StartReplicationCmd *cmd)
{
    StringInfoData buf;
    XLogRecPtr  FlushPtr;

    if (ThisTimeLineID == 0)
        // timeline check: IDENTIFY_SYSTEM must have been run first
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));

    /*
     * We assume here that we're logging enough information in the WAL for
     * log-shipping, since this is checked in PostmasterMain().
     *
     * NOTE: wal_level can only change at shutdown, so in most cases it is
     * difficult for there to be WAL data that we can still see that was
     * written at wal_level='minimal'.
     */

    if (cmd->slotname)
    {
        ReplicationSlotAcquire(cmd->slotname, true);
        // SlotIsLogical(slot) is defined as (slot->data.database != InvalidOid)
        if (SlotIsLogical(MyReplicationSlot))
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     (errmsg("cannot use a logical replication slot for physical replication"))));
    }

    /*
     * Select the timeline. If it was given explicitly by the client, use
     * that. Otherwise use the timeline of the last replayed record, which is
     * kept in ThisTimeLineID.
     */
    if (am_cascading_walsender)
    {
        /* this also updates ThisTimeLineID */
        FlushPtr = GetStandbyFlushRecPtr();
    }
    else
        FlushPtr = GetFlushRecPtr();

    if (cmd->timeline != 0)
    {
        XLogRecPtr  switchpoint;

        sendTimeLine = cmd->timeline;
        if (sendTimeLine == ThisTimeLineID)
        {
            sendTimeLineIsHistoric = false;
            sendTimeLineValidUpto = InvalidXLogRecPtr;
        }
        else
        {
            List       *timeLineHistory;

            sendTimeLineIsHistoric = true;

            /*
             * Check that the timeline the client requested exists, and the
             * requested start location is on that timeline.
             */
            timeLineHistory = readTimeLineHistory(ThisTimeLineID);
            switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
                                         &sendTimeLineNextTLI);
            list_free_deep(timeLineHistory);

            /*
             * Found the requested timeline in the history. Check that
             * requested startpoint is on that timeline in our history.
             *
             * This is quite loose on purpose. We only check that we didn't
             * fork off the requested timeline before the switchpoint. We
             * don't check that we switched *to* it before the requested
             * starting point. This is because the client can legitimately
             * request to start replication from the beginning of the WAL
             * segment that contains switchpoint, but on the new timeline, so
             * that it doesn't end up with a partial segment. If you ask for
             * too old a starting point, you'll get an error later when we
             * fail to find the requested WAL segment in pg_wal.
             *
             * XXX: we could be more strict here and only allow a startpoint
             * that's older than the switchpoint, if it's still in the same
             * WAL segment.
             */
            if (!XLogRecPtrIsInvalid(switchpoint) &&
                switchpoint < cmd->startpoint)
            {
                ereport(ERROR,
                        (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
                                (uint32) (cmd->startpoint >> 32),
                                (uint32) (cmd->startpoint),
                                cmd->timeline),
                         errdetail("This server's history forked from timeline %u at %X/%X.",
                                   cmd->timeline,
                                   (uint32) (switchpoint >> 32),
                                   (uint32) (switchpoint))));
            }
            sendTimeLineValidUpto = switchpoint;
        }
    }
    else
    {
        sendTimeLine = ThisTimeLineID;
        sendTimeLineValidUpto = InvalidXLogRecPtr;
        sendTimeLineIsHistoric = false;
    }

    streamingDoneSending = streamingDoneReceiving = false;

    /* If there is nothing to stream, don't even enter COPY mode */
    if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
    {
        /*
         * When we first start replication the standby will be behind the
         * primary. For some applications, for example synchronous
         * replication, it is important to have a clear state for this initial
         * catchup mode, so we can trigger actions when we change streaming
         * state later. We may stay in this state for a long time, which is
         * exactly why we want to be able to monitor whether or not we are
         * still here.
         */
        // set the state
        WalSndSetState(WALSNDSTATE_CATCHUP);

        /* Send a CopyBothResponse message, and start streaming */
        pq_beginmessage(&buf, 'W');     // 'W' is the CopyBothResponse message type
        pq_sendbyte(&buf, 0);           // overall COPY format: 0 = text
        pq_sendint16(&buf, 0);          // number of columns: 0
        pq_endmessage(&buf);
        pq_flush();

        /*
         * Don't allow a request to stream from a future point in WAL that
         * hasn't been flushed to disk in this server yet.
         */
        if (FlushPtr < cmd->startpoint)
        {
            ereport(ERROR,
                    (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
                            (uint32) (cmd->startpoint >> 32),
                            (uint32) (cmd->startpoint),
                            (uint32) (FlushPtr >> 32),
                            (uint32) (FlushPtr))));
        }

        /* Start streaming from the requested point */
        sentPtr = cmd->startpoint;

        /* Initialize shared memory status, too */
        SpinLockAcquire(&MyWalSnd->mutex);
        MyWalSnd->sentPtr = sentPtr;
        SpinLockRelease(&MyWalSnd->mutex);

        SyncRepInitConfig();

        /* Main loop of walsender */
        // mark replication as active, then enter the main loop
        replication_active = true;

        WalSndLoop(XLogSendPhysical);

        // the main loop has finished: mark replication as inactive
        replication_active = false;
        if (got_STOPPING)
            proc_exit(0);   // exit
        // reset the state
        WalSndSetState(WALSNDSTATE_STARTUP);

        Assert(streamingDoneSending && streamingDoneReceiving);
    }

    if (cmd->slotname)
        ReplicationSlotRelease();

    /*
     * Copy is finished now. Send a single-row result set indicating the next
     * timeline.
     */
    if (sendTimeLineIsHistoric)
    {
        char        startpos_str[8 + 1 + 8 + 1];
        DestReceiver *dest;
        TupOutputState *tstate;
        TupleDesc   tupdesc;
        Datum       values[2];
        bool        nulls[2];

        snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
                 (uint32) (sendTimeLineValidUpto >> 32),
                 (uint32) sendTimeLineValidUpto);

        dest = CreateDestReceiver(DestRemoteSimple);
        MemSet(nulls, false, sizeof(nulls));

        /*
         * Need a tuple descriptor representing two columns. int8 may seem
         * like a surprising data type for this, but in theory int4 would not
         * be wide enough for this, as TimeLineID is unsigned.
         */
        tupdesc = CreateTemplateTupleDesc(2);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
                                  INT8OID, -1, 0);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
                                  TEXTOID, -1, 0);

        /* prepare for projection of tuple */
        tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);

        values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
        values[1] = CStringGetTextDatum(startpos_str);

        /* send it to dest */
        do_tup_output(tstate, values, nulls);

        end_tup_output(tstate);
    }

    /* Send CommandComplete message */
    pq_puttextmessage('C', "START_STREAMING");
}
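startpos_str is sized 8 + 1 + 8 + 1 because an XLogRecPtr is printed as its high and low 32-bit halves in hex. Each half has at most 8 digits, the halves are separated by '/', and the string ends with a terminating NUL. The sketch below reproduces that formatting with standard C types. format_lsn and LSN_STR_SIZE are hypothetical names, and XLogRecPtr is redefined locally as uint64_t so the block compiles on its own.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;    /* a WAL location: 64-bit byte position */

#define LSN_STR_SIZE (8 + 1 + 8 + 1)    /* same size as startpos_str */

/* Print an LSN as "<high 32 bits>/<low 32 bits>" in uppercase hex. */
static void
format_lsn(XLogRecPtr lsn, char *out)
{
    snprintf(out, LSN_STR_SIZE, "%X/%X",
             (unsigned int) (uint32_t) (lsn >> 32),
             (unsigned int) (uint32_t) lsn);
    assert(strlen(out) <= LSN_STR_SIZE - 1);    /* never truncated */
}
```

With this helper, 1560281088 formats as "0/5D000000", the start point in the START_REPLICATION command from the call stack. The largest possible LSN, "FFFFFFFF/FFFFFFFF", exactly fills the 17 characters plus NUL.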

III. Tracing and Analysis

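On the primary, attach gdb to the postmaster, set a breakpoint on exec_replication_command, and then start the standby node. Execution stops at the breakpoint.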

[xdb@localhost ~]$ ps -ef|grep postgres
xdb       1339     1  2 14:45 pts/0    00:00:00 /appdb/xdb/pg11.2/bin/postgres
[xdb@localhost ~]$ gdb -p 1339
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
...
(gdb) set follow-fork-mode child
(gdb) b exec_replication_command
Breakpoint 1 at 0x852fd2: file walsender.c, line 1438.
(gdb) c
Continuing.
[New process 1356]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7f5df9d2d8c0 (LWP 1356)]

Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "IDENTIFY_SYSTEM") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)

The first command is IDENTIFY_SYSTEM. The second one, START_REPLICATION, is the command we want to trace.

(gdb) c
Continuing.

Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)
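In the command string, the start point is written as two 32-bit hex halves separated by '/'. The sketch below extracts the LSN and the timeline from exactly this command form using sscanf. It is only a rough illustration. toy_parse_start_replication is a hypothetical helper, not the bison/flex parser (repl_gram.y and repl_scanner.l) behind replication_yyparse. It also does not accept the optional SLOT, PHYSICAL or LOGICAL keywords.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Toy parser for one form of the command only:
 *   START_REPLICATION <hi>/<lo> TIMELINE <tli>
 */
static bool
toy_parse_start_replication(const char *cmd, uint64_t *startpoint,
                            unsigned int *tli)
{
    unsigned int hi, lo;

    assert(cmd != NULL);
    if (sscanf(cmd, "START_REPLICATION %X/%X TIMELINE %u", &hi, &lo, tli) != 3)
        return false;
    /* Recombine the two 32-bit halves into a 64-bit LSN */
    *startpoint = ((uint64_t) hi << 32) | lo;
    return true;
}
```

For the traced command, "0/5D000000" yields 1560281088 and the timeline is 16. gdb later prints the same values as cmd->startpoint and cmd->timeline.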

1. Perform initialization and checks

(gdb) n
1446        if (MyWalSnd->state == WALSNDSTATE_STOPPING)
(gdb) 
1454        SnapBuildClearExportedSnapshot();
(gdb) p *MyWalSnd
$1 = {pid = 1356, state = WALSNDSTATE_STARTUP, sentPtr = 0, needreload = false, write = 0, flush = 0, apply = 0, 
  writeLag = -1, flushLag = -1, applyLag = -1, mutex = 0 '\000', latch = 0x7f5dee92c4d4, sync_standby_priority = 0}
(gdb) n
1456        CHECK_FOR_INTERRUPTS();
(gdb)

2. Switch memory context

(gdb) 
1458        cmd_context = AllocSetContextCreate(CurrentMemoryContext,
(gdb) 
1461        old_context = MemoryContextSwitchTo(cmd_context);
(gdb)
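A per-command context lets everything palloc'd while the command runs be released with a single MemoryContextDelete. The sketch below illustrates that idea with a hypothetical arena (Arena, arena_alloc, arena_switch_to, arena_delete), in which current_arena plays the role of CurrentMemoryContext. It is an analogy only, not the AllocSet implementation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Chunk header; the union keeps the payload maximally aligned. */
typedef union ArenaChunk
{
    union ArenaChunk *next;
    max_align_t align;
} ArenaChunk;

/* Hypothetical analogue of a MemoryContext. */
typedef struct Arena
{
    ArenaChunk *chunks;     /* every allocation, newest first */
    int         nallocs;
} Arena;

static Arena *current_arena;    /* plays the role of CurrentMemoryContext */

/* Analogue of MemoryContextSwitchTo: returns the previous arena. */
static Arena *
arena_switch_to(Arena *a)
{
    Arena      *old = current_arena;

    current_arena = a;
    return old;
}

/* Analogue of palloc in a given context. */
static void *
arena_alloc(Arena *a, size_t size)
{
    ArenaChunk *c;

    assert(a != NULL);
    c = malloc(sizeof(ArenaChunk) + size);
    if (c == NULL)
        abort();
    c->next = a->chunks;
    a->chunks = c;
    a->nallocs++;
    return c + 1;           /* payload follows the header */
}

/* Analogue of MemoryContextDelete: free all allocations in one pass. */
static void
arena_delete(Arena *a)
{
    ArenaChunk *c = a->chunks;

    while (c != NULL)
    {
        ArenaChunk *next = c->next;

        free(c);
        c = next;
    }
    a->chunks = NULL;
    a->nallocs = 0;
}
```

The usage follows the same pattern as exec_replication_command. Switch to the command arena, allocate freely, switch back to the old arena, then delete the command arena as a whole instead of freeing each object.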

3. Initialize the replication scanner and parse the command

(gdb) 
1463        replication_scanner_init(cmd_string);
(gdb) n
1464        parse_rc = replication_yyparse();
(gdb) 
1465        if (parse_rc != 0)
(gdb) p parse_rc
$3 = 0
(gdb) 
(gdb) n
1471        cmd_node = replication_parse_result;
(gdb)
(gdb) 
1479        if (cmd_node->type != T_SQLCmd)
(gdb) n
1480            ereport(log_replication_commands ? LOG : DEBUG1,
(gdb) p cmd_node
$4 = (Node *) 0x1df4710
(gdb) p *cmd_node
$5 = {type = T_StartReplicationCmd}
(gdb)

4. Perform transaction-related checks

(gdb) n
1487        if (!IsTransactionBlock())
(gdb) 
1488            SnapBuildClearExportedSnapshot();
(gdb) 
1494        if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
(gdb) 
1500        CHECK_FOR_INTERRUPTS();
(gdb)

5. Initialize the outgoing/incoming message buffers

(gdb) 
1506        initStringInfo(&output_message);
(gdb) 
1507        initStringInfo(&reply_message);
(gdb) 
1508        initStringInfo(&tmpbuf);
(gdb) 
1511        pgstat_report_activity(STATE_RUNNING, NULL);

6. Execute the command according to its type
6.1 For T_StartReplicationCmd (physical replication), call StartReplication

(gdb) n
1513        switch (cmd_node->type)
(gdb) 
1534                    StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
(gdb) 
1536                    PreventInTransactionBlock(true, "START_REPLICATION");
(gdb) 
1538                    if (cmd->kind == REPLICATION_KIND_PHYSICAL)
(gdb) 
1539                        StartReplication(cmd);

Step into StartReplication

1539                        StartReplication(cmd);
(gdb) step
StartReplication (cmd=0x1df4710) at walsender.c:532
532     if (ThisTimeLineID == 0)
(gdb)

1. Perform initialization and checks

(gdb) n
546     if (cmd->slotname)
(gdb) 
560     if (am_cascading_walsender)
(gdb)

2. Select the timeline

(gdb) n
568     if (cmd->timeline != 0)
(gdb) 
572         sendTimeLine = cmd->timeline;
(gdb) 
573         if (sendTimeLine == ThisTimeLineID)
(gdb) 
575             sendTimeLineIsHistoric = false;
(gdb) p FlushPtr
$9 = 1560397696
(gdb) n
576             sendTimeLineValidUpto = InvalidXLogRecPtr;
(gdb) 
634     streamingDoneSending = streamingDoneReceiving = false;
(gdb) p sendTimeLine
$10 = 16
(gdb) p ThisTimeLineID
$11 = 16
(gdb) p *cmd
$12 = {type = T_StartReplicationCmd, kind = REPLICATION_KIND_PHYSICAL, slotname = 0x0, timeline = 16, 
  startpoint = 1560281088, options = 0x0}
(gdb)
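In this trace the client requested timeline 16, which equals ThisTimeLineID. The walsender therefore streams the current, non-historic timeline with no upper bound.

The sketch below condenses two parts of StartReplication into one pure function: the timeline-selection branch and the later "nothing to stream" test. It simplifies under stated assumptions. choose_timeline and TimelineChoice are hypothetical. The caller passes in the switch point instead of having it computed by readTimeLineHistory/tliSwitchPoint.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TimeLineID;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

typedef struct TimelineChoice
{
    TimeLineID  send_tli;       /* sendTimeLine */
    bool        is_historic;    /* sendTimeLineIsHistoric */
    XLogRecPtr  valid_upto;     /* sendTimeLineValidUpto */
    bool        error;          /* start point not in this server's history */
    bool        enter_copy;     /* false means "nothing to stream" */
} TimelineChoice;

/*
 * 'switchpoint' stands in for tliSwitchPoint(requested, history, ...),
 * i.e. where this server's history left the requested timeline.
 */
static TimelineChoice
choose_timeline(TimeLineID requested, TimeLineID this_tli,
                XLogRecPtr switchpoint, XLogRecPtr startpoint)
{
    TimelineChoice c = {0};

    assert(this_tli != 0);      /* IDENTIFY_SYSTEM must have run */

    if (requested == 0 || requested == this_tli)
    {
        /* current timeline: stream with no upper bound */
        c.send_tli = this_tli;
        c.is_historic = false;
        c.valid_upto = InvalidXLogRecPtr;
    }
    else
    {
        c.send_tli = requested;
        c.is_historic = true;
        if (switchpoint != InvalidXLogRecPtr && switchpoint < startpoint)
        {
            c.error = true;     /* StartReplication would ereport(ERROR) */
            return c;
        }
        c.valid_upto = switchpoint;
    }
    c.enter_copy = !c.is_historic || startpoint < c.valid_upto;
    return c;
}
```

With the traced values (requested 16, current 16, start point 1560281088), the function yields is_historic = false and enter_copy = true, matching the gdb output. A historic request whose start point equals the switch point yields enter_copy = false: there is nothing to stream, and only the next_tli result row is sent.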

3. Enter COPY mode

(gdb) n
637     if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
(gdb)

3.1 Set the state

648         WalSndSetState(WALSNDSTATE_CATCHUP);
(gdb) p sendTimeLineValidUpto
$13 = 0
(gdb) p cmd->startpoint
$14 = 1560281088
(gdb)
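pg_stat_replication.state reports the state set here as "catchup". Once the standby has caught up, WalSndLoop moves the walsender to "streaming". The sketch below is modeled on the state-to-text mapping in walsender.c (WalSndGetStateString). The enum is copied locally so the block compiles on its own, and walsnd_state_string is a hypothetical name.

```c
#include <assert.h>
#include <string.h>

/* Local copy of the WalSndState values so this compiles standalone. */
typedef enum WalSndState
{
    WALSNDSTATE_STARTUP = 0,
    WALSNDSTATE_BACKUP,
    WALSNDSTATE_CATCHUP,
    WALSNDSTATE_STREAMING,
    WALSNDSTATE_STOPPING
} WalSndState;

/* Text shown in pg_stat_replication.state for each walsender state. */
static const char *
walsnd_state_string(WalSndState state)
{
    switch (state)
    {
        case WALSNDSTATE_STARTUP:
            return "startup";
        case WALSNDSTATE_BACKUP:
            return "backup";
        case WALSNDSTATE_CATCHUP:
            return "catchup";
        case WALSNDSTATE_STREAMING:
            return "streaming";
        case WALSNDSTATE_STOPPING:
            return "stopping";
    }
    return "UNKNOWN";
}
```

Earlier in the trace, p *MyWalSnd showed WALSNDSTATE_STARTUP ("startup"). After line 648, a query of pg_stat_replication on the primary would show "catchup" for this walsender.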

3.2 Send the CopyBothResponse message and start streaming

(gdb) n
651         pq_beginmessage(&buf, 'W');
(gdb) 
652         pq_sendbyte(&buf, 0);
(gdb) 
653         pq_sendint16(&buf, 0);
(gdb) 
654         pq_endmessage(&buf);
(gdb) p buf
$15 = {data = 0x1df53b0 "", len = 3, maxlen = 1024, cursor = 87}
(gdb) p buf->data
$16 = 0x1df53b0 ""
(gdb) x/hb buf->data
0x1df53b0:  0
(gdb) x/32hb buf->data
0x1df53b0:  0   0   0   127 127 127 127 127
0x1df53b8:  127 127 127 127 127 127 127 127
0x1df53c0:  127 127 127 127 127 127 127 127
0x1df53c8:  127 127 127 127 127 127 127 127
(gdb)
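The buffer shows len = 3 and cursor = 87, which is the ASCII code of 'W'.

- **cursor:** pq_beginmessage stashes the message type in the cursor field rather than in data. pq_endmessage later writes the type byte and the length word when the message is sent.
- **Body:** the three bytes in data are the CopyBothResponse's Int8 overall format (0, text) and its Int16 column count (0).
- **Remaining bytes:** anything past len is unused buffer space, not part of the message.

The sketch below assembles the complete wire message by hand. build_copy_both_response is hypothetical and does not use pqformat.c.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Build a CopyBothResponse as it appears on the wire:
 *   Byte1('W') | Int32 length (counts itself) | Int8 format | Int16 ncols
 * With format 0 and 0 columns the body is the 3 bytes seen in gdb.
 */
static size_t
build_copy_both_response(uint8_t out[8])
{
    const uint8_t body[3] = {0, 0, 0};      /* format = 0, ncols = 0 */
    uint32_t    len = 4 + sizeof(body);     /* excludes the type byte */

    out[0] = 'W';
    out[1] = (uint8_t) (len >> 24);         /* network byte order */
    out[2] = (uint8_t) (len >> 16);
    out[3] = (uint8_t) (len >> 8);
    out[4] = (uint8_t) len;
    for (size_t i = 0; i < sizeof(body); i++)
        out[5 + i] = body[i];
    return 5 + sizeof(body);
}
```

The result is 8 bytes: 'W' (87), the length 7 as a big-endian Int32, then the three zero bytes of the body.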

3.3 Initialize related variables, such as the shared-memory state

(gdb) n
655         pq_flush();
(gdb) 
661         if (FlushPtr < cmd->startpoint)
(gdb) p FlushPtr
$17 = 1560397696
(gdb) p cmd->startpoint
$18 = 1560281088
(gdb) n
672         sentPtr = cmd->startpoint;
(gdb) 
675         SpinLockAcquire(&MyWalSnd->mutex);
(gdb) 
676         MyWalSnd->sentPtr = sentPtr;
(gdb) 
677         SpinLockRelease(&MyWalSnd->mutex);
(gdb) 
679         SyncRepInitConfig();
(gdb) 
682         replication_active = true;

3.4 Enter the main loop (WalSndLoop)

(gdb) 
684         WalSndLoop(XLogSendPhysical);
(gdb)

DONE!

IV. References

PG Source Code
