千家信息网

PostgreSQL 源码解读(191)- 查询#107(聚合函数#12 - agg_retrieve_direct)

发表于:2025-01-20 作者:千家信息网编辑
千家信息网最后更新 2025年01月20日,本节继续介绍聚合函数的实现,主要介绍了不使用Hash算法的情况下聚合函数的实现,在这种情况下会先排序后执行聚合,在ExecAgg节点执行前,已完成排序的操作.下面介绍在已完成排序的情况下聚合的实现,主
千家信息网最后更新 2025年01月20日PostgreSQL 源码解读(191)- 查询#107(聚合函数#12 - agg_retrieve_direct)

本节继续介绍聚合函数的实现,主要介绍了不使用Hash算法的情况下聚合函数的实现,在这种情况下会先排序后执行聚合,在ExecAgg节点执行前,已完成排序的操作.下面介绍在已完成排序的情况下聚合的实现,主要实现函数是ExecAgg->agg_retrieve_direct.

下面是不使用HashAggregate情况下GroupAggregate的计划树:

",,,,,"select bh,avg(c1),min(c1),max(c2) from t_agg_simple group by bh;",,,"psql"2019-05-16 12:04:45.621 CST,"xdb","testdb",1545,"[local]",5cdce11a.609,5,"SELECT",2019-05-16 12:03:38 CST,3/4,0,LOG,00000,"plan:","   {PLANNEDSTMT    :commandType 1    :queryId 0    :hasReturning false    :hasModifyingCTE false    :canSetTag true    :transientPlan false    :dependsOnRole false    :parallelModeNeeded false    :jitFlags 0    :planTree       {AGG       :startup_cost 52.67       :total_cost 64.42       :plan_rows 200       :plan_width 98       :parallel_aware false       :parallel_safe true       :plan_node_id 0       :targetlist (...      )      :qual <>       :lefttree          {SORT          :startup_cost 52.67          :total_cost 54.52          :plan_rows 740          :plan_width 66          :parallel_aware false          :parallel_safe true          :plan_node_id 1          :targetlist (...         )         :qual <>          :lefttree             {SEQSCAN             :startup_cost 0.00             :total_cost 17.40             :plan_rows 740             :plan_width 66             :parallel_aware false             :parallel_safe true             :plan_node_id 2             :targetlist (...            )            :qual <>             :lefttree <>             :righttree <>             :initPlan <>             :extParam (b)            :allParam (b)            :scanrelid 1            }         :righttree <>          :initPlan <>          :extParam (b)         :allParam (b)         :numCols 1          :sortColIdx 1          :sortOperators 664          :collations 100          :nullsFirst false         }      :righttree <>       :initPlan <>       :extParam (b)      :allParam (b)      :aggstrategy 1       :aggsplit 0       :numCols 1       :grpColIdx 1       :grpOperators 98       :numGroups 200       :aggParams (b)      :groupingSets <>       :chain <>      }   :rtable (...   )   :resultRelations <>    :nonleafResultRelations <>    :rootResultRelations <>    :subplans <>    :rewindPlanIDs (b)   :rowMarks <>    :relationOids (o 270375)   :invalItems <>    :paramExecTypes <>    :utilityStmt <>    :stmt_location 0    :stmt_len 63   }

可以看到,在ExecAgg前会先执行ExecSort.

一、数据结构

AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体

/* --------------------- *    AggState information * *    ss.ss_ScanTupleSlot refers to output of underlying plan. *  ss.ss_ScanTupleSlot指的是基础计划的输出. *    (ss = ScanState,ps = PlanState) * *    Note: ss.ps.ps_ExprContext contains ecxt_aggvalues and *    ecxt_aggnulls arrays, which hold the computed agg values for the current *    input group during evaluation of an Agg node's output tuple(s).  We *    create a second ExprContext, tmpcontext, in which to evaluate input *    expressions and run the aggregate transition functions. *    注意:ss.ps.ps_ExprContext包含了ecxt_aggvalues和ecxt_aggnulls数组, *      这两个数组保存了在计算agg节点的输出元组时当前输入组已计算的agg值. * --------------------- *//* these structs are private in nodeAgg.c: *///在nodeAgg.c中私有的结构体typedef struct AggStatePerAggData *AggStatePerAgg;typedef struct AggStatePerTransData *AggStatePerTrans;typedef struct AggStatePerGroupData *AggStatePerGroup;typedef struct AggStatePerPhaseData *AggStatePerPhase;typedef struct AggStatePerHashData *AggStatePerHash;typedef struct AggState{    //第一个字段是NodeTag(继承自ScanState)    ScanState    ss;                /* its first field is NodeTag */    //targetlist和quals中所有的Aggref    List       *aggs;            /* all Aggref nodes in targetlist & quals */    //链表的大小(可以为0)    int            numaggs;        /* length of list (could be zero!) */    //pertrans条目大小    int            numtrans;        /* number of pertrans items */    //Agg策略模式    AggStrategy aggstrategy;    /* strategy mode */    //agg-splitting模式,参见nodes.h    AggSplit    aggsplit;        /* agg-splitting mode, see nodes.h */    //指向当前步骤数据的指针    AggStatePerPhase phase;        /* pointer to current phase data */    //步骤数(包括0)    int            numphases;        /* number of phases (including phase 0) */    //当前步骤    int            current_phase;    /* current phase number */    //per-Aggref信息    AggStatePerAgg peragg;        /* per-Aggref information */    //per-Trans状态信息    AggStatePerTrans pertrans;    /* per-Trans state information */    //长生命周期数据的ExprContexts(hashtable)    ExprContext *hashcontext;    /* econtexts for long-lived data (hashtable) */    ////长生命周期数据的ExprContexts(每一个GS使用)    ExprContext **aggcontexts;    /* econtexts for long-lived data (per GS) */    //输入表达式的ExprContext    ExprContext *tmpcontext;    /* econtext for input expressions */#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14    //当前活跃的aggcontext    ExprContext *curaggcontext; /* currently active aggcontext */    //当前活跃的aggregate(如存在)    AggStatePerAgg curperagg;    /* currently active aggregate, if any */#define FIELDNO_AGGSTATE_CURPERTRANS 16    //当前活跃的trans state    AggStatePerTrans curpertrans;    /* currently active trans state, if any */    //输入结束?    bool        input_done;        /* indicates end of input */    //Agg扫描结束?    bool        agg_done;        /* indicates completion of Agg scan */    //最后一个grouping set    int            projected_set;    /* The last projected grouping set */#define FIELDNO_AGGSTATE_CURRENT_SET 20    //将要解析的当前grouping set    int            current_set;    /* The current grouping set being evaluated */    //当前投影操作的分组列    Bitmapset  *grouped_cols;    /* grouped cols in current projection */    //倒序的分组列链表    List       *all_grouped_cols;    /* list of all grouped cols in DESC order */    /* These fields are for grouping set phase data */    //-------- 下面的列用于grouping set步骤数据    //所有步骤中最大的sets大小    int            maxsets;        /* The max number of sets in any phase */    //所有步骤的数组    AggStatePerPhase phases;    /* array of all phases */    //对于phases > 1,已排序的输入信息    Tuplesortstate *sort_in;    /* sorted input to phases > 1 */    //对于下一个步骤,输入已拷贝    Tuplesortstate *sort_out;    /* input is copied here for next phase */    //排序结果的slot    TupleTableSlot *sort_slot;    /* slot for sort results */    /* these fields are used in AGG_PLAIN and AGG_SORTED modes: */    //------- 下面的列用于AGG_PLAIN和AGG_SORTED模式:    //per-group指针的grouping set编号数组    AggStatePerGroup *pergroups;    /* grouping set indexed array of per-group                                     * pointers */    //当前组的第一个元组拷贝    HeapTuple    grp_firstTuple; /* copy of first tuple of current group */    /* these fields are used in AGG_HASHED and AGG_MIXED modes: */    //--------- 下面的列用于AGG_HASHED和AGG_MIXED模式:    //是否已填充hash表?    bool        table_filled;    /* hash table filled yet? */    //hash桶数?    int            num_hashes;    //相应的哈希表数据数组    AggStatePerHash perhash;    /* array of per-hashtable data */    //per-group指针的grouping set编号数组    AggStatePerGroup *hash_pergroup;    /* grouping set indexed array of                                         * per-group pointers */    /* support for evaluation of agg input expressions: */    //---------- agg输入表达式解析支持#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34    //首先是->pergroups,然后是hash_pergroup    AggStatePerGroup *all_pergroups;    /* array of first ->pergroups, than                                         * ->hash_pergroup */    //投影实现机制    ProjectionInfo *combinedproj;    /* projection machinery */} AggState;/* Primitive options supported by nodeAgg.c: *///nodeag .c支持的基本选项#define AGGSPLITOP_COMBINE        0x01    /* substitute combinefn for transfn */#define AGGSPLITOP_SKIPFINAL    0x02    /* skip finalfn, return state as-is */#define AGGSPLITOP_SERIALIZE    0x04    /* apply serializefn to output */#define AGGSPLITOP_DESERIALIZE    0x08    /* apply deserializefn to input *//* Supported operating modes (i.e., useful combinations of these options): *///支持的操作模式typedef enum AggSplit{    /* Basic, non-split aggregation: */    //基本 : 非split聚合    AGGSPLIT_SIMPLE = 0,    /* Initial phase of partial aggregation, with serialization: */    //部分聚合的初始步骤,序列化    AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,    /* Final phase of partial aggregation, with deserialization: */    //部分聚合的最终步骤,反序列化    AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE} AggSplit;/* Test whether an AggSplit value selects each primitive option: *///测试AggSplit选择了哪些基本选项#define DO_AGGSPLIT_COMBINE(as)        (((as) & AGGSPLITOP_COMBINE) != 0)#define DO_AGGSPLIT_SKIPFINAL(as)    (((as) & AGGSPLITOP_SKIPFINAL) != 0)#define DO_AGGSPLIT_SERIALIZE(as)    (((as) & AGGSPLITOP_SERIALIZE) != 0)#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)

二、源码解读

agg_retrieve_direct
agg_retrieve_direct计算聚合的最终结果,适用于不使用Hash算法的情况.

/* * ExecAgg for non-hashed case * 适用于不使用Hash算法的情况. */static TupleTableSlot *agg_retrieve_direct(AggState *aggstate){    Agg           *node = aggstate->phase->aggnode;//aggstate Node    ExprContext *econtext;//表达式解析上下文    ExprContext *tmpcontext;//临时上下文    AggStatePerAgg peragg;//聚合    AggStatePerGroup *pergroups;//分组信息    TupleTableSlot *outerslot;//outer元组slot    TupleTableSlot *firstSlot;//第1个slot    TupleTableSlot *result;//结果元组    bool        hasGroupingSets = aggstate->phase->numsets > 0;//是否有grouping set    int            numGroupingSets = Max(aggstate->phase->numsets, 1);    int            currentSet;    int            nextSetSize;    int            numReset;    int            i;    /*     * get state info from node     * 获取状态信息     *     * econtext is the per-output-tuple expression context     * econtext是per-output-tuple表达式上下文     *     * tmpcontext is the per-input-tuple expression context     * tmpcontext是per-input-tuple表达式上下文     */    econtext = aggstate->ss.ps.ps_ExprContext;    tmpcontext = aggstate->tmpcontext;    peragg = aggstate->peragg;    pergroups = aggstate->pergroups;    firstSlot = aggstate->ss.ss_ScanTupleSlot;    /*     * We loop retrieving groups until we find one matching     * aggstate->ss.ps.qual     * 循环检索分组直至找到一个匹配aggstate->ss.ps.qual表达式的分组.     *     * For grouping sets, we have the invariant that aggstate->projected_set     * is either -1 (initial call) or the index (starting from 0) in     * gset_lengths for the group we just completed (either by projecting a     * row or by discarding it in the qual).     * 对于grouping set,aggstate->projected_set是个不变量,     *   要么是-1(初始调用),要么是已完成的分组在gset_lengths中的索引编号(从0开始)     * (通过投影一行或者在表达式中丢弃一行实现)     */    while (!aggstate->agg_done)    {        //----------- 循环处理        /*         * Clear the per-output-tuple context for each group, as well as         * aggcontext (which contains any pass-by-ref transvalues of the old         * group).  Some aggregate functions store working state in child         * contexts; those now get reset automatically without us needing to         * do anything special.         * 跟aggcontext(包含原分组通过引用传递的转换值)一样,每一个分组都会重置per-output-tuple上下文.         * 某些聚合函数在子上下文中存储工作状态,这种情况下,不需要做额外的工作,会自动重置.         *         * We use ReScanExprContext not just ResetExprContext because we want         * any registered shutdown callbacks to be called.  That allows         * aggregate functions to ensure they've cleaned up any non-memory         * resources.         * 使用ReScanExprContext而不是ResetExprContext是因为我们希望所有已注册的shutdown回调函数可以调用.         * 这可以允许聚合函数确保它们已清理了所有非内存类资源.         */        ReScanExprContext(econtext);        /*         * Determine how many grouping sets need to be reset at this boundary.         * 确定有多少grouping sets在此边界下需要重置.         */        if (aggstate->projected_set >= 0 &&            aggstate->projected_set < numGroupingSets)            numReset = aggstate->projected_set + 1;        else            numReset = numGroupingSets;        /*         * numReset can change on a phase boundary, but that's OK; we want to         * reset the contexts used in _this_ phase, and later, after possibly         * changing phase, initialize the right number of aggregates for the         * _new_ phase.         * numReset可能在每个阶段的边界处出现变化,但这样也不会出现问题.         * 我们希望重置在该阶段的上下文,并在稍后在可能变化的阶段之后,为新的阶段初始化正确的聚合编号.         */        for (i = 0; i < numReset; i++)        {            ReScanExprContext(aggstate->aggcontexts[i]);        }        /*         * Check if input is complete and there are no more groups to project         * in this phase; move to next phase or mark as done.         * 检查输入是否完成并且没有更多的组在本阶段用于投影.         * 移到下一个阶段或者标记为已完成.         */        if (aggstate->input_done == true &&            aggstate->projected_set >= (numGroupingSets - 1))        {            if (aggstate->current_phase < aggstate->numphases - 1)            {                //仍在处理中                initialize_phase(aggstate, aggstate->current_phase + 1);                aggstate->input_done = false;                aggstate->projected_set = -1;                numGroupingSets = Max(aggstate->phase->numsets, 1);                node = aggstate->phase->aggnode;                numReset = numGroupingSets;            }            else if (aggstate->aggstrategy == AGG_MIXED)            {                //照理,不会进入这个分支(AGG_MIXED不是Hash才有吗?)                /*                 * Mixed mode; we've output all the grouped stuff and have                 * full hashtables, so switch to outputting those.                 */                initialize_phase(aggstate, 0);                aggstate->table_filled = true;                ResetTupleHashIterator(aggstate->perhash[0].hashtable,                                       &aggstate->perhash[0].hashiter);                select_current_set(aggstate, 0, true);                return agg_retrieve_hash_table(aggstate);            }            else            {                //已完成处理                aggstate->agg_done = true;                break;            }        }        /*         * Get the number of columns in the next grouping set after the last         * projected one (if any). This is the number of columns to compare to         * see if we reached the boundary of that set too.         * 在最后一次投影操作后获得下一个grouping set的列数.         * 这是要比较的列数,看看我们是否也达到了集合的边界。         */        if (aggstate->projected_set >= 0 &&            aggstate->projected_set < (numGroupingSets - 1))            nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];        else            nextSetSize = 0;        /*----------         * If a subgroup for the current grouping set is present, project it.         * 如果子分组已存在,则执行投影.         *         * We have a new group if:         *    - we're out of input but haven't projected all grouping sets         *      (checked above)         * OR         *      - we already projected a row that wasn't from the last grouping         *        set         *      AND         *      - the next grouping set has at least one grouping column (since         *        empty grouping sets project only once input is exhausted)         *      AND         *      - the previous and pending rows differ on the grouping columns         *        of the next grouping set         *          * 如果出现下面情况,则会有新的分组:         *   - 已完成输入处理,但仍未投影所有的grouping set(上面会执行检查)         *   - 已投影了一行,但这一行并不是从最后一个grouping set而来的          *   同时         *   - 下一个grouping set至少有要一个grouping列(因为空grouping sets投影一次输入就销毁了)         *   同时         *   - 上一个和接下来的行与下一个grouping set中的分组列不同         *----------         */        tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;        if (aggstate->input_done ||            (node->aggstrategy != AGG_PLAIN &&             aggstate->projected_set != -1 &&             aggstate->projected_set < (numGroupingSets - 1) &&             nextSetSize > 0 &&             !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],                               tmpcontext)))        {            aggstate->projected_set += 1;            Assert(aggstate->projected_set < numGroupingSets);            Assert(nextSetSize > 0 || aggstate->input_done);        }        else        {            /*             * We no longer care what group we just projected, the next             * projection will always be the first (or only) grouping set             * (unless the input proves to be empty).             * 不再关心刚才已投影的分组,下一个投影通常会是第一个grouping set(除非输入已验证为空)             */            aggstate->projected_set = 0;            /*             * If we don't already have the first tuple of the new group,             * fetch it from the outer plan.             * 如果不再有新分组的第一个元组,则从outer plan中提取一行.             */            if (aggstate->grp_firstTuple == NULL)            {                //提取一行                outerslot = fetch_input_tuple(aggstate);                if (!TupIsNull(outerslot))                {                    //成功提取一行                    /*                     * Make a copy of the first input tuple; we will use this                     * for comparisons (in group mode) and for projection.                     * 拷贝之                     */                    aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);                }                else                {                    /* outer plan produced no tuples at all */                    //不再产生新行                    if (hasGroupingSets)                    {                        //----------- 存在grouping set                        /*                         * If there was no input at all, we need to project                         * rows only if there are grouping sets of size 0.                         * Note that this implies that there can't be any                         * references to ungrouped Vars, which would otherwise                         * cause issues with the empty output slot.                         * 如果根本就不存在输入,只需要在大小为0的grouping set上投影哪些行即可.                         * 注意这意味着不能依赖未分组的Vars,否则的话会导致输出slot为空.                         *                         * XXX: This is no longer true, we currently deal with                         * this in finalize_aggregates().                         * XXX: 这已不复存在,已在finalize_aggregates中进行处理.                         */                        aggstate->input_done = true;                        while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)                        {                            aggstate->projected_set += 1;                            if (aggstate->projected_set >= numGroupingSets)                            {                                /*                                 * We can't set agg_done here because we might                                 * have more phases to do, even though the                                 * input is empty. So we need to restart the                                 * whole outer loop.                                 * 就算输入为空,但也不能在这里还设置agg_done为T,因为可能还有后续的阶段需要处理.                                 * 因此需要重启整个外循环.                                 */                                break;                            }                        }                        if (aggstate->projected_set >= numGroupingSets)                            continue;                    }                    else                    {                        aggstate->agg_done = true;                        /* If we are grouping, we should produce no tuples too */                        if (node->aggstrategy != AGG_PLAIN)                            return NULL;                    }                }            }            /*             * Initialize working state for a new input tuple group.             * 为新输入的元组组初始化工作状态.             */            initialize_aggregates(aggstate, pergroups, numReset);            if (aggstate->grp_firstTuple != NULL)            {                /*                 * Store the copied first input tuple in the tuple table slot                 * reserved for it.  The tuple will be deleted when it is                 * cleared from the slot.                 * 在元组表slot中拷贝存储第一个输入元组.                 * 该元组在清理slot时会被删除.                 */                ExecStoreTuple(aggstate->grp_firstTuple,                               firstSlot,                               InvalidBuffer,                               true);                aggstate->grp_firstTuple = NULL;    /* 不需要保留双份指针. don't keep two pointers */                /* set up for first advance_aggregates call */                //为第一次advance_aggregates调用设置参数                tmpcontext->ecxt_outertuple = firstSlot;                /*                 * Process each outer-plan tuple, and then fetch the next one,                 * until we exhaust the outer plan or cross a group boundary.                 * 处理每一个outer-plan元组,然后提取下一个,                 *   直至outer plan已消耗完毕或者已跨越分组边界.                 */                for (;;)                {                    /*                     * During phase 1 only of a mixed agg, we need to update                     * hashtables as well in advance_aggregates.                     * 只有在混合AGG的第一阶段,我们还需要在advance_aggregates中更新哈希表.                     */                    if (aggstate->aggstrategy == AGG_MIXED &&                        aggstate->current_phase == 1)                    {                        lookup_hash_entries(aggstate);                    }                    /* Advance the aggregates (or combine functions) */                    //推动聚合(或者组合函数)                    advance_aggregates(aggstate);                    /* Reset per-input-tuple context after each tuple */                    //在每一个元组后重置per-input-tuple上下文                    ResetExprContext(tmpcontext);                    outerslot = fetch_input_tuple(aggstate);                    if (TupIsNull(outerslot))                    {                        /* no more outer-plan tuples available */                        //已无更多可用的outer slot                        if (hasGroupingSets)                        {                            aggstate->input_done = true;                            break;                        }                        else                        {                            aggstate->agg_done = true;                            break;                        }                    }                    /* set up for next advance_aggregates call */                    //为下一次advance_aggregates调用作准备                    tmpcontext->ecxt_outertuple = outerslot;                    /*                     * If we are grouping, check whether we've crossed a group                     * boundary.                     * 如果是分组,检查是否已跨越分组边界.                     */                    if (node->aggstrategy != AGG_PLAIN)                    {                        tmpcontext->ecxt_innertuple = firstSlot;                        if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],                                      tmpcontext))                        {                            aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);                            break;                        }                    }                }            }            /*             * Use the representative input tuple for any references to             * non-aggregated input columns in aggregate direct args, the node             * qual, and the tlist.  (If we are not grouping, and there are no             * input rows at all, we will come here with an empty firstSlot             * ... but if not grouping, there can't be any references to             * non-aggregated input columns, so no problem.)             * 对于聚合直接参数/节点表达式和投影列tlist中的非聚合输入列的引用,使用代表性的输入元组.             * (如果不是grouping而且没有输入元组,将使用空的firstSlot,但如果是非grouping,             *  不可能存在依赖非聚合输入列,因此不会存在问题)             */            econtext->ecxt_outertuple = firstSlot;        }        Assert(aggstate->projected_set >= 0);        currentSet = aggstate->projected_set;        //投影处理        prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);        select_current_set(aggstate, currentSet, false);        finalize_aggregates(aggstate,                            peragg,                            pergroups[currentSet]);        /*         * If there's no row to project right now, we must continue rather         * than returning a null since there might be more groups.         * 如不需要马上进行进行投影,必须继续执行而不是返回NULL,因为还需要处理更多的groups.         */        result = project_aggregates(aggstate);        if (result)            return result;    }    /* No more groups */    //DONE!    return NULL;}

三、跟踪分析

测试脚本

-- 禁用并行set max_parallel_workers_per_gather=0;-- 禁用hashaggset enable_hashagg = off;select bh,avg(c1),min(c1),max(c2) from t_agg_simple group by bh;

跟踪分析

(gdb) b agg_retrieve_directBreakpoint 1 at 0x6ee511: file nodeAgg.c, line 1572.(gdb) cContinuing.Breakpoint 1, agg_retrieve_direct (aggstate=0x268f640) at nodeAgg.c:15721572        Agg           *node = aggstate->phase->aggnode;

输入参数

(gdb) p *aggstate$1 = {ss = {ps = {type = T_AggState, plan = 0x25af578, state = 0x268f428, ExecProcNode = 0x6ee438 ,       ExecProcNodeReal = 0x6ee438 , instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,       qual = 0x0, lefttree = 0x268faf0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,       ps_ResultTupleSlot = 0x2690d50, ps_ExprContext = 0x268fa30, ps_ProjInfo = 0x2690e90, scandesc = 0x26907a0},     ss_currentRelation = 0x0, ss_currentScanDesc = 0x0, ss_ScanTupleSlot = 0x2690a78}, aggs = 0x25d4290, numaggs = 3,   numtrans = 3, aggstrategy = AGG_SORTED, aggsplit = AGGSPLIT_SIMPLE, phase = 0x2691290, numphases = 2, current_phase = 1,   peragg = 0x2690f28, pertrans = 0x26b24d0, hashcontext = 0x0, aggcontexts = 0x268f858, tmpcontext = 0x268f878,   curaggcontext = 0x268f970, curperagg = 0x0, curpertrans = 0x0, input_done = false, agg_done = false, projected_set = -1,   current_set = 0, grouped_cols = 0x0, all_grouped_cols = 0x0, maxsets = 1, phases = 0x2691258, sort_in = 0x0,   sort_out = 0x0, sort_slot = 0x0, pergroups = 0x25d4da0, grp_firstTuple = 0x0, table_filled = false, num_hashes = 0,   perhash = 0x0, hash_pergroup = 0x0, all_pergroups = 0x25d4da0, combinedproj = 0x0}

需要2个阶段,分别是AGG_PLAIN/AGG_SORTED

(gdb) p aggstate->phases[0]$2 = {aggstrategy = AGG_PLAIN, numsets = 0, gset_lengths = 0x0, grouped_cols = 0x0, eqfunctions = 0x0, aggnode = 0x0,   sortnode = 0x0, evaltrans = 0x0}(gdb) p aggstate->phases[1]$3 = {aggstrategy = AGG_SORTED, numsets = 0, gset_lengths = 0x0, grouped_cols = 0x0, eqfunctions = 0x25d4388,   aggnode = 0x25af578, sortnode = 0x0, evaltrans = 0x25d5488}

不存在grouping set.
变量numGroupingSets设置为1

(gdb) n1580        bool        hasGroupingSets = aggstate->phase->numsets > 0;(gdb) p aggstate->phase->numsets$5 = 0(gdb) n1581        int            numGroupingSets = Max(aggstate->phase->numsets, 1);(gdb) n1594        econtext = aggstate->ss.ps.ps_ExprContext;(gdb) p numGroupingSets$6 = 1

设置内存上下文

(gdb) n1595        tmpcontext = aggstate->tmpcontext;(gdb) 1597        peragg = aggstate->peragg;(gdb) p *econtext$7 = {type = T_ExprContext, ecxt_scantuple = 0x0, ecxt_innertuple = 0x0, ecxt_outertuple = 0x0,   ecxt_per_query_memory = 0x268f310, ecxt_per_tuple_memory = 0x26a6370, ecxt_param_exec_vals = 0x0,   ecxt_param_list_info = 0x0, ecxt_aggvalues = 0x25d4d48, ecxt_aggnulls = 0x25d4d80, caseValue_datum = 0,   caseValue_isNull = true, domainValue_datum = 0, domainValue_isNull = true, ecxt_estate = 0x268f428, ecxt_callbacks = 0x0}(gdb) p *tmpcontext$8 = {type = T_ExprContext, ecxt_scantuple = 0x0, ecxt_innertuple = 0x0, ecxt_outertuple = 0x0,   ecxt_per_query_memory = 0x268f310, ecxt_per_tuple_memory = 0x2691320, ecxt_param_exec_vals = 0x0,   ecxt_param_list_info = 0x0, ecxt_aggvalues = 0x0, ecxt_aggnulls = 0x0, caseValue_datum = 0, caseValue_isNull = true,   domainValue_datum = 0, domainValue_isNull = true, ecxt_estate = 0x268f428, ecxt_callbacks = 0x0}(gdb)

获取聚合信息,一共有3个

(gdb) n1598        pergroups = aggstate->pergroups;(gdb)1599        firstSlot = aggstate->ss.ss_ScanTupleSlot;(gdb) p *peragg$9 = {aggref = 0x26a02d0, transno = 0, finalfn_oid = 0, finalfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,     fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},   numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = 4, resulttypeByVal = true, shareable = true}(gdb) p peragg[0]$10 = {aggref = 0x26a02d0, transno = 0, finalfn_oid = 0, finalfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,     fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},   numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = 4, resulttypeByVal = true, shareable = true}(gdb) p peragg[1]$11 = {aggref = 0x26a0048, transno = 1, finalfn_oid = 0, finalfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,     fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},   numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = 4, resulttypeByVal = true, shareable = true}(gdb) p peragg[2]$12 = {aggref = 0x269fdc0, transno = 2, finalfn_oid = 1964, finalfn = {fn_addr = 0x978251 , fn_oid = 1964,     fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002', fn_extra = 0x0, fn_mcxt = 0x268f310,     fn_expr = 0x25d5190}, numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = -1, resulttypeByVal = false,   shareable = true}

分组只有一个

(gdb) p pergroups[0]$14 = (AggStatePerGroup) 0x25d4dc0(gdb) p *pergroups[0]$15 = {transValue = 0, transValueIsNull = false, noTransValue = false}

进入循环

(gdb) n1610        while (!aggstate->agg_done)(gdb) 1624            ReScanExprContext(econtext);(gdb)

重置内存上下文

(gdb) 1629            if (aggstate->projected_set >= 0 &&(gdb) 1633                numReset = numGroupingSets;(gdb) 1642            for (i = 0; i < numReset; i++)(gdb) 1644                ReScanExprContext(aggstate->aggcontexts[i]);(gdb) 1642            for (i = 0; i < numReset; i++)(gdb)

检查输入是否已完成处理/本组已完成投影(实际不满足条件)

(gdb) 1651            if (aggstate->input_done == true &&(gdb) 1688            if (aggstate->projected_set >= 0 &&(gdb) p aggstate->input_done$16 = false(gdb) p aggstate->projected_set$17 = -1

设置待处理的元组,为NULL

(gdb) 1711            tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;(gdb) 1712            if (aggstate->input_done ||(gdb) (gdb) p *tmpcontext->ecxt_innertupleCannot access memory at address 0x0

如果子分组已存在,则执行投影(实际不满足条件).

(gdb) n1713                (node->aggstrategy != AGG_PLAIN &&(gdb) p node->aggstrategy$18 = AGG_SORTED(gdb) n1712            if (aggstate->input_done ||(gdb) 1714                 aggstate->projected_set != -1 &&(gdb) 1713                (node->aggstrategy != AGG_PLAIN &&(gdb) 1732                aggstate->projected_set = 0;(gdb) p aggstate->input_done$19 = false(gdb) p aggstate->projected_set$20 = -1(gdb) p node->aggstrategy$21 = AGG_SORTED(gdb)

从outer plan中提取一行,并拷贝为首行

(gdb) n1738                if (aggstate->grp_firstTuple == NULL)(gdb) p aggstate->grp_firstTuple$22 = (HeapTuple) 0x0(gdb) n1740                    outerslot = fetch_input_tuple(aggstate);(gdb) 1741                    if (!TupIsNull(outerslot))(gdb) p *outerslot$23 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false,   tts_tuple = 0x26909f8, tts_tupleDescriptor = 0x26907a0, tts_mcxt = 0x268f310, tts_buffer = 0, tts_nvalid = 0,   tts_values = 0x2690a18, tts_isnull = 0x2690a30, tts_mintuple = 0x26b8ad8, tts_minhdr = {t_len = 40, t_self = {ip_blkid = {        bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x26b8ad0}, tts_off = 0,   tts_fixedTupleDescriptor = true}(gdb) (gdb) n1747                        aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);(gdb)

为新输入的元组组初始化工作状态.

(gdb) 1797                initialize_aggregates(aggstate, pergroups, numReset);

把元组拷贝到内存上下文中,并执行聚合运算(advance_aggregates)

(gdb) n1799                if (aggstate->grp_firstTuple != NULL)(gdb) 1806                    ExecStoreTuple(aggstate->grp_firstTuple,(gdb) 1810                    aggstate->grp_firstTuple = NULL;    /* don't keep two pointers */(gdb) 1813                    tmpcontext->ecxt_outertuple = firstSlot;(gdb) 1825                        if (aggstate->aggstrategy == AGG_MIXED &&(gdb) 1832                        advance_aggregates(aggstate);(gdb) 1835                        ResetExprContext(tmpcontext);(gdb)

继续提取行,拷贝到内存上下文中

(gdb) n1837                        outerslot = fetch_input_tuple(aggstate);(gdb) 1838                        if (TupIsNull(outerslot))(gdb) 1853                        tmpcontext->ecxt_outertuple = outerslot;(gdb) 1859                        if (node->aggstrategy != AGG_PLAIN)(gdb) 1861                            tmpcontext->ecxt_innertuple = firstSlot;(gdb) 1862                            if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],(gdb) 1869                    }

执行聚合运算,并继续提取下一行

825                        if (aggstate->aggstrategy == AGG_MIXED &&(gdb) 1832                        advance_aggregates(aggstate);(gdb) 1835                        ResetExprContext(tmpcontext);(gdb) 1837                        outerslot = fetch_input_tuple(aggstate);(gdb) 1838                        if (TupIsNull(outerslot))(gdb) 1853                        tmpcontext->ecxt_outertuple = outerslot;(gdb) 1859                        if (node->aggstrategy != AGG_PLAIN)(gdb) 1861                            tmpcontext->ecxt_innertuple = firstSlot;(gdb)

如果是分组,检查是否已跨越分组边界,如已越界在跳出循环.

1862                            if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],(gdb) 1865                                aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);(gdb) 1866                                break;

已获得一行结果行,返回结果

(gdb) 1880                econtext->ecxt_outertuple = firstSlot;(gdb) n1883            Assert(aggstate->projected_set >= 0);(gdb) 1885            currentSet = aggstate->projected_set;(gdb) 1887            prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);(gdb) p aggstate->projected_set$24 = 0(gdb) n1889            select_current_set(aggstate, currentSet, false);(gdb) 1893                                pergroups[currentSet]);(gdb) 1891            finalize_aggregates(aggstate,(gdb) 1899            result = project_aggregates(aggstate);(gdb) 1900            if (result)(gdb) 1901                return result;(gdb) p *result$25 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false,   tts_tuple = 0x0, tts_tupleDescriptor = 0x2690b38, tts_mcxt = 0x268f310, tts_buffer = 0, tts_nvalid = 4,   tts_values = 0x2690db0, tts_isnull = 0x2690dd0, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {        bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}(gdb)

DONE!

四、参考资料

PostgreSQL 源码解读(178)- 查询#95(聚合函数)#1相关数据结构
PostgreSQL 源码解读(186)- 查询#102(聚合函数#7-advance_aggregates)

0