千家信息网

PostgreSQL中函数AssignTransactionId的实现逻辑是什么

发表于:2025-01-25 作者:千家信息网编辑
千家信息网最后更新 2025年01月25日,本篇内容介绍了"PostgreSQL中函数AssignTransactionId的实现逻辑是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这
千家信息网最后更新 2025年01月25日PostgreSQL中函数AssignTransactionId的实现逻辑是什么

本篇内容介绍了"PostgreSQL中函数AssignTransactionId的实现逻辑是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一、数据结构

静态变量
当前事务状态CurrentTransactionState

/* * CurrentTransactionState always points to the current transaction state * block.  It will point to TopTransactionStateData when not in a * transaction at all, or when in a top-level transaction. * CurrentTransactionState通常指向当前事务块. * 如不处于事务中或者处于顶层事务中,则指向TopTransactionStateData */static TransactionStateData TopTransactionStateData = {    .state = TRANS_DEFAULT,    .blockState = TBLOCK_DEFAULT,};/* * unreportedXids holds XIDs of all subtransactions that have not yet been * reported in an XLOG_XACT_ASSIGNMENT record. * unreportedXids保存所有尚未在XLOG_XACT_ASSIGNMENT记录的子事务. */static int  nUnreportedXids;static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS];static TransactionState CurrentTransactionState = &TopTransactionStateData;/* * The subtransaction ID and command ID assignment counters are global * to a whole transaction, so we do not keep them in the state stack. * subtransaction ID和command ID全局计数器,对事务可见,在state栈中不记录这些信息. */static SubTransactionId currentSubTransactionId;static CommandId currentCommandId;static bool currentCommandIdUsed;

TransactionState
事务状态结构体

/* *  transaction states - transaction state from server perspective *  事务状态枚举 - 服务器视角的事务状态 */typedef enum TransState{    TRANS_DEFAULT,              /* idle 空闲 */    TRANS_START,                /* transaction starting 事务启动 */    TRANS_INPROGRESS,           /* inside a valid transaction 进行中 */    TRANS_COMMIT,               /* commit in progress 提交中 */    TRANS_ABORT,                /* abort in progress 回滚中 */    TRANS_PREPARE               /* prepare in progress 准备中 */} TransState;/* *  transaction block states - transaction state of client queries *  事务块状态 - 客户端查询的事务状态 * * Note: the subtransaction states are used only for non-topmost * transactions; the others appear only in the topmost transaction. * 注意:subtransaction只用于非顶层事务;其他字段用于顶层事务. */typedef enum TBlockState{    /* not-in-transaction-block states 未进入事务块状态 */    TBLOCK_DEFAULT,             /* idle 空闲  */    TBLOCK_STARTED,             /* running single-query transaction 单个查询事务 */    /* transaction block states 事务块状态 */    TBLOCK_BEGIN,               /* starting transaction block 开始事务块 */    TBLOCK_INPROGRESS,          /* live transaction 进行中 */    TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN 隐式事务,进行中 */    TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker 并行worker中的事务,进行中 */    TBLOCK_END,                 /* COMMIT received 接收到COMMIT */    TBLOCK_ABORT,               /* failed xact, awaiting ROLLBACK 失败,等待ROLLBACK */    TBLOCK_ABORT_END,           /* failed xact, ROLLBACK received 失败,已接收ROLLBACK */    TBLOCK_ABORT_PENDING,       /* live xact, ROLLBACK received 进行中,接收到ROLLBACK */    TBLOCK_PREPARE,             /* live xact, PREPARE received 进行中,接收到PREPARE */    /* subtransaction states 子事务状态 */    TBLOCK_SUBBEGIN,            /* starting a subtransaction 开启 */    TBLOCK_SUBINPROGRESS,       /* live subtransaction 进行中 */    TBLOCK_SUBRELEASE,          /* RELEASE received 接收到RELEASE */    TBLOCK_SUBCOMMIT,           /* COMMIT received while TBLOCK_SUBINPROGRESS 进行中,接收到COMMIT */    TBLOCK_SUBABORT,            /* failed subxact, awaiting ROLLBACK 失败,等待ROLLBACK */    TBLOCK_SUBABORT_END,        /* failed subxact, ROLLBACK received 失败,已接收ROLLBACK */    TBLOCK_SUBABORT_PENDING,    /* live subxact, ROLLBACK received 进行中,接收到ROLLBACK */    TBLOCK_SUBRESTART,          /* live subxact, ROLLBACK TO received 进行中,接收到ROLLBACK TO */    TBLOCK_SUBABORT_RESTART     /* failed subxact, ROLLBACK TO received 失败,已接收ROLLBACK TO */} TBlockState;/* *  transaction state structure *  事务状态结构体 */typedef struct TransactionStateData{    //事务ID    TransactionId transactionId;    /* my XID, or Invalid if none */    //子事务ID    SubTransactionId subTransactionId;  /* my subxact ID */    //保存点名称    char       *name;           /* savepoint name, if any */    //保存点级别    int         savepointLevel; /* savepoint level */    //低级别的事务状态    TransState  state;          /* low-level state */    //高级别的事务状态    TBlockState blockState;     /* high-level state */    //事务嵌套深度    int         nestingLevel;   /* transaction nesting depth */    //GUC上下文嵌套深度    int         gucNestLevel;   /* GUC context nesting depth */    //事务生命周期上下文    MemoryContext curTransactionContext;    /* my xact-lifetime context */    //查询资源    ResourceOwner curTransactionOwner;  /* my query resources */    //按XID顺序保存的已提交的子事务ID    TransactionId *childXids;   /* subcommitted child XIDs, in XID order */    //childXids数组大小    int         nChildXids;     /* # of subcommitted child XIDs */    //分配的childXids数组空间    int         maxChildXids;   /* allocated size of childXids[] */    //上一个CurrentUserId    Oid         prevUser;       /* previous CurrentUserId setting */    //上一个SecurityRestrictionContext    int         prevSecContext; /* previous SecurityRestrictionContext */    //上一事务是否只读?    bool        prevXactReadOnly;   /* entry-time xact r/o state */    //是否处于Recovery?    bool        startedInRecovery;  /* did we start in recovery? */    //XID是否已保存在WAL Record中?    bool        didLogXid;      /* has xid been included in WAL record? */    //Enter/ExitParallelMode计数器    int         parallelModeLevel;  /* Enter/ExitParallelMode counter */    //父事务状态    struct TransactionStateData *parent;    /* back link to parent */} TransactionStateData;//结构体指针typedef TransactionStateData *TransactionState;

二、源码解读

AssignTransactionId函数,给定的TransactionState分配一个新的持久化事务号XID,在此函数调用前,不会为事务分配XIDs.

/* * AssignTransactionId * * Assigns a new permanent XID to the given TransactionState. * We do not assign XIDs to transactions until/unless this is called. * Also, any parent TransactionStates that don't yet have XIDs are assigned * one; this maintains the invariant that a child transaction has an XID * following its parent's. * 为给定的TransactionState分配一个新的持久化事务号XID. * 在此函数调用前,我们不会为事务分配XIDs. * 同时,所有尚未获得XIDs的父TransactionStates也会分配事务号, *   这可以确保子事务的事务号在父事务之后. */static voidAssignTransactionId(TransactionState s){    bool        isSubXact = (s->parent != NULL);    ResourceOwner currentOwner;    bool        log_unknown_top = false;    /* Assert that caller didn't screw up */    //确保调用者没有搞砸    Assert(!TransactionIdIsValid(s->transactionId));    Assert(s->state == TRANS_INPROGRESS);    /*     * Workers synchronize transaction state at the beginning of each parallel     * operation, so we can't account for new XIDs at this point.     * 在每个并行操作前,Parallel Workers同步事务状态,     *   因此我们不能在这时候请求XIDs     */    if (IsInParallelMode() || IsParallelWorker())        elog(ERROR, "cannot assign XIDs during a parallel operation");    /*     * Ensure parent(s) have XIDs, so that a child always has an XID later     * than its parent.  Mustn't recurse here, or we might get a stack overflow     * if we're at the bottom of a huge stack of subtransactions none of which     * have XIDs yet.     * 确保父事务已分配XIDs,这样可以确保子事务的XID后于父事务.     * 不能在这里递归执行,否则如果我们在一个未分配XID子事务大栈的底部,可能会遇到栈溢出     */    if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))    {        TransactionState p = s->parent;        TransactionState *parents;        size_t      parentOffset = 0;        parents = palloc(sizeof(TransactionState) * s->nestingLevel);        while (p != NULL && !TransactionIdIsValid(p->transactionId))        {            parents[parentOffset++] = p;            p = p->parent;        }        /*         * This is technically a recursive call, but the recursion will never         * be more than one layer deep.         * 递归调用,但递归不会超过一层         */        while (parentOffset != 0)            AssignTransactionId(parents[--parentOffset]);        pfree(parents);    }    /*     * When wal_level=logical, guarantee that a subtransaction's xid can only     * be seen in the WAL stream if its toplevel xid has been logged before.     * If necessary we log an xact_assignment record with fewer than     * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set     * for a transaction even though it appears in a WAL record, we just might     * superfluously log something. That can happen when an xid is included     * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in     * xl_standby_locks.     * 如wal_level=logical,确保子事务XID在顶层XID已被"日志"的情况只可以被WAL stream看到.     * 如果有必要,我们将使用小于PGPROC_MAX_CACHED_SUBXIDS的值来记录xact_assignment记录.     * 请注意,即使didLogXid出现在WAL记录中,如果它没有为事务设置,也没有问题,     *   我们可能只是多余地记录了一些东西。     * 当一个xid出现在WAL Record中的某个地方,但不在XLogRecord->xl_xid中     *   (比如在xl_standby_locks中)时,就会发生这种情况。     */    if (isSubXact && XLogLogicalInfoActive() &&        !TopTransactionStateData.didLogXid)        log_unknown_top = true;    /*     * Generate a new Xid and record it in PG_PROC and pg_subtrans.     * 生成一个新的XID,并记录在PG_PROC和pg_subtrans中     *     * NB: we must make the subtrans entry BEFORE the Xid appears anywhere in     * shared storage other than PG_PROC; because if there's no room for it in     * PG_PROC, the subtrans entry is needed to ensure that other backends see     * the Xid as "running".  See GetNewTransactionId.     * 注意:我们必须在Xid出现在除PG_PROC之外的共享存储之前构造subtrans条目.     * 因为如果在PG_PROC没有空闲空间,子事务条目需要确保其他进程看到该XID正在运行.     * 参考函数GetNewTransactionId说明.     *      */    s->transactionId = GetNewTransactionId(isSubXact);    if (!isSubXact)        XactTopTransactionId = s->transactionId;    if (isSubXact)        SubTransSetParent(s->transactionId, s->parent->transactionId);    /*     * If it's a top-level transaction, the predicate locking system needs to     * be told about it too.     * 如为顶层事务,谓词锁系统也需要了解此事务.     */    if (!isSubXact)        RegisterPredicateLockingXid(s->transactionId);    /*     * Acquire lock on the transaction XID.  (We assume this cannot block.) We     * have to ensure that the lock is assigned to the transaction's own     * ResourceOwner.     * 请求锁(我们假定这样做不好引起阻塞).我们必须确保锁已被分配给事务自己的ResourceOwner.     */    currentOwner = CurrentResourceOwner;    CurrentResourceOwner = s->curTransactionOwner;    XactLockTableInsert(s->transactionId);    CurrentResourceOwner = currentOwner;    /*     * Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each     * top-level transaction we issue a WAL record for the assignment. We     * include the top-level xid and all the subxids that have not yet been     * reported using XLOG_XACT_ASSIGNMENT records.     * 在每个顶级事务中分配的每个PGPROC_MAX_CACHED_SUBXIDS事务id,     *   我们都会为分配记录一条WAL记录。     * 该记录包括顶级的xid和所有尚未使用XLOG_XACT_ASSIGNMENT记录报告的子xid。     *     * This is required to limit the amount of shared memory required in a hot     * standby server to keep track of in-progress XIDs. See notes for     * RecordKnownAssignedTransactionIds().     * 在跟踪进行中的XIDs的备机上,需要控制共享内存的大小.     * 参见RecordKnownAssignedTransactionIds()函数说明.     *     * We don't keep track of the immediate parent of each subxid, only the     * top-level transaction that each subxact belongs to. This is correct in     * recovery only because aborted subtransactions are separately WAL     * logged.     * 我们不需要跟踪父事务的每个子事务,只需要跟踪子事务归属的顶层事务即可.     * 这样可行是因为在恢复中,已回滚的子事务是通过WAL单独记录的.     *     * This is correct even for the case where several levels above us didn't     * have an xid assigned as we recursed up to them beforehand.     * 即使在我们之前递归到上面的几个级别时没有分配xid的情况下,这也是正确的。     */    if (isSubXact && XLogStandbyInfoActive())    {        unreportedXids[nUnreportedXids] = s->transactionId;        nUnreportedXids++;        /*         * ensure this test matches similar one in         * RecoverPreparedTransactions()         * 确保在RecoverPreparedTransactions()中可以匹配到相似的.         */        if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS ||            log_unknown_top)        {            xl_xact_assignment xlrec;            /*             * xtop is always set by now because we recurse up transaction             * stack to the highest unassigned xid and then come back down             * xtop现在已经设置好了,因为我们将事务堆栈递归到最高的未分配的xid,然后再返回             */            xlrec.xtop = GetTopTransactionId();            Assert(TransactionIdIsValid(xlrec.xtop));            xlrec.nsubxacts = nUnreportedXids;            XLogBeginInsert();            XLogRegisterData((char *) &xlrec, MinSizeOfXactAssignment);            XLogRegisterData((char *) unreportedXids,                             nUnreportedXids * sizeof(TransactionId));            (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT);            nUnreportedXids = 0;            /* mark top, not current xact as having been logged */            //标记为最顶层,而不是当前已记录日志的xact            TopTransactionStateData.didLogXid = true;        }    }}

三、跟踪分析

执行txid_current,触发函数调用

11:10:36 (xdb@[local]:5432)testdb=# begin;BEGIN11:40:20 (xdb@[local]:5432)testdb=#* select txid_current_if_assigned(); txid_current_if_assigned --------------------------(1 row)11:40:43 (xdb@[local]:5432)testdb=#* select txid_current();

启动gdb,设置断点

(gdb) b AssignTransactionIdBreakpoint 5 at 0x546a4c: file xact.c, line 491.(gdb) cContinuing.Breakpoint 5, AssignTransactionId (s=0xf9c720 ) at xact.c:491491     bool        isSubXact = (s->parent != NULL);(gdb)

查看调用栈

(gdb) bt#0  AssignTransactionId (s=0xf9c720 ) at xact.c:491#1  0x000000000054693d in GetTopTransactionId () at xact.c:392#2  0x00000000009fe1f3 in txid_current (fcinfo=0x25835a0) at txid.c:443#3  0x00000000006cfebd in ExecInterpExpr (state=0x25834b8, econtext=0x25831a8, isnull=0x7ffe3d4a31f7)    at execExprInterp.c:654#4  0x00000000006d1ac6 in ExecInterpExprStillValid (state=0x25834b8, econtext=0x25831a8, isNull=0x7ffe3d4a31f7)    at execExprInterp.c:1786#5  0x00000000007140dd in ExecEvalExprSwitchContext (state=0x25834b8, econtext=0x25831a8, isNull=0x7ffe3d4a31f7)    at ../../../src/include/executor/executor.h:303#6  0x000000000071414b in ExecProject (projInfo=0x25834b0) at ../../../src/include/executor/executor.h:337#7  0x0000000000714323 in ExecResult (pstate=0x2583090) at nodeResult.c:136#8  0x00000000006e4c30 in ExecProcNodeFirst (node=0x2583090) at execProcnode.c:445#9  0x00000000006d9974 in ExecProcNode (node=0x2583090) at ../../../src/include/executor/executor.h:237#10 0x00000000006dc22d in ExecutePlan (estate=0x2582e78, planstate=0x2583090, use_parallel_mode=false,     operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x24cd0a0,     execute_once=true) at execMain.c:1723#11 0x00000000006d9f5c in standard_ExecutorRun (queryDesc=0x256b0c8, direction=ForwardScanDirection, count=0,     execute_once=true) at execMain.c:364#12 0x00000000006d9d7f in ExecutorRun (queryDesc=0x256b0c8, direction=ForwardScanDirection, count=0, execute_once=true)    at execMain.c:307#13 0x00000000008ccf5a in PortalRunSelect (portal=0x250c748, forward=true, count=0, dest=0x24cd0a0) at pquery.c:932#14 0x00000000008ccbf3 in PortalRun (portal=0x250c748, count=9223372036854775807, isTopLevel=true, run_once=true,     dest=0x24cd0a0, altdest=0x24cd0a0, completionTag=0x7ffe3d4a3570 "") at pquery.c:773#15 0x00000000008c6b1e in exec_simple_query (query_string=0x24a6ec8 "select txid_current();") at postgres.c:1145#16 0x00000000008cae70 in PostgresMain (argc=1, argv=0x24d2dc8, dbname=0x24d2c30 "testdb", username=0x24a3ba8 "xdb")    at postgres.c:4182    #17 0x000000000082642b in BackendRun (port=0x24c8c00) at postmaster.c:4361---Type  to continue, or q  to quit---#18 0x0000000000825b8f in BackendStartup (port=0x24c8c00) at postmaster.c:4033#19 0x0000000000821f1c in ServerLoop () at postmaster.c:1706#20 0x00000000008217b4 in PostmasterMain (argc=1, argv=0x24a1b60) at postmaster.c:1379#21 0x00000000007488ef in main (argc=1, argv=0x24a1b60) at main.c:228

输入参数TransactionState(全局变量,指向TopTransactionStateData)

(gdb) p s$13 = (TransactionState) 0xf9c720 (gdb) p *s$14 = {transactionId = 0, subTransactionId = 1, name = 0x0, savepointLevel = 0, state = TRANS_INPROGRESS,   blockState = TBLOCK_INPROGRESS, nestingLevel = 1, gucNestLevel = 1, curTransactionContext = 0x2523850,   curTransactionOwner = 0x24d4868, childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0,   prevXactReadOnly = false, startedInRecovery = false, didLogXid = false, parallelModeLevel = 0, parent = 0x0}(gdb)

初始化部分变量并验证

(gdb) n493     bool        log_unknown_top = false;(gdb) 496     Assert(!TransactionIdIsValid(s->transactionId));(gdb) 497     Assert(s->state == TRANS_INPROGRESS);(gdb)

获取事务号

(gdb) 503     if (IsInParallelMode() || IsParallelWorker())(gdb) 512     if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))(gdb) 545     if (isSubXact && XLogLogicalInfoActive() &&(gdb) 557     s->transactionId = GetNewTransactionId(isSubXact);(gdb) 558     if (!isSubXact)(gdb) p s->transactionId$15 = 2407(gdb)

注册,并设置其他信息

(gdb) n559         XactTopTransactionId = s->transactionId;(gdb) 561     if (isSubXact)(gdb) 568     if (!isSubXact)(gdb) 569         RegisterPredicateLockingXid(s->transactionId);(gdb) 576     currentOwner = CurrentResourceOwner;(gdb) 577     CurrentResourceOwner = s->curTransactionOwner;(gdb) 579     XactLockTableInsert(s->transactionId);(gdb) 581     CurrentResourceOwner = currentOwner;(gdb) 601     if (isSubXact && XLogStandbyInfoActive())(gdb) 635 }

完成调用

(gdb) GetTopTransactionId () at xact.c:393393     return XactTopTransactionId;(gdb)

"PostgreSQL中函数AssignTransactionId的实现逻辑是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0