千家信息网

PostgreSQL 源码解读(182)- 查询#98(聚合函数#3-ExecAgg)

发表于:2025-01-26 作者:千家信息网编辑
千家信息网最后更新 2025年01月26日,本节简单介绍了PostgreSQL执行聚合函数的实现,主要实现函数是ExecAgg.这里先行介绍ExecAgg->agg_fill_hash_table函数,其他子函数后续再行介绍.通过设置log输出
千家信息网最后更新 2025年01月26日PostgreSQL 源码解读(182)- 查询#98(聚合函数#3-ExecAgg)

本节简单介绍了PostgreSQL执行聚合函数的实现,主要实现函数是ExecAgg.这里先行介绍ExecAgg->agg_fill_hash_table函数,其他子函数后续再行介绍.

通过设置log输出,可得到SQL的planTree:

",,,,,"select bh,avg(c1),min(c1),max(c2) from t_agg group by bh;",,,"psql"2019-04-30 14:33:11.998 CST,"xdb","testdb",1387,"[local]",5cc7ec00.56b,3,"SELECT",2019-04-30 14:32:32 CST,3/3,0,LOG,00000,"plan:","   {PLANNEDSTMT    :commandType 1    :queryId 0    :hasReturning false    :hasModifyingCTE false    :canSetTag true    :transientPlan false    :dependsOnRole false    :parallelModeNeeded false    :jitFlags 0    :planTree       {AGG       :startup_cost 13677.00       :total_cost 13677.06       :plan_rows 5       :plan_width 45       :parallel_aware false       :parallel_safe false       :plan_node_id 0       :targetlist (...      )      :qual <>       :lefttree          {SEQSCAN          :startup_cost 0.00          :total_cost 8677.00          :plan_rows 500000          :plan_width 13          :parallel_aware false          :parallel_safe false          :plan_node_id 1          :targetlist (...         )         :qual <>          :lefttree <>          :righttree <>          :initPlan <>          :extParam (b)         :allParam (b)         :scanrelid 1         }      :righttree <>       :initPlan <>       :extParam (b)      :allParam (b)      :aggstrategy 2       :aggsplit 0       :numCols 1       :grpColIdx 1       :grpOperators 98       :numGroups 5       :aggParams (b)      :groupingSets <>       :chain <>      }   :rtable (...   )   :resultRelations <>    :nonleafResultRelations <>    :rootResultRelations <>    :subplans <>    :rewindPlanIDs (b)   :rowMarks <>    :relationOids (o 245801)   :invalItems <>    :paramExecTypes <>    :utilityStmt <>    :stmt_location 0    :stmt_len 56   }",,,,,"select bh,avg(c1),min(c1),max(c2) from t_agg group by bh;",,,"psql"

第一个节点为AGG,相应的实现函数为ExecAgg.

一、数据结构

AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体

/* ---------------------
 *    AggState information
 *
 *    ss.ss_ScanTupleSlot refers to output of underlying plan.
 *    (ss = ScanState, ps = PlanState)
 *
 *    Note: ss.ps.ps_ExprContext contains ecxt_aggvalues and
 *    ecxt_aggnulls arrays, which hold the computed agg values for the current
 *    input group during evaluation of an Agg node's output tuple(s).  We
 *    create a second ExprContext, tmpcontext, in which to evaluate input
 *    expressions and run the aggregate transition functions.
 * ---------------------
 */
/* these structs are private in nodeAgg.c: */
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;

typedef struct AggState
{
    /* its first field is NodeTag (inherited from ScanState) */
    ScanState    ss;                /* its first field is NodeTag */
    List       *aggs;            /* all Aggref nodes in targetlist & quals */
    int            numaggs;        /* length of list (could be zero!) */
    int            numtrans;        /* number of pertrans items */
    AggStrategy aggstrategy;    /* strategy mode */
    AggSplit    aggsplit;        /* agg-splitting mode, see nodes.h */
    AggStatePerPhase phase;        /* pointer to current phase data */
    int            numphases;        /* number of phases (including phase 0) */
    int            current_phase;    /* current phase number */
    AggStatePerAgg peragg;        /* per-Aggref information */
    AggStatePerTrans pertrans;    /* per-Trans state information */
    ExprContext *hashcontext;    /* econtexts for long-lived data (hashtable) */
    ExprContext **aggcontexts;    /* econtexts for long-lived data (per GS) */
    ExprContext *tmpcontext;    /* econtext for input expressions */
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
    ExprContext *curaggcontext; /* currently active aggcontext */
    AggStatePerAgg curperagg;    /* currently active aggregate, if any */
#define FIELDNO_AGGSTATE_CURPERTRANS 16
    AggStatePerTrans curpertrans;    /* currently active trans state, if any */
    bool        input_done;        /* indicates end of input */
    bool        agg_done;        /* indicates completion of Agg scan */
    int            projected_set;    /* The last projected grouping set */
#define FIELDNO_AGGSTATE_CURRENT_SET 20
    int            current_set;    /* The current grouping set being evaluated */
    Bitmapset  *grouped_cols;    /* grouped cols in current projection */
    List       *all_grouped_cols;    /* list of all grouped cols in DESC order */

    /* These fields are for grouping set phase data */
    int            maxsets;        /* The max number of sets in any phase */
    AggStatePerPhase phases;    /* array of all phases */
    Tuplesortstate *sort_in;    /* sorted input to phases > 1 */
    Tuplesortstate *sort_out;    /* input is copied here for next phase */
    TupleTableSlot *sort_slot;    /* slot for sort results */

    /* these fields are used in AGG_PLAIN and AGG_SORTED modes: */
    AggStatePerGroup *pergroups;    /* grouping set indexed array of per-group
                                     * pointers */
    HeapTuple    grp_firstTuple; /* copy of first tuple of current group */

    /* these fields are used in AGG_HASHED and AGG_MIXED modes: */
    bool        table_filled;    /* hash table filled yet? */
    /* number of hash tables (one per hashed grouping set) */
    int            num_hashes;
    AggStatePerHash perhash;    /* array of per-hashtable data */
    AggStatePerGroup *hash_pergroup;    /* grouping set indexed array of
                                         * per-group pointers */

    /* support for evaluation of agg input expressions: */
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
    AggStatePerGroup *all_pergroups;    /* array of first ->pergroups, then
                                         * ->hash_pergroup */
    ProjectionInfo *combinedproj;    /* projection machinery */
} AggState;

/* Primitive options supported by nodeAgg.c: */
#define AGGSPLITOP_COMBINE        0x01    /* substitute combinefn for transfn */
#define AGGSPLITOP_SKIPFINAL    0x02    /* skip finalfn, return state as-is */
#define AGGSPLITOP_SERIALIZE    0x04    /* apply serializefn to output */
#define AGGSPLITOP_DESERIALIZE    0x08    /* apply deserializefn to input */

/* Supported operating modes (i.e., useful combinations of these options): */
typedef enum AggSplit
{
    /* Basic, non-split aggregation: */
    AGGSPLIT_SIMPLE = 0,
    /* Initial phase of partial aggregation, with serialization: */
    AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
    /* Final phase of partial aggregation, with deserialization: */
    AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;

/* Test whether an AggSplit value selects each primitive option: */
#define DO_AGGSPLIT_COMBINE(as)        (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as)    (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as)    (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)

二、源码解读

ExecAgg接收从outer子计划返回的元组,在合适的属性上为每一个聚合函数(出现在投影列或节点表达式中的Aggref节点)执行聚合.需要聚合的元组数量取决于选择的是分组聚合还是普通聚合.在分组聚合中,为每一个组产生一个结果行;普通聚合则整个查询只有一个结果行.
不管哪种情况,每一个聚合结果值都会存储在表达式上下文中(ExecProject会解析结果元组)

/* *
 * ExecAgg -
 *
 *      ExecAgg receives tuples from its outer subplan and aggregates over
 *      the appropriate attribute for each aggregate function use (Aggref
 *      node) appearing in the targetlist or qual of the node.  The number
 *      of tuples to aggregate over depends on whether grouped or plain
 *      aggregation is selected.  In grouped aggregation, we produce a result
 *      row for each group; in plain aggregation there's a single result row
 *      for the whole query.  In either case, the value of each aggregate is
 *      stored in the expression context to be used when ExecProject evaluates
 *      the result tuple.
 */
static TupleTableSlot *
ExecAgg(PlanState *pstate)
{
    AggState   *node = castNode(AggState, pstate);
    TupleTableSlot *result = NULL;

    CHECK_FOR_INTERRUPTS();

    if (!node->agg_done)
    {
        /* Dispatch based on strategy */
        switch (node->phase->aggstrategy)
        {
            case AGG_HASHED:
                /* build the hash table once, on first call */
                if (!node->table_filled)
                    agg_fill_hash_table(node);
                /* FALLTHROUGH */
                /* once filled, retrieval is shared with the MIXED path */
            case AGG_MIXED:
                result = agg_retrieve_hash_table(node);
                break;
            case AGG_PLAIN:
            case AGG_SORTED:
                result = agg_retrieve_direct(node);
                break;
        }

        if (!TupIsNull(result))
            return result;
    }

    /* no more output: either agg_done or the strategy produced nothing */
    return NULL;
}

agg_fill_hash_table
读取输入并构建哈希表,逻辑较为简单,详细参考下面源码

/*
 * ExecAgg for hashed case: read input and build hash table
 */
static void
agg_fill_hash_table(AggState *aggstate)
{
    TupleTableSlot *outerslot;
    ExprContext *tmpcontext = aggstate->tmpcontext;

    /*
     * Process each outer-plan tuple, and then fetch the next one, until we
     * exhaust the outer plan.
     */
    for (;;)
    {
        /* fetch the next input tuple; NULL slot means input is exhausted */
        outerslot = fetch_input_tuple(aggstate);
        if (TupIsNull(outerslot))
            break;

        /* set up for lookup_hash_entries and advance_aggregates */
        tmpcontext->ecxt_outertuple = outerslot;

        /* Find or build hashtable entries */
        lookup_hash_entries(aggstate);

        /* Advance the aggregates (or combine functions) */
        advance_aggregates(aggstate);

        /*
         * Reset per-input-tuple context after each tuple, but note that the
         * hash lookups do this too
         */
        ResetExprContext(aggstate->tmpcontext);
    }

    aggstate->table_filled = true;
    /* Initialize to walk the first hash table */
    select_current_set(aggstate, 0, true);
    ResetTupleHashIterator(aggstate->perhash[0].hashtable,
                           &aggstate->perhash[0].hashiter);
}

/*
 * Advance each aggregate transition state for one input tuple.  The input
 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
 * accessible to ExecEvalExpr.
 *
 * We have two sets of transition states to handle: one for sorted aggregation
 * and one for hashed; we do them both here, to avoid multiple evaluation of
 * the inputs.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
advance_aggregates(AggState *aggstate)
{
    bool        dummynull;

    /* the compiled transition expression does all per-trans work */
    ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
                              aggstate->tmpcontext,
                              &dummynull);
}

/*
 * ExecEvalExprSwitchContext
 *
 * Same as ExecEvalExpr, but get into the right allocation context explicitly.
 */
#ifndef FRONTEND
static inline Datum
ExecEvalExprSwitchContext(ExprState *state,
                          ExprContext *econtext,
                          bool *isNull)
{
    Datum        retDatum;
    MemoryContext oldContext;

    /* evaluate in the per-tuple memory context, then restore */
    oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
    retDatum = state->evalfunc(state, econtext, isNull);
    MemoryContextSwitchTo(oldContext);
    return retDatum;
}
#endif

/*
 * Look up hash entries for the current tuple in all hashed grouping sets,
 * returning an array of pergroup pointers suitable for advance_aggregates.
 *
 * Be aware that lookup_hash_entry can reset the tmpcontext.
 */
static void
lookup_hash_entries(AggState *aggstate)
{
    int            numHashes = aggstate->num_hashes;
    AggStatePerGroup *pergroup = aggstate->hash_pergroup;
    int            setno;

    for (setno = 0; setno < numHashes; setno++)
    {
        /* make setno the current grouping set before the lookup */
        select_current_set(aggstate, setno, true);
        pergroup[setno] = lookup_hash_entry(aggstate)->additional;
    }
}

/*
 * Find or create a hashtable entry for the tuple group containing the current
 * tuple (already set in tmpcontext's outertuple slot), in the current grouping
 * set (which the caller must have selected - note that initialize_aggregate
 * depends on this).
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static TupleHashEntryData *
lookup_hash_entry(AggState *aggstate)
{
    TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
    AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
    TupleTableSlot *hashslot = perhash->hashslot;
    TupleHashEntryData *entry;
    bool        isnew;
    int            i;

    /* transfer just the needed columns into hashslot */
    slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
    ExecClearTuple(hashslot);

    for (i = 0; i < perhash->numhashGrpCols; i++)
    {
        /* map hash column i back to its input-slot attribute (0-based) */
        int            varNumber = perhash->hashGrpColIdxInput[i] - 1;

        hashslot->tts_values[i] = inputslot->tts_values[varNumber];
        hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
    }
    ExecStoreVirtualTuple(hashslot);

    /* find or create the hashtable entry using the filtered tuple */
    entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);

    if (isnew)
    {
        AggStatePerGroup pergroup;
        int            transno;

        /* per-group transition states live in the hash table's context */
        pergroup = (AggStatePerGroup)
            MemoryContextAlloc(perhash->hashtable->tablecxt,
                               sizeof(AggStatePerGroupData) * aggstate->numtrans);
        entry->additional = pergroup;

        /*
         * Initialize aggregates for new tuple group, lookup_hash_entries()
         * already has selected the relevant grouping set.
         */
        for (transno = 0; transno < aggstate->numtrans; transno++)
        {
            AggStatePerTrans pertrans = &aggstate->pertrans[transno];
            AggStatePerGroup pergroupstate = &pergroup[transno];

            initialize_aggregate(aggstate, pertrans, pergroupstate);
        }
    }

    return entry;
}

/*
 * Find or create a hashtable entry for the tuple group containing the
 * given tuple.  The tuple must be the same type as the hashtable entries.
 *
 * If isnew is NULL, we do not create new entries; we return NULL if no
 * match is found.
 *
 * If isnew isn't NULL, then a new entry is created if no existing entry
 * matches.  On return, *isnew is true if the entry is newly created,
 * false if it existed already.  ->additional_data in the new entry has
 * been zeroed.
 */
TupleHashEntry
LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot,
                     bool *isnew)
{
    TupleHashEntryData *entry;
    MemoryContext oldContext;
    bool        found;
    MinimalTuple key;

    /* Need to run the hash functions in short-lived context */
    oldContext = MemoryContextSwitchTo(hashtable->tempcxt);

    /* set up data needed by hash and match functions */
    hashtable->inputslot = slot;
    hashtable->in_hash_funcs = hashtable->tab_hash_funcs;
    hashtable->cur_eq_func = hashtable->tab_eq_func;

    key = NULL;                    /* flag to reference inputslot */

    if (isnew)
    {
        entry = tuplehash_insert(hashtable->hashtab, key, &found);

        if (found)
        {
            /* found pre-existing entry */
            *isnew = false;
        }
        else
        {
            /* created new entry */
            *isnew = true;
            /* zero caller data */
            entry->additional = NULL;
            MemoryContextSwitchTo(hashtable->tablecxt);
            /* Copy the first tuple into the table context */
            entry->firstTuple = ExecCopySlotMinimalTuple(slot);
        }
    }
    else
    {
        /* lookup only; returns NULL when no matching group exists */
        entry = tuplehash_lookup(hashtable->hashtab, key);
    }

    MemoryContextSwitchTo(oldContext);

    return entry;
}

/*
 * (Re)Initialize an individual aggregate.
 *
 * This function handles only one grouping set, already set in
 * aggstate->current_set.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
                     AggStatePerGroup pergroupstate)
{
    /*
     * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
     */
    if (pertrans->numSortCols > 0)
    {
        /*
         * In case of rescan, maybe there could be an uncompleted sort
         * operation?  Clean it up if so.
         */
        if (pertrans->sortstates[aggstate->current_set])
            tuplesort_end(pertrans->sortstates[aggstate->current_set]);

        /*
         * We use a plain Datum sorter when there's a single input column;
         * otherwise sort the full tuple.  (See comments for
         * process_ordered_aggregate_single.)
         */
        if (pertrans->numInputs == 1)
        {
            Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);

            /* single-column case: cheaper Datum sorter */
            pertrans->sortstates[aggstate->current_set] =
                tuplesort_begin_datum(attr->atttypid,
                                      pertrans->sortOperators[0],
                                      pertrans->sortCollations[0],
                                      pertrans->sortNullsFirst[0],
                                      work_mem, NULL, false);
        }
        else
            /* multi-column case: full tuple sorter */
            pertrans->sortstates[aggstate->current_set] =
                tuplesort_begin_heap(pertrans->sortdesc,
                                     pertrans->numSortCols,
                                     pertrans->sortColIdx,
                                     pertrans->sortOperators,
                                     pertrans->sortCollations,
                                     pertrans->sortNullsFirst,
                                     work_mem, NULL, false);
    }

    /*
     * (Re)set transValue to the initial value.
     *
     * Note that when the initial value is pass-by-ref, we must copy it (into
     * the aggcontext) since we will pfree the transValue later.
     */
    if (pertrans->initValueIsNull)
        pergroupstate->transValue = pertrans->initValue;
    else
    {
        MemoryContext oldContext;

        oldContext = MemoryContextSwitchTo(
                                           aggstate->curaggcontext->ecxt_per_tuple_memory);
        pergroupstate->transValue = datumCopy(pertrans->initValue,
                                              pertrans->transtypeByVal,
                                              pertrans->transtypeLen);
        MemoryContextSwitchTo(oldContext);
    }
    pergroupstate->transValueIsNull = pertrans->initValueIsNull;

    /*
     * If the initial value for the transition state doesn't exist in the
     * pg_aggregate table then we will let the first non-NULL value returned
     * from the outer procNode become the initial value. (This is useful for
     * aggregates like max() and min().) The noTransValue flag signals that we
     * still need to do this.
     */
    pergroupstate->noTransValue = pertrans->initValueIsNull;
}

/*
 * Select the current grouping set; affects current_set and
 * curaggcontext.
 */
static void
select_current_set(AggState *aggstate, int setno, bool is_hash)
{
    /* when changing this, also adapt ExecInterpExpr() and friends */
    if (is_hash)
        aggstate->curaggcontext = aggstate->hashcontext;
    else
        aggstate->curaggcontext = aggstate->aggcontexts[setno];

    aggstate->current_set = setno;
}

三、跟踪分析

测试脚本

//禁用并行testdb=# set max_parallel_workers_per_gather=0;SETtestdb=# explain verbose select bh,avg(c1),min(c1),max(c2) from t_agg group by bh;                                QUERY PLAN                                 --------------------------------------------------------------------------- HashAggregate  (cost=13677.00..13677.06 rows=5 width=45)   Output: bh, avg(c1), min(c1), max(c2)   Group Key: t_agg.bh   ->  Seq Scan on public.t_agg  (cost=0.00..8677.00 rows=500000 width=13)         Output: bh, c1, c2, c3, c4, c5, c6(5 rows)

跟踪分析

(gdb) b ExecAggBreakpoint 1 at 0x6ee444: file nodeAgg.c, line 1536.(gdb) cContinuing.Breakpoint 1, ExecAgg (pstate=0x1f895a0) at nodeAgg.c:15361536        AggState   *node = castNode(AggState, pstate);(gdb)

输入参数,AggState,在ExecInitAgg函数中初始化

(gdb) p *pstate$1 = {type = T_AggState, plan = 0x1f7b1e0, state = 0x1f89388, ExecProcNode = 0x6ee438 ,   ExecProcNodeReal = 0x6ee438 , instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,   qual = 0x0, lefttree = 0x1f89b10, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,   ps_ResultTupleSlot = 0x1f8a710, ps_ExprContext = 0x1f89a50, ps_ProjInfo = 0x1f8a850, scandesc = 0x1f89e60}

使用Hash实现

(gdb) n1537        TupleTableSlot *result = NULL;(gdb) 1539        CHECK_FOR_INTERRUPTS();(gdb) 1541        if (!node->agg_done)(gdb) 1544            switch (node->phase->aggstrategy)(gdb) p node->phase->aggstrategy$2 = AGG_HASHED(gdb) n1547                    if (!node->table_filled)(gdb) 1548                        agg_fill_hash_table(node);(gdb)

进入agg_fill_hash_table

(gdb) stepagg_fill_hash_table (aggstate=0x1f895a0) at nodeAgg.c:19151915        ExprContext *tmpcontext = aggstate->tmpcontext;

agg_fill_hash_table->提取输入的元组

(gdb) n1923            outerslot = fetch_input_tuple(aggstate);(gdb) stepfetch_input_tuple (aggstate=0x1f895a0) at nodeAgg.c:396396        if (aggstate->sort_in)(gdb) p aggstate->sort_in$3 = (Tuplesortstate *) 0x0(gdb) n406            slot = ExecProcNode(outerPlanState(aggstate));(gdb) 408        if (!TupIsNull(slot) && aggstate->sort_out)(gdb) p *slot$4 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false,   tts_tuple = 0x1fa5998, tts_tupleDescriptor = 0x7ff7dd2d1380, tts_mcxt = 0x1f89270, tts_buffer = 124, tts_nvalid = 0,   tts_values = 0x1f89d48, tts_isnull = 0x1f89d80, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {        bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}(gdb) n411        return slot;(gdb) 412    }(gdb) agg_fill_hash_table (aggstate=0x1f895a0) at nodeAgg.c:19241924            if (TupIsNull(outerslot))(gdb)

lookup_hash_entries->进入lookup_hash_entries,为当前元组在所有已完成hash的grouping sets中检索hash条目,为后续的advance_aggregates函数调用返回pergroup指针数组.

(gdb) n1928            tmpcontext->ecxt_outertuple = outerslot;(gdb) 1931            lookup_hash_entries(aggstate);(gdb) (gdb) steplookup_hash_entries (aggstate=0x1f895a0) at nodeAgg.c:15091509        int            numHashes = aggstate->num_hashes;(gdb) n1510        AggStatePerGroup *pergroup = aggstate->hash_pergroup;(gdb) p numHashes$5 = 1(gdb) n1513        for (setno = 0; setno < numHashes; setno++)(gdb) p *pergroup$6 = (AggStatePerGroup) 0x0(gdb) n1515            select_current_set(aggstate, setno, true);(gdb) stepselect_current_set (aggstate=0x1f895a0, setno=0, is_hash=true) at nodeAgg.c:306306        if (is_hash)(gdb) n307            aggstate->curaggcontext = aggstate->hashcontext;(gdb) 311        aggstate->current_set = setno;(gdb) 312    }(gdb) lookup_hash_entries (aggstate=0x1f895a0) at nodeAgg.c:15161516            pergroup[setno] = lookup_hash_entry(aggstate)->additional;(gdb)

lookup_hash_entry->调用lookup_hash_entry,该函数为包含当前元组的组检索或创建哈希表条目(当前元组已存放在tmpcontext的outertuple slot中).

(gdb) steplookup_hash_entry (aggstate=0x1f895a0) at nodeAgg.c:14511451        TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;(gdb) n1452        AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];(gdb) p aggstate->current_set$7 = 0(gdb) n1453        TupleTableSlot *hashslot = perhash->hashslot;(gdb) p *perhash$8 = {hashtable = 0x1f9fc98, hashiter = {cur = 0, end = 0, done = false}, hashslot = 0x1f8b198, hashfunctions = 0x1f8b230,   eqfuncoids = 0x1f9fc50, numCols = 1, numhashGrpCols = 1, largestGrpColIdx = 1, hashGrpColIdxInput = 0x1f9fbb0,   hashGrpColIdxHash = 0x1f9fbd0, aggnode = 0x1f7b1e0}(gdb) n1459        slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);(gdb) 1460        ExecClearTuple(hashslot);(gdb) p *perhash$9 = {hashtable = 0x1f9fc98, hashiter = {cur = 0, end = 0, done = false}, hashslot = 0x1f8b198, hashfunctions = 0x1f8b230,   eqfuncoids = 0x1f9fc50, numCols = 1, numhashGrpCols = 1, largestGrpColIdx = 1, hashGrpColIdxInput = 0x1f9fbb0,   hashGrpColIdxHash = 0x1f9fbd0, aggnode = 0x1f7b1e0}(gdb) p *perhash->hashslot$10 = {type = T_TupleTableSlot, tts_isempty = true, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false,   tts_tuple = 0x0, tts_tupleDescriptor = 0x1f8b080, tts_mcxt = 0x1f89270, tts_buffer = 0, tts_nvalid = 0,   tts_values = 0x1f8b1f8, tts_isnull = 0x1f8b200, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {        bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}(gdb) p *perhash->hashfunctions$11 = {fn_addr = 0x4c8a31 , fn_oid = 400, fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002',   fn_extra = 0x0, fn_mcxt = 0x1f89270, fn_expr = 0x0}(gdb) p *perhash->eqfuncoids$12 = 67(gdb) p *perhash->hashGrpColIdxInput$13 = 1(gdb) p *perhash->hashGrpColIdxHash$14 = 1(gdb) p *perhash->aggnode$15 = {plan = {type = T_Agg, startup_cost = 13677, total_cost = 13677.0625, plan_rows = 5, plan_width = 
45,     parallel_aware = false, parallel_safe = false, plan_node_id = 0, targetlist = 0x1f84108, qual = 0x0,     lefttree = 0x1f83bc8, righttree = 0x0, initPlan = 0x0, extParam = 0x0, allParam = 0x0}, aggstrategy = AGG_HASHED,   aggsplit = AGGSPLIT_SIMPLE, numCols = 1, grpColIdx = 0x1f83eb8, grpOperators = 0x1f83e98, numGroups = 5, aggParams = 0x0,   groupingSets = 0x0, chain = 0x0}(gdb)

lookup_hash_entry->遍历分组键(这里是bh列)

(gdb) n1462        for (i = 0; i < perhash->numhashGrpCols; i++)(gdb) 1464            int            varNumber = perhash->hashGrpColIdxInput[i] - 1;(gdb) 1466            hashslot->tts_values[i] = inputslot->tts_values[varNumber];(gdb) p varNumber$16 = 0(gdb) n1467            hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];(gdb) 1462        for (i = 0; i < perhash->numhashGrpCols; i++)(gdb) p *hashslot$17 = {type = T_TupleTableSlot, tts_isempty = true, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false,   tts_tuple = 0x0, tts_tupleDescriptor = 0x1f8b080, tts_mcxt = 0x1f89270, tts_buffer = 0, tts_nvalid = 0,   tts_values = 0x1f8b1f8, tts_isnull = 0x1f8b200, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {        bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}(gdb) p *hashslot->tts_values[0]$18 = 811222795(gdb)

lookup_hash_entry->调用LookupTupleHashEntry,该函数使用已过滤的元组检索或者创建哈希表条目

(gdb) stepLookupTupleHashEntry (hashtable=0x1f9fc98, slot=0x1f8b198, isnew=0x7fff7f065e17) at execGrouping.c:290290        oldContext = MemoryContextSwitchTo(hashtable->tempcxt);(gdb) p *hashtable$19 = {hashtab = 0x1f9fd30, numCols = 1, keyColIdx = 0x1f9fbd0, tab_hash_funcs = 0x1f8b230, tab_eq_func = 0x1fa0050,   tablecxt = 0x1f91370, tempcxt = 0x1f9d8e0, entrysize = 24, tableslot = 0x1f9ffb8, inputslot = 0x0, in_hash_funcs = 0x0,   cur_eq_func = 0x0, hash_iv = 0, exprcontext = 0x1fa0970}(gdb) p *hashtable->hashtab$20 = {size = 8, members = 0, sizemask = 7, grow_threshold = 7, data = 0x1f9fd88, ctx = 0x1f89270, private_data = 0x1f9fc98}(gdb) p *hashtable->keyColIdx$21 = 1(gdb) p *hashtable->tab_hash_funcs$22 = {fn_addr = 0x4c8a31 , fn_oid = 400, fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002',   fn_extra = 0x0, fn_mcxt = 0x1f89270, fn_expr = 0x0}(gdb) n293        hashtable->inputslot = slot;(gdb) 294        hashtable->in_hash_funcs = hashtable->tab_hash_funcs;(gdb) 295        hashtable->cur_eq_func = hashtable->tab_eq_func;(gdb) 297        key = NULL;                    /* flag to reference inputslot */(gdb) 299        if (isnew)(gdb) 301            entry = tuplehash_insert(hashtable->hashtab, key, &found);(gdb) steptuplehash_insert (tb=0x1f9fd30, key=0x0, found=0x7fff7f065dd7) at ../../../src/include/lib/simplehash.h:490490        uint32        hash = SH_HASH_KEY(tb, key);(gdb) finishRun till exit from #0  tuplehash_insert (tb=0x1f9fd30, key=0x0, found=0x7fff7f065dd7)    at ../../../src/include/lib/simplehash.h:4900x00000000006d3a1e in LookupTupleHashEntry (hashtable=0x1f9fc98, slot=0x1f8b198, isnew=0x7fff7f065e17) at execGrouping.c:301301            entry = tuplehash_insert(hashtable->hashtab, key, &found);Value returned is $23 = (TupleHashEntryData *) 0x1f9fdb8(gdb) n303            if (found)(gdb) p found$24 = false(gdb)

LookupTupleHashEntry->插入新条目,返回entry

(gdb) n311                *isnew = true;(gdb) 313                entry->additional = NULL;(gdb) 314                MemoryContextSwitchTo(hashtable->tablecxt);(gdb) 316                entry->firstTuple = ExecCopySlotMinimalTuple(slot);(gdb) 324        MemoryContextSwitchTo(oldContext);(gdb) 326        return entry;(gdb) 327    }

lookup_hash_entry->回到lookup_hash_entry

(gdb) 
lookup_hash_entry (aggstate=0x1f895a0) at nodeAgg.c:1474
1474        if (isnew)
(gdb)

lookup_hash_entry->分配内存,设置条目的额外信息

(gdb) n1481                                   sizeof(AggStatePerGroupData) * aggstate->numtrans);(gdb) p *entry$25 = {firstTuple = 0x1f91488, additional = 0x0, status = 1, hash = 443809650}(gdb) n1480                MemoryContextAlloc(perhash->hashtable->tablecxt,(gdb) 1479            pergroup = (AggStatePerGroup)(gdb) 1482            entry->additional = pergroup;(gdb)

lookup_hash_entry->为新元组group初始化聚合操作, lookup_hash_entries()已选择了相应的grouping set(这里有3个聚合列)

1488            for (transno = 0; transno < aggstate->numtrans; transno++)(gdb) p aggstate->numtrans$26 = 3(gdb) (gdb) n1490                AggStatePerTrans pertrans = &aggstate->pertrans[transno];(gdb) 1491                AggStatePerGroup pergroupstate = &pergroup[transno];(gdb) p *pertrans$27 = {aggref = 0x1f84650, aggshared = false, numInputs = 1, numTransInputs = 1, transfn_oid = 768, serialfn_oid = 0,   deserialfn_oid = 0, aggtranstype = 23, transfn = {fn_addr = 0x93e877 , fn_oid = 768, fn_nargs = 2,     fn_strict = true, fn_retset = false, fn_stats = 2 '\002', fn_extra = 0x0, fn_mcxt = 0x1f89270, fn_expr = 0x1fa0b00},   serialfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, fn_strict = false, fn_retset = false, fn_stats = 0 '\000',     fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, deserialfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,     fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},   aggCollation = 0, numSortCols = 0, numDistinctCols = 0, sortColIdx = 0x0, sortOperators = 0x0, sortCollations = 0x0,   sortNullsFirst = 0x0, equalfnOne = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, fn_strict = false, fn_retset = false,     fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, equalfnMulti = 0x0, initValue = 0,   initValueIsNull = true, inputtypeLen = 0, transtypeLen = 4, inputtypeByVal = false, transtypeByVal = true,   sortslot = 0x0, uniqslot = 0x0, sortdesc = 0x0, sortstates = 0x1f9fb70, transfn_fcinfo = {flinfo = 0x1f99418,     context = 0x1f895a0, resultinfo = 0x0, fncollation = 0, isnull = false, nargs = 2, arg = {0 },     argnull = {false }}, serialfn_fcinfo = {flinfo = 0x0, context = 0x0, resultinfo = 0x0,     fncollation = 0, isnull = false, nargs = 0, arg = {0 }, argnull = {false }},   deserialfn_fcinfo = {flinfo = 0x0, context = 0x0, resultinfo = 0x0, fncollation = 0, isnull = false, nargs = 0, arg = {      0 }, argnull = {false }}}(gdb) n1493                
initialize_aggregate(aggstate, pertrans, pergroupstate);(gdb) p *pergroupstate$28 = {transValue = 9187201950435737471, transValueIsNull = 127, noTransValue = 127}(gdb) n1488            for (transno = 0; transno < aggstate->numtrans; transno++)(gdb) p *aggstate$29 = {ss = {ps = {type = T_AggState, plan = 0x1f7b1e0, state = 0x1f89388, ExecProcNode = 0x6ee438 ,       ExecProcNodeReal = 0x6ee438 , instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,       qual = 0x0, lefttree = 0x1f89b10, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,       ps_ResultTupleSlot = 0x1f8a710, ps_ExprContext = 0x1f89a50, ps_ProjInfo = 0x1f8a850, scandesc = 0x1f89e60},     ss_currentRelation = 0x0, ss_currentScanDesc = 0x0, ss_ScanTupleSlot = 0x1f8a3b8}, aggs = 0x1f8ad60, numaggs = 3,   numtrans = 3, aggstrategy = AGG_HASHED, aggsplit = AGGSPLIT_SIMPLE, phase = 0x1f8ae58, numphases = 1, current_phase = 0,   peragg = 0x1f9f930, pertrans = 0x1f993f0, hashcontext = 0x1f89990, aggcontexts = 0x1f897b8, tmpcontext = 0x1f897d8,   curaggcontext = 0x1f89990, curperagg = 0x0, curpertrans = 0x0, input_done = false, agg_done = false, projected_set = -1,   current_set = 0, grouped_cols = 0x0, all_grouped_cols = 0x1f8aff0, maxsets = 1, phases = 0x1f8ae58, sort_in = 0x0,   sort_out = 0x0, sort_slot = 0x0, pergroups = 0x0, grp_firstTuple = 0x0, table_filled = false, num_hashes = 1,   perhash = 0x1f8aeb0, hash_pergroup = 0x1f9fb48, all_pergroups = 0x1f9fb48, combinedproj = 0x0}(gdb) n1490                AggStatePerTrans pertrans = &aggstate->pertrans[transno];(gdb) 1491                AggStatePerGroup pergroupstate = &pergroup[transno];(gdb) 1493                initialize_aggregate(aggstate, pertrans, pergroupstate);(gdb) 1488            for (transno = 0; transno < aggstate->numtrans; transno++)(gdb) 1490                AggStatePerTrans pertrans = &aggstate->pertrans[transno];(gdb) 1491                AggStatePerGroup pergroupstate = &pergroup[transno];(gdb) 1493           
     initialize_aggregate(aggstate, pertrans, pergroupstate);(gdb) 1488            for (transno = 0; transno < aggstate->numtrans; transno++)(gdb) 1497        return entry;(gdb) 1498    }(gdb)

lookup_hash_entries->回到lookup_hash_entries

(gdb) n
lookup_hash_entries (aggstate=0x1f895a0) at nodeAgg.c:1513
1513        for (setno = 0; setno < numHashes; setno++)

agg_fill_hash_table->回到agg_fill_hash_table

(gdb) n
1518    }
(gdb) 
agg_fill_hash_table (aggstate=0x1f895a0) at nodeAgg.c:1934
1934            advance_aggregates(aggstate);
(gdb)

advance_aggregates->进入advance_aggregates

(gdb) step
advance_aggregates (aggstate=0x1f895a0) at nodeAgg.c:680
680        ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
(gdb) p *aggstate->phase->evaltrans
$30 = {tag = {type = T_ExprState}, flags = 6 '\006', resnull = false, resvalue = 0, resultslot = 0x0, steps = 0x1fa10d0,
  evalfunc = 0x6cd882 , expr = 0x1f895a0, evalfunc_private = 0x6cb43e ,
  steps_len = 16, steps_alloc = 16, parent = 0x1f895a0, ext_params = 0x0, innermost_caseval = 0x0,
  innermost_casenull = 0x0, innermost_domainval = 0x0, innermost_domainnull = 0x0}
(gdb) step
ExecEvalExprSwitchContext (state=0x1fa1038, econtext=0x1f897d8, isNull=0x7fff7f065e9f)
    at ../../../src/include/executor/executor.h:312
312        oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
(gdb) finish
Run till exit from #0  ExecEvalExprSwitchContext (state=0x1fa1038, econtext=0x1f897d8, isNull=0x7fff7f065e9f)
    at ../../../src/include/executor/executor.h:312
advance_aggregates (aggstate=0x1f895a0) at nodeAgg.c:683
683    }
Value returned is $31 = 0

进入第2轮循环

(gdb) step
agg_fill_hash_table (aggstate=0x1f895a0) at nodeAgg.c:1940
1940            ResetExprContext(aggstate->tmpcontext);
(gdb) n
1941        }

查看相关信息

(gdb) n
1941        }
(gdb) 
1923            outerslot = fetch_input_tuple(aggstate);
(gdb) 
1924            if (TupIsNull(outerslot))
(gdb) n
1928            tmpcontext->ecxt_outertuple = outerslot;
(gdb) 
1931            lookup_hash_entries(aggstate);
(gdb) 
1934            advance_aggregates(aggstate);
(gdb) 
1940            ResetExprContext(aggstate->tmpcontext);
(gdb) p *outerslot
$32 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = true,
  tts_tuple = 0x1fa5998, tts_tupleDescriptor = 0x7ff7dd2d1380, tts_mcxt = 0x1f89270, tts_buffer = 124, tts_nvalid = 3,
  tts_values = 0x1f89d48, tts_isnull = 0x1f89d80, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {
        bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 16, tts_fixedTupleDescriptor = true}
(gdb) x/32x outerslot->tts_values
0x1f89d48:    0x28    0xf6    0x0b    0xb1    0xf7    0x7f    0x00    0x00
0x1f89d50:    0x02    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x1f89d58:    0x02    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x1f89d60:    0x00    0x00    0x00    0x00    0x00    0x00    0x00    0x00

tuple数据

(gdb) x/56x outerslot->tts_tuple->t_data->t_bits
0x7ff7b2e1365f:    0x00    0x0b    0x47    0x5a    0x30    0x31    0x00    0x00
0x7ff7b2e13667:    0x00    0x01    0x00    0x00    0x00    0x01    0x00    0x00
0x7ff7b2e1366f:    0x00    0x01    0x00    0x00    0x00    0x01    0x00    0x00
0x7ff7b2e13677:    0x00    0x01    0x00    0x00    0x00    0x01    0x00    0x00
0x7ff7b2e1367f:    0x00    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x7ff7b2e13687:    0x00    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x7ff7b2e1368f:    0x00    0x00    0x00    0x00    0x00    0x00    0x00    0x00

DONE!

四、参考资料

N/A

0