PostgreSQL 源码解读(154)- 后台进程#6(walsender#2)
本节继续介绍PostgreSQL的后台进程walsender,重点介绍的是调用栈中的exec_replication_command和StartReplication函数.
调用栈如下:
(gdb) bt#0 0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6#1 0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0, nevents=1) at latch.c:1048#2 0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1, wait_event_info=83886092) at latch.c:1000#3 0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999, wait_event_info=83886092) at latch.c:385#4 0x000000000085405b in WalSndLoop (send_data=0x8547fe ) at walsender.c:2229#5 0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684#6 0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:1539#7 0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator") at postgres.c:4178#8 0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361#9 0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228
一、数据结构
StringInfo
StringInfoData结构体保存关于扩展字符串的相关信息.
/*------------------------- * StringInfoData holds information about an extensible string. * StringInfoData结构体保存关于扩展字符串的相关信息. * data is the current buffer for the string (allocated with palloc). * data 通过palloc分配的字符串缓存 * len is the current string length. There is guaranteed to be * a terminating '\0' at data[len], although this is not very * useful when the string holds binary data rather than text. * len 是当前字符串的长度.保证以ASCII 0(\0)结束(data[len] = '\0'). * 虽然如果存储的是二进制数据而不是文本时不太好使. * maxlen is the allocated size in bytes of 'data', i.e. the maximum * string size (including the terminating '\0' char) that we can * currently store in 'data' without having to reallocate * more space. We must always have maxlen > len. * maxlen 以字节为单位已分配的'data'的大小,限定了最大的字符串大小(包括结尾的ASCII 0) * 小于此尺寸的数据可以直接存储而无需重新分配. * cursor is initialized to zero by makeStringInfo or initStringInfo, * but is not otherwise touched by the stringinfo.c routines. * Some routines use it to scan through a StringInfo. * cursor 通过makeStringInfo或initStringInfo初始化为0,但不受stringinfo.c例程的影响. * 某些例程使用该字段扫描StringInfo *------------------------- */typedef struct StringInfoData{ char *data; int len; int maxlen; int cursor;} StringInfoData;typedef StringInfoData *StringInfo;
二、源码解读
exec_replication_command
exec_replication_command执行复制命令,如cmd_string被识别为WalSender命令,返回T,否则返回F.
其主要逻辑如下:
1.执行相关初始化和校验
2.切换内存上下文
3.初始化复制扫描器
4.执行事务相关的判断或校验
5.初始化输入输出消息
6.根据命令类型执行相应的命令
6.1命令类型为T_StartReplicationCmd,调用StartReplication
/* * Execute an incoming replication command. * 执行复制命令. * * Returns true if the cmd_string was recognized as WalSender command, false * if not. * 如cmd_string被识别为WalSender命令,返回T,否则返回F */boolexec_replication_command(const char *cmd_string){ int parse_rc; Node *cmd_node; MemoryContext cmd_context; MemoryContext old_context; /* * If WAL sender has been told that shutdown is getting close, switch its * status accordingly to handle the next replication commands correctly. * 如果WAL sender已被通知关闭,切换状态以应对接下来的复制命令. */ if (got_STOPPING) WalSndSetState(WALSNDSTATE_STOPPING); /* * Throw error if in stopping mode. We need prevent commands that could * generate WAL while the shutdown checkpoint is being written. To be * safe, we just prohibit all new commands. * 如在stopping模式,则抛出错误. * 我们需要在shutdown checkpoint写入期间禁止命令的产生. * 安全期间,禁止所有新的命令. */ if (MyWalSnd->state == WALSNDSTATE_STOPPING) ereport(ERROR, (errmsg("cannot execute new commands while WAL sender is in stopping mode"))); /* * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next * command arrives. Clean up the old stuff if there's anything. * CREATE_REPLICATION_SLOT ... LOGICAL 导出快照直至下个命令到达. * 如存在,则清理旧的stuff. * */ SnapBuildClearExportedSnapshot(); //检查中断 CHECK_FOR_INTERRUPTS(); //命令上下文 cmd_context = AllocSetContextCreate(CurrentMemoryContext, "Replication command context", ALLOCSET_DEFAULT_SIZES); old_context = MemoryContextSwitchTo(cmd_context); //初始化复制扫描器 replication_scanner_init(cmd_string); parse_rc = replication_yyparse(); if (parse_rc != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), (errmsg_internal("replication command parser returned %d", parse_rc)))); cmd_node = replication_parse_result; /* * Log replication command if log_replication_commands is enabled. Even * when it's disabled, log the command with DEBUG1 level for backward * compatibility. Note that SQL commands are not logged here, and will be * logged later if log_statement is enabled. * 如log_replication_commands启用,则记录复制命令在日志中. * 就算该选项被禁止,通过DEBUG1级别记录日志. * 注意SQL命令不在这里记录,在log_statement启用的情况下在后续进行记录. * */ if (cmd_node->type != T_SQLCmd) ereport(log_replication_commands ? LOG : DEBUG1, (errmsg("received replication command: %s", cmd_string))); /* * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was * called outside of transaction the snapshot should be cleared here. * CREATE_REPLICATION_SLOT ... LOGICAL导出快照. * 该命令如果在事务的外层被调用,那么快照应在这里清除. */ if (!IsTransactionBlock()) SnapBuildClearExportedSnapshot(); /* * For aborted transactions, don't allow anything except pure SQL, the * exec_simple_query() will handle it correctly. * 对于废弃的事务,除了纯SQL外不允许其他命令,exec_simple_query()函数可以正确处理这种情况. */ if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd)) ereport(ERROR, (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), errmsg("current transaction is aborted, " "commands ignored until end of transaction block"))); CHECK_FOR_INTERRUPTS(); /* * Allocate buffers that will be used for each outgoing and incoming * message. We do this just once per command to reduce palloc overhead. * 为消息I/O分配缓存. * 每个命令执行一次以减少palloc的负载. */ initStringInfo(&output_message); initStringInfo(&reply_message); initStringInfo(&tmpbuf); /* Report to pgstat that this process is running */ //向pgstat报告该进程正在运行. pgstat_report_activity(STATE_RUNNING, NULL); //根据命令类型执行相应的命令 switch (cmd_node->type) { case T_IdentifySystemCmd: //识别系统 IdentifySystem(); break; case T_BaseBackupCmd: //BASE_BACKUP PreventInTransactionBlock(true, "BASE_BACKUP"); SendBaseBackup((BaseBackupCmd *) cmd_node); break; case T_CreateReplicationSlotCmd: //创建复制slot CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node); break; case T_DropReplicationSlotCmd: //删除复制slot DropReplicationSlot((DropReplicationSlotCmd *) cmd_node); break; case T_StartReplicationCmd: //START_REPLICATION { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; PreventInTransactionBlock(true, "START_REPLICATION"); if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else StartLogicalReplication(cmd); break; } case T_TimeLineHistoryCmd: //构造时间线历史 TIMELINE_HISTORY PreventInTransactionBlock(true, "TIMELINE_HISTORY"); SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); break; case T_VariableShowStmt: // { DestReceiver *dest = CreateDestReceiver(DestRemoteSimple); VariableShowStmt *n = (VariableShowStmt *) cmd_node; GetPGVariable(n->name, dest); } break; case T_SQLCmd: //SQL命令 if (MyDatabaseId == InvalidOid) ereport(ERROR, (errmsg("cannot execute SQL commands in WAL sender for physical replication"))); /* Report to pgstat that this process is now idle */ pgstat_report_activity(STATE_IDLE, NULL); /* Tell the caller that this wasn't a WalSender command. */ return false; default: //其他命令 elog(ERROR, "unrecognized replication command node tag: %u", cmd_node->type); } /* done */ //执行完毕,回到原来的内存上下文中 MemoryContextSwitchTo(old_context); MemoryContextDelete(cmd_context); /* Send CommandComplete message */ //命令结束 EndCommand("SELECT", DestRemote); /* Report to pgstat that this process is now idle */ //报告状态 pgstat_report_activity(STATE_IDLE, NULL); return true;}
StartReplication
StartReplication处理START_REPLICATION命令.
其主要逻辑如下:
1.执行相关初始化和校验
2.选择时间线
3.进入COPY模式
3.1设置状态
3.2发送CopyBothResponse消息,启动streaming
3.3初始化相关变量,如共享内存状态等
3.4进入主循环(WalSndLoop)
/* * Handle START_REPLICATION command. * 处理START_REPLICATION命令 * * At the moment, this never returns, but an ereport(ERROR) will take us back * to the main loop. * 该函数不会返回,但ereport(ERROR)调用可以回到主循环 */static voidStartReplication(StartReplicationCmd *cmd){ StringInfoData buf; XLogRecPtr FlushPtr; if (ThisTimeLineID == 0) //时间线校验 ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION"))); /* * We assume here that we're logging enough information in the WAL for * log-shipping, since this is checked in PostmasterMain(). * 在这里,由于在PostmasterMain()假定已为log-shipping记录了足够多的信息 * * NOTE: wal_level can only change at shutdown, so in most cases it is * difficult for there to be WAL data that we can still see that was * written at wal_level='minimal'. * 注意:wal_level只能在shutdown的情况下进行修改, * 因此在大多数情况下,很难看到在wal_level='minimal'的情况下的WAL数据. */ if (cmd->slotname) { ReplicationSlotAcquire(cmd->slotname, true); //#define SlotIsLogical ( slot ) (slot->data.database != InvalidOid) if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errmsg("cannot use a logical replication slot for physical replication")))); } /* * Select the timeline. If it was given explicitly by the client, use * that. Otherwise use the timeline of the last replayed record, which is * kept in ThisTimeLineID. * 选择时间线. * 如果通过客户端明确给出,则使用该值. * 否则的话,使用最后重放记录的时间线,在ThisTimeLineID中保存. */ if (am_cascading_walsender) { /* this also updates ThisTimeLineID */ //这也会更新ThisTimeLineID变量 FlushPtr = GetStandbyFlushRecPtr(); } else FlushPtr = GetFlushRecPtr(); if (cmd->timeline != 0) { XLogRecPtr switchpoint; sendTimeLine = cmd->timeline; if (sendTimeLine == ThisTimeLineID) { sendTimeLineIsHistoric = false; sendTimeLineValidUpto = InvalidXLogRecPtr; } else { List *timeLineHistory; sendTimeLineIsHistoric = true; /* * Check that the timeline the client requested exists, and the * requested start location is on that timeline. * 检查客户端请求的时间线是否存在,请求的开始位置是否在该时间线上. */ timeLineHistory = readTimeLineHistory(ThisTimeLineID); switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, &sendTimeLineNextTLI); list_free_deep(timeLineHistory); /* * Found the requested timeline in the history. Check that * requested startpoint is on that timeline in our history. * 通过历史文件找到请求的时间线. * 在历史中检查请求的开始点是否在时间线上. * * This is quite loose on purpose. We only check that we didn't * fork off the requested timeline before the switchpoint. We * don't check that we switched *to* it before the requested * starting point. This is because the client can legitimately * request to start replication from the beginning of the WAL * segment that contains switchpoint, but on the new timeline, so * that it doesn't end up with a partial segment. If you ask for * too old a starting point, you'll get an error later when we * fail to find the requested WAL segment in pg_wal. * 这是有意为之.我们只检查在切换点之前没有fork off的请求的时间线. * 我们不会检查在请求的开始点之前的时间线. * 这是因为客户端可以合法地请求从包含交换点的WAL端的开始处进行复制, * 在新的时间线上如此执行,以避免出现由于部分segment的问题导致出错. * 如果客户端请求一个较旧的开始点,在pg_wal中无法找到请求的WAL段时会报错. * * XXX: we could be more strict here and only allow a startpoint * that's older than the switchpoint, if it's still in the same * WAL segment. * XXX: 我们可以更严格,如果仍然在同一个WAL segment中,那么可以只允许比切换点旧的开始点 */ if (!XLogRecPtrIsInvalid(switchpoint) && switchpoint < cmd->startpoint) { ereport(ERROR, (errmsg("requested starting point %X/%X on timeline %u is not in this server's history", (uint32) (cmd->startpoint >> 32), (uint32) (cmd->startpoint), cmd->timeline), errdetail("This server's history forked from timeline %u at %X/%X.", cmd->timeline, (uint32) (switchpoint >> 32), (uint32) (switchpoint)))); } sendTimeLineValidUpto = switchpoint; } } else { sendTimeLine = ThisTimeLineID; sendTimeLineValidUpto = InvalidXLogRecPtr; sendTimeLineIsHistoric = false; } streamingDoneSending = streamingDoneReceiving = false; /* If there is nothing to stream, don't even enter COPY mode */ //如果没有任何东西需要stream,不需要启动COPY命令 if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto) { /* * When we first start replication the standby will be behind the * primary. For some applications, for example synchronous * replication, it is important to have a clear state for this initial * catchup mode, so we can trigger actions when we change streaming * state later. We may stay in this state for a long time, which is * exactly why we want to be able to monitor whether or not we are * still here. * 在首次启动复制时,standby节点会落后于master节点. * 对于某些应用,比如同步复制,对于这种初始的catchup模式有一个干净的状态是十分重要的, * 因此在改变streaming状态时我们可以触发相关的动作. * 我们可以处于这种状态很长时间,这正是我们希望有能力监控我们是否仍在这里的原因. */ //设置状态 WalSndSetState(WALSNDSTATE_CATCHUP); /* Send a CopyBothResponse message, and start streaming */ //发送CopyBothResponse消息,启动streaming pq_beginmessage(&buf, 'W');//W->COPY命令? pq_sendbyte(&buf, 0); pq_sendint16(&buf, 0); pq_endmessage(&buf); pq_flush(); /* * Don't allow a request to stream from a future point in WAL that * hasn't been flushed to disk in this server yet. * 不允许请求该服务器上一个尚未刷入到磁盘上的WAL未来位置. */ if (FlushPtr < cmd->startpoint) { ereport(ERROR, (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X", (uint32) (cmd->startpoint >> 32), (uint32) (cmd->startpoint), (uint32) (FlushPtr >> 32), (uint32) (FlushPtr)))); } /* Start streaming from the requested point */ //从请求点开始streaming sentPtr = cmd->startpoint; /* Initialize shared memory status, too */ //初始化共享内存状态 SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sentPtr = sentPtr; SpinLockRelease(&MyWalSnd->mutex); SyncRepInitConfig(); /* Main loop of walsender */ //walsender主循环,开始复制,激活复制 replication_active = true; //主循环 WalSndLoop(XLogSendPhysical); //完结后设置为非活动状态 replication_active = false; if (got_STOPPING) proc_exit(0);//退出 //设置状态 WalSndSetState(WALSNDSTATE_STARTUP); Assert(streamingDoneSending && streamingDoneReceiving); } if (cmd->slotname) ReplicationSlotRelease(); /* * Copy is finished now. Send a single-row result set indicating the next * timeline. * Copy命令已完结.发送单行结果集以提升下一个timeline */ if (sendTimeLineIsHistoric) { char startpos_str[8 + 1 + 8 + 1]; DestReceiver *dest; TupOutputState *tstate; TupleDesc tupdesc; Datum values[2]; bool nulls[2]; snprintf(startpos_str, sizeof(startpos_str), "%X/%X", (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto); dest = CreateDestReceiver(DestRemoteSimple); MemSet(nulls, false, sizeof(nulls)); /* * Need a tuple descriptor representing two columns. int8 may seem * like a surprising data type for this, but in theory int4 would not * be wide enough for this, as TimeLineID is unsigned. */ tupdesc = CreateTemplateTupleDesc(2); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli", INT8OID, -1, 0); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos", TEXTOID, -1, 0); /* prepare for projection of tuple */ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); values[0] = Int64GetDatum((int64) sendTimeLineNextTLI); values[1] = CStringGetTextDatum(startpos_str); /* send it to dest */ do_tup_output(tstate, values, nulls); end_tup_output(tstate); } /* Send CommandComplete message */ pq_puttextmessage('C', "START_STREAMING");}
三、跟踪分析
在主节点上用gdb跟踪postmaster,在PostgresMain上设置断点后启动standby节点,进入断点
[xdb@localhost ~]$ ps -ef|grep postgresxdb 1339 1 2 14:45 pts/0 00:00:00 /appdb/xdb/pg11.2/bin/postgres[xdb@localhost ~]$ gdb -p 1339GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7...(gdb) set follow-fork-mode child(gdb) b exec_replication_commandBreakpoint 1 at 0x852fd2: file walsender.c, line 1438.(gdb) cContinuing.[New process 1356][Thread debugging using libthread_db enabled]Using host libthread_db library "/lib64/libthread_db.so.1".[Switching to Thread 0x7f5df9d2d8c0 (LWP 1356)]Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "IDENTIFY_SYSTEM") at walsender.c:14381438 if (got_STOPPING)(gdb)
第一个命令是IDENTIFY_SYSTEM,第二个命令才是需要跟踪的对象START_REPLICATION
(gdb) cContinuing.Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:14381438 if (got_STOPPING)(gdb)
1.执行相关初始化和校验
(gdb) n1446 if (MyWalSnd->state == WALSNDSTATE_STOPPING)(gdb) 1454 SnapBuildClearExportedSnapshot();(gdb) p *MyWalSnd$1 = {pid = 1356, state = WALSNDSTATE_STARTUP, sentPtr = 0, needreload = false, write = 0, flush = 0, apply = 0, writeLag = -1, flushLag = -1, applyLag = -1, mutex = 0 '\000', latch = 0x7f5dee92c4d4, sync_standby_priority = 0}(gdb) n1456 CHECK_FOR_INTERRUPTS();(gdb)
2.切换内存上下文
(gdb) 1458 cmd_context = AllocSetContextCreate(CurrentMemoryContext,(gdb) 1461 old_context = MemoryContextSwitchTo(cmd_context);(gdb)
3.初始化复制扫描器
(gdb) 1463 replication_scanner_init(cmd_string);(gdb) n1464 parse_rc = replication_yyparse();(gdb) 1465 if (parse_rc != 0)(gdb) p parse_rc$3 = 0(gdb) (gdb) n1471 cmd_node = replication_parse_result;(gdb)(gdb) 1479 if (cmd_node->type != T_SQLCmd)(gdb) n1480 ereport(log_replication_commands ? LOG : DEBUG1,(gdb) p cmd_node$4 = (Node *) 0x1df4710(gdb) p *cmd_node$5 = {type = T_StartReplicationCmd}(gdb)
4.执行事务相关的判断或校验
(gdb) n1487 if (!IsTransactionBlock())(gdb) 1488 SnapBuildClearExportedSnapshot();(gdb) 1494 if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))(gdb) 1500 CHECK_FOR_INTERRUPTS();(gdb)
5.初始化输入输出消息
(gdb) 1506 initStringInfo(&output_message);(gdb) 1507 initStringInfo(&reply_message);(gdb) 1508 initStringInfo(&tmpbuf);(gdb) 1511 pgstat_report_activity(STATE_RUNNING, NULL);
6.根据命令类型执行相应的命令
6.1命令类型为T_StartReplicationCmd,调用StartReplication
(gdb) n1513 switch (cmd_node->type)(gdb) 1534 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;(gdb) 1536 PreventInTransactionBlock(true, "START_REPLICATION");(gdb) 1538 if (cmd->kind == REPLICATION_KIND_PHYSICAL)(gdb) 1539 StartReplication(cmd);
进入StartReplication
1539 StartReplication(cmd);(gdb) stepStartReplication (cmd=0x1df4710) at walsender.c:532532 if (ThisTimeLineID == 0)(gdb)
1.执行相关初始化和校验
(gdb) n546 if (cmd->slotname)(gdb) 560 if (am_cascading_walsender)(gdb)
2.选择时间线
(gdb) n568 if (cmd->timeline != 0)(gdb) 572 sendTimeLine = cmd->timeline;(gdb) 573 if (sendTimeLine == ThisTimeLineID)(gdb) 575 sendTimeLineIsHistoric = false;(gdb) p FlushPtr$9 = 1560397696(gdb) n576 sendTimeLineValidUpto = InvalidXLogRecPtr;(gdb) 634 streamingDoneSending = streamingDoneReceiving = false;(gdb) p sendTimeLine$10 = 16(gdb) p ThisTimeLineID$11 = 16(gdb) p *cmd$12 = {type = T_StartReplicationCmd, kind = REPLICATION_KIND_PHYSICAL, slotname = 0x0, timeline = 16, startpoint = 1560281088, options = 0x0}(gdb)
3.进入COPY模式
(gdb) n637 if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)(gdb)
3.1设置状态
648 WalSndSetState(WALSNDSTATE_CATCHUP);(gdb) p sendTimeLineValidUpto$13 = 0(gdb) p cmd->startpoint$14 = 1560281088(gdb)
3.2发送CopyBothResponse消息,启动streaming
(gdb) n651 pq_beginmessage(&buf, 'W');(gdb) 652 pq_sendbyte(&buf, 0);(gdb) 653 pq_sendint16(&buf, 0);(gdb) 654 pq_endmessage(&buf);(gdb) p buf$15 = {data = 0x1df53b0 "", len = 3, maxlen = 1024, cursor = 87}(gdb) p buf->data$16 = 0x1df53b0 ""(gdb) x/hb buf->data0x1df53b0: 0(gdb) x/32hb buf->data0x1df53b0: 0 0 0 127 127 127 127 1270x1df53b8: 127 127 127 127 127 127 127 1270x1df53c0: 127 127 127 127 127 127 127 1270x1df53c8: 127 127 127 127 127 127 127 127(gdb)
3.3初始化相关变量,如共享内存状态等
(gdb) n655 pq_flush();(gdb) 661 if (FlushPtr < cmd->startpoint)(gdb) p FlushPtr$17 = 1560397696(gdb) p cmd->startpoint$18 = 1560281088(gdb) n672 sentPtr = cmd->startpoint;(gdb) 675 SpinLockAcquire(&MyWalSnd->mutex);(gdb) 676 MyWalSnd->sentPtr = sentPtr;(gdb) 677 SpinLockRelease(&MyWalSnd->mutex);(gdb) 679 SyncRepInitConfig();(gdb) 682 replication_active = true;
3.4进入主循环(WalSndLoop)
(gdb) 684 WalSndLoop(XLogSendPhysical);(gdb)
DONE!
四、参考资料
PG Source Code