千家信息网

PostgreSQL 源码解读(189)- 查询#105(聚合函数#10 - agg_retrieve_hash_table)

发表于:2025-02-09 作者:千家信息网编辑
千家信息网最后更新 2025年02月09日,本节继续介绍聚合函数的实现,主要介绍了agg_retrieve_hash_table函数中与投影相关的实现逻辑,包括函数prepare_projection_slot/finalize_aggrega
千家信息网最后更新 2025年02月09日PostgreSQL 源码解读(189)- 查询#105(聚合函数#10 - agg_retrieve_hash_table)

本节继续介绍聚合函数的实现,主要介绍了agg_retrieve_hash_table函数中与投影相关的实现逻辑,包括函数prepare_projection_slot/finalize_aggregates/project_aggregates.

一、数据结构

AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体

/* --------------------- *    AggState information * *    ss.ss_ScanTupleSlot refers to output of underlying plan. *  ss.ss_ScanTupleSlot指的是基础计划的输出. *    (ss = ScanState,ps = PlanState) * *    Note: ss.ps.ps_ExprContext contains ecxt_aggvalues and *    ecxt_aggnulls arrays, which hold the computed agg values for the current *    input group during evaluation of an Agg node's output tuple(s).  We *    create a second ExprContext, tmpcontext, in which to evaluate input *    expressions and run the aggregate transition functions. *    注意:ss.ps.ps_ExprContext包含了ecxt_aggvalues和ecxt_aggnulls数组, *      这两个数组保存了在计算agg节点的输出元组时当前输入组已计算的agg值. * --------------------- *//* these structs are private in nodeAgg.c: *///在nodeAgg.c中私有的结构体typedef struct AggStatePerAggData *AggStatePerAgg;typedef struct AggStatePerTransData *AggStatePerTrans;typedef struct AggStatePerGroupData *AggStatePerGroup;typedef struct AggStatePerPhaseData *AggStatePerPhase;typedef struct AggStatePerHashData *AggStatePerHash;typedef struct AggState{    //第一个字段是NodeTag(继承自ScanState)    ScanState    ss;                /* its first field is NodeTag */    //targetlist和quals中所有的Aggref    List       *aggs;            /* all Aggref nodes in targetlist & quals */    //链表的大小(可以为0)    int            numaggs;        /* length of list (could be zero!) */    //pertrans条目大小    int            numtrans;        /* number of pertrans items */    //Agg策略模式    AggStrategy aggstrategy;    /* strategy mode */    //agg-splitting模式,参见nodes.h    AggSplit    aggsplit;        /* agg-splitting mode, see nodes.h */    //指向当前步骤数据的指针    AggStatePerPhase phase;        /* pointer to current phase data */    //步骤数(包括0)    int            numphases;        /* number of phases (including phase 0) */    //当前步骤    int            current_phase;    /* current phase number */    //per-Aggref信息    AggStatePerAgg peragg;        /* per-Aggref information */    //per-Trans状态信息    AggStatePerTrans pertrans;    /* per-Trans state information */    //长生命周期数据的ExprContexts(hashtable)    ExprContext *hashcontext;    /* econtexts for long-lived data (hashtable) */    ////长生命周期数据的ExprContexts(每一个GS使用)    ExprContext **aggcontexts;    /* econtexts for long-lived data (per GS) */    //输入表达式的ExprContext    ExprContext *tmpcontext;    /* econtext for input expressions */#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14    //当前活跃的aggcontext    ExprContext *curaggcontext; /* currently active aggcontext */    //当前活跃的aggregate(如存在)    AggStatePerAgg curperagg;    /* currently active aggregate, if any */#define FIELDNO_AGGSTATE_CURPERTRANS 16    //当前活跃的trans state    AggStatePerTrans curpertrans;    /* currently active trans state, if any */    //输入结束?    bool        input_done;        /* indicates end of input */    //Agg扫描结束?    bool        agg_done;        /* indicates completion of Agg scan */    //最后一个grouping set    int            projected_set;    /* The last projected grouping set */#define FIELDNO_AGGSTATE_CURRENT_SET 20    //将要解析的当前grouping set    int            current_set;    /* The current grouping set being evaluated */    //当前投影操作的分组列    Bitmapset  *grouped_cols;    /* grouped cols in current projection */    //倒序的分组列链表    List       *all_grouped_cols;    /* list of all grouped cols in DESC order */    /* These fields are for grouping set phase data */    //-------- 下面的列用于grouping set步骤数据    //所有步骤中最大的sets大小    int            maxsets;        /* The max number of sets in any phase */    //所有步骤的数组    AggStatePerPhase phases;    /* array of all phases */    //对于phases > 1,已排序的输入信息    Tuplesortstate *sort_in;    /* sorted input to phases > 1 */    //对于下一个步骤,输入已拷贝    Tuplesortstate *sort_out;    /* input is copied here for next phase */    //排序结果的slot    TupleTableSlot *sort_slot;    /* slot for sort results */    /* these fields are used in AGG_PLAIN and AGG_SORTED modes: */    //------- 下面的列用于AGG_PLAIN和AGG_SORTED模式:    //per-group指针的grouping set编号数组    AggStatePerGroup *pergroups;    /* grouping set indexed array of per-group                                     * pointers */    //当前组的第一个元组拷贝    HeapTuple    grp_firstTuple; /* copy of first tuple of current group */    /* these fields are used in AGG_HASHED and AGG_MIXED modes: */    //--------- 下面的列用于AGG_HASHED和AGG_MIXED模式:    //是否已填充hash表?    bool        table_filled;    /* hash table filled yet? */    //hash桶数?    int            num_hashes;    //相应的哈希表数据数组    AggStatePerHash perhash;    /* array of per-hashtable data */    //per-group指针的grouping set编号数组    AggStatePerGroup *hash_pergroup;    /* grouping set indexed array of                                         * per-group pointers */    /* support for evaluation of agg input expressions: */    //---------- agg输入表达式解析支持#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34    //首先是->pergroups,然后是hash_pergroup    AggStatePerGroup *all_pergroups;    /* array of first ->pergroups, than                                         * ->hash_pergroup */    //投影实现机制    ProjectionInfo *combinedproj;    /* projection machinery */} AggState;/* Primitive options supported by nodeAgg.c: *///nodeag .c支持的基本选项#define AGGSPLITOP_COMBINE        0x01    /* substitute combinefn for transfn */#define AGGSPLITOP_SKIPFINAL    0x02    /* skip finalfn, return state as-is */#define AGGSPLITOP_SERIALIZE    0x04    /* apply serializefn to output */#define AGGSPLITOP_DESERIALIZE    0x08    /* apply deserializefn to input *//* Supported operating modes (i.e., useful combinations of these options): *///支持的操作模式typedef enum AggSplit{    /* Basic, non-split aggregation: */    //基本 : 非split聚合    AGGSPLIT_SIMPLE = 0,    /* Initial phase of partial aggregation, with serialization: */    //部分聚合的初始步骤,序列化    AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,    /* Final phase of partial aggregation, with deserialization: */    //部分聚合的最终步骤,反序列化    AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE} AggSplit;/* Test whether an AggSplit value selects each primitive option: *///测试AggSplit选择了哪些基本选项#define DO_AGGSPLIT_COMBINE(as)        (((as) & AGGSPLITOP_COMBINE) != 0)#define DO_AGGSPLIT_SKIPFINAL(as)    (((as) & AGGSPLITOP_SKIPFINAL) != 0)#define DO_AGGSPLIT_SERIALIZE(as)    (((as) & AGGSPLITOP_SERIALIZE) != 0)#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)

二、源码解读

prepare_projection_slot
prepare_projection_slot函数基于指定的典型元组slot和grouping set准备finalize和project.
比如初始化isnull数组等.

/* * Prepare to finalize and project based on the specified representative tuple * slot and grouping set. * 基于指定的典型元组slot和grouping set准备finalize和project. * * In the specified tuple slot, force to null all attributes that should be * read as null in the context of the current grouping set.  Also stash the * current group bitmap where GroupingExpr can get at it. * 在指定的元组slot,强制在当前grouping set上下文中应为null的所有属性值为null. * 还可以将当前组位图保存在GroupingExpr可以获得的位置. * * This relies on three conditions: * 这取决于下面3个条件: * * 1) Nothing is ever going to try and extract the whole tuple from this slot, * only reference it in evaluations, which will only access individual * attributes. * 1) 永远不会尝试从该slot中提取整个元组,只是在解析中依赖它,这只会访问单个属性. * * 2) No system columns are going to need to be nulled. (If a system column is * referenced in a group clause, it is actually projected in the outer plan * tlist.) * 2) 系统列不需要设置为null. *    (如在group语句中依赖系统列,实际上已在outer plan tlist中已完成投影) * * 3) Within a given phase, we never need to recover the value of an attribute * once it has been set to null. * 3) 在给定的阶段,一旦属性被设置为null,就不需要恢复属性值. * * Poking into the slot this way is a bit ugly, but the consensus is that the * alternative was worse. * 以这种方法使用slot有点丑陋,但其他方式更糟糕. */static voidprepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet){    if (aggstate->phase->grouped_cols)    {        Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];        aggstate->grouped_cols = grouped_cols;        if (slot->tts_isempty)        {            /*             * Force all values to be NULL if working on an empty input tuple             * (i.e. an empty grouping set for which no input rows were             * supplied).             * 如输入tuple为空,则强制所有值为NULL.             * (如不提供输入行的空grouping set)             */            ExecStoreAllNullTuple(slot);        }        else if (aggstate->all_grouped_cols)        {            ListCell   *lc;            /* all_grouped_cols is arranged in desc order */            //all_grouped_cols以倒序的方式组织            slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));            foreach(lc, aggstate->all_grouped_cols)            {                int            attnum = lfirst_int(lc);                if (!bms_is_member(attnum, grouped_cols))                    slot->tts_isnull[attnum - 1] = true;            }        }    }}

finalize_aggregates
finalize_aggregates函数计算某一组所有聚合的最终值,实现函数是finalize_aggregate,该实现函数下节再行介绍.

/* * Compute the final value of all aggregates for one group. * 计算某一组所有聚合的最终值 * * This function handles only one grouping set at a time, which the caller must * have selected.  It's also the caller's responsibility to adjust the supplied * pergroup parameter to point to the current set's transvalues. * 该函数一次只会处理一个grouping set(调用者负责选择). * 调用者同样有职责调整提供的pergroup参数为指向当前集合的transvalues. * * Results are stored in the output econtext aggvalues/aggnulls. */static voidfinalize_aggregates(AggState *aggstate,                    AggStatePerAgg peraggs,                    AggStatePerGroup pergroup){    ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;    Datum       *aggvalues = econtext->ecxt_aggvalues;    bool       *aggnulls = econtext->ecxt_aggnulls;    int            aggno;    int            transno;    /*     * If there were any DISTINCT and/or ORDER BY aggregates, sort their     * inputs and run the transition functions.     * 如存在DISTINCT或ORDER BY 聚合,排序这些输入并执行转换函数.     */    //遍历转换函数    for (transno = 0; transno < aggstate->numtrans; transno++)    {        //转换函数        AggStatePerTrans pertrans = &aggstate->pertrans[transno];        //pergroup        AggStatePerGroup pergroupstate;        pergroupstate = &pergroup[transno];        if (pertrans->numSortCols > 0)        {            //--- 存在DISTINCT/ORDER BY            //验证,Hash不需要排序            Assert(aggstate->aggstrategy != AGG_HASHED &&                   aggstate->aggstrategy != AGG_MIXED);            if (pertrans->numInputs == 1)                //单独                process_ordered_aggregate_single(aggstate,                                                 pertrans,                                                 pergroupstate);            else                //多个                process_ordered_aggregate_multi(aggstate,                                                pertrans,                                                pergroupstate);        }    }    /*     * Run the final functions.     * 执行获取最终值的函数     */    //遍历聚合    for (aggno = 0; aggno < aggstate->numaggs; aggno++)    {        //获取peragg        AggStatePerAgg peragg = &peraggs[aggno];        int            transno = peragg->transno;        AggStatePerGroup pergroupstate;        //pergroup        pergroupstate = &pergroup[transno];        if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))            //并行处理结果            finalize_partialaggregate(aggstate, peragg, pergroupstate,                                      &aggvalues[aggno], &aggnulls[aggno]);        else            //调用finalize_aggregate获取结果            finalize_aggregate(aggstate, peragg, pergroupstate,                               &aggvalues[aggno], &aggnulls[aggno]);    }}

project_aggregates
project_aggregates函数投影某一组的结果(该组结果已通过finalize_aggregates函数计算得到).

/* * Project the result of a group (whose aggs have already been calculated by * finalize_aggregates). Returns the result slot, or NULL if no row is * projected (suppressed by qual). * 投影某一组的结果(该组结果已通过finalize_aggregates函数计算得到). * 返回结果slot,如无结果行投影(通过qual处理)则返回NULL. */static TupleTableSlot *project_aggregates(AggState *aggstate){    ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;    /*     * Check the qual (HAVING clause); if the group does not match, ignore it.     * 检查条件表达式(HAVING子句).如跟group不匹配,则忽略之.     */    if (ExecQual(aggstate->ss.ps.qual, econtext))    {        /*         * Form and return projection tuple using the aggregate results and         * the representative input tuple.         * 使用聚合结果和相应的输入tuple组成并返回投影元组.         */        return ExecProject(aggstate->ss.ps.ps_ProjInfo);    }    else        InstrCountFiltered1(aggstate, 1);    return NULL;}#define InstrCountFiltered1(node, delta) \    do { \        if (((PlanState *)(node))->instrument) \            ((PlanState *)(node))->instrument->nfiltered1 += (delta); \    } while(0)

ExecProject
ExecProject函数基于投影信息投影元组并把元组存储在传递给ExecBuildProjectInfo()的slot参数中.

/* * ExecProject * * Projects a tuple based on projection info and stores it in the slot passed * to ExecBuildProjectInfo(). * 基于投影信息投影元组并把元组存储在传递给ExecBuildProjectInfo()的slot参数中. * * Note: the result is always a virtual tuple; therefore it may reference * the contents of the exprContext's scan tuples and/or temporary results * constructed in the exprContext.  If the caller wishes the result to be * valid longer than that data will be valid, he must call ExecMaterializeSlot * on the result slot. * 注意:结果通常是虚拟元组.因此该元组可能会依赖exprContext扫描元组的内容和/或在exprContext中构建的临时结果. * 如果调用者希望结果比数据更长久有效,调用者必须调用在结果slot上调用ExecMaterializeSlot(物化). */#ifndef FRONTENDstatic inline TupleTableSlot *ExecProject(ProjectionInfo *projInfo){    ExprContext *econtext = projInfo->pi_exprContext;    ExprState  *state = &projInfo->pi_state;    TupleTableSlot *slot = state->resultslot;    bool        isnull;    /*     * Clear any former contents of the result slot.  This makes it safe for     * us to use the slot's Datum/isnull arrays as workspace.     * 清理结果slot的形式内容.     * 这可以确保slot的Datum/isnull数组是OK的.     */    ExecClearTuple(slot);    /* Run the expression, discarding scalar result from the last column. */    //执行表达式解析,丢弃scalar结果.    (void) ExecEvalExprSwitchContext(state, econtext, &isnull);    /*     * Successfully formed a result row.  Mark the result slot as containing a     * valid virtual tuple (inlined version of ExecStoreVirtualTuple()).     * 成功组成一个结果行.     * 标记结果slot为包含有效虚拟元组(内联版本的ExecStoreVirtualTuple)     */    slot->tts_isempty = false;    slot->tts_nvalid = slot->tts_tupleDescriptor->natts;    return slot;}#endif

三、跟踪分析

N/A

四、参考资料

PostgreSQL 源码解读(178)- 查询#95(聚合函数)#1相关数据结构

0