PostgreSQL 源码解读(182)- 查询#98(聚合函数#3-ExecAgg)
本节简单介绍了PostgreSQL执行聚合函数的实现,主要实现函数是ExecAgg.这里先行介绍ExecAgg->agg_fill_hash_table函数,其他子函数后续再行介绍.
通过设置log输出,可得到SQL的planTree:
",,,,,"select bh,avg(c1),min(c1),max(c2) from t_agg group by bh;",,,"psql"2019-04-30 14:33:11.998 CST,"xdb","testdb",1387,"[local]",5cc7ec00.56b,3,"SELECT",2019-04-30 14:32:32 CST,3/3,0,LOG,00000,"plan:"," {PLANNEDSTMT :commandType 1 :queryId 0 :hasReturning false :hasModifyingCTE false :canSetTag true :transientPlan false :dependsOnRole false :parallelModeNeeded false :jitFlags 0 :planTree {AGG :startup_cost 13677.00 :total_cost 13677.06 :plan_rows 5 :plan_width 45 :parallel_aware false :parallel_safe false :plan_node_id 0 :targetlist (... ) :qual <> :lefttree {SEQSCAN :startup_cost 0.00 :total_cost 8677.00 :plan_rows 500000 :plan_width 13 :parallel_aware false :parallel_safe false :plan_node_id 1 :targetlist (... ) :qual <> :lefttree <> :righttree <> :initPlan <> :extParam (b) :allParam (b) :scanrelid 1 } :righttree <> :initPlan <> :extParam (b) :allParam (b) :aggstrategy 2 :aggsplit 0 :numCols 1 :grpColIdx 1 :grpOperators 98 :numGroups 5 :aggParams (b) :groupingSets <> :chain <> } :rtable (... ) :resultRelations <> :nonleafResultRelations <> :rootResultRelations <> :subplans <> :rewindPlanIDs (b) :rowMarks <> :relationOids (o 245801) :invalItems <> :paramExecTypes <> :utilityStmt <> :stmt_location 0 :stmt_len 56 }",,,,,"select bh,avg(c1),min(c1),max(c2) from t_agg group by bh;",,,"psql"
第一个节点为AGG,相应的实现函数为ExecAgg.
一、数据结构
AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体
/* ---------------------
 *    AggState information
 *
 *    ss.ss_ScanTupleSlot refers to output of underlying plan.
 *    (ss = ScanState, ps = PlanState)
 *
 *    Note: ss.ps.ps_ExprContext contains ecxt_aggvalues and
 *    ecxt_aggnulls arrays, which hold the computed agg values for the current
 *    input group during evaluation of an Agg node's output tuple(s).  We
 *    create a second ExprContext, tmpcontext, in which to evaluate input
 *    expressions and run the aggregate transition functions.
 * ---------------------
 */
/* these structs are private in nodeAgg.c: */
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;

/*
 * Run-time state of an Agg plan node.
 *
 * The FIELDNO_AGGSTATE_* constants interleaved with the members record the
 * position of the member that follows them, so members must not be
 * reordered without updating those constants.
 */
typedef struct AggState
{
    ScanState   ss;             /* its first field is NodeTag */
    List       *aggs;           /* all Aggref nodes in targetlist & quals */
    int         numaggs;        /* length of list (could be zero!) */
    int         numtrans;       /* number of pertrans items */
    AggStrategy aggstrategy;    /* strategy mode */
    AggSplit    aggsplit;       /* agg-splitting mode, see nodes.h */
    AggStatePerPhase phase;     /* pointer to current phase data */
    int         numphases;      /* number of phases (including phase 0) */
    int         current_phase;  /* current phase number */
    AggStatePerAgg peragg;      /* per-Aggref information */
    AggStatePerTrans pertrans;  /* per-Trans state information */
    ExprContext *hashcontext;   /* econtexts for long-lived data (hashtable) */
    ExprContext **aggcontexts;  /* econtexts for long-lived data (per GS,
                                 * i.e. one per grouping set) */
    ExprContext *tmpcontext;    /* econtext for input expressions */
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
    ExprContext *curaggcontext; /* currently active aggcontext */
    AggStatePerAgg curperagg;   /* currently active aggregate, if any */
#define FIELDNO_AGGSTATE_CURPERTRANS 16
    AggStatePerTrans curpertrans;   /* currently active trans state, if any */
    bool        input_done;     /* indicates end of input */
    bool        agg_done;       /* indicates completion of Agg scan */
    int         projected_set;  /* The last projected grouping set */
#define FIELDNO_AGGSTATE_CURRENT_SET 20
    int         current_set;    /* The current grouping set being evaluated */
    Bitmapset  *grouped_cols;   /* grouped cols in current projection */
    List       *all_grouped_cols;   /* list of all grouped cols in DESC order */

    /* These fields are for grouping set phase data */
    int         maxsets;        /* The max number of sets in any phase */
    AggStatePerPhase phases;    /* array of all phases */
    Tuplesortstate *sort_in;    /* sorted input to phases > 1 */
    Tuplesortstate *sort_out;   /* input is copied here for next phase */
    TupleTableSlot *sort_slot;  /* slot for sort results */

    /* these fields are used in AGG_PLAIN and AGG_SORTED modes: */
    AggStatePerGroup *pergroups;    /* grouping set indexed array of per-group
                                     * pointers */
    HeapTuple   grp_firstTuple; /* copy of first tuple of current group */

    /* these fields are used in AGG_HASHED and AGG_MIXED modes: */
    bool        table_filled;   /* hash table filled yet? */
    // Number of entries in perhash[] / hash_pergroup[] (one hash table per
    // hashed grouping set); this is NOT a bucket count.
    int         num_hashes;
    AggStatePerHash perhash;    /* array of per-hashtable data */
    AggStatePerGroup *hash_pergroup;    /* grouping set indexed array of
                                         * per-group pointers */

    /* support for evaluation of agg input expressions: */
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
    AggStatePerGroup *all_pergroups;    /* array of first ->pergroups, then
                                         * ->hash_pergroup */
    ProjectionInfo *combinedproj;   /* projection machinery */
} AggState;

/* Primitive options supported by nodeAgg.c: */
#define AGGSPLITOP_COMBINE      0x01    /* substitute combinefn for transfn */
#define AGGSPLITOP_SKIPFINAL    0x02    /* skip finalfn, return state as-is */
#define AGGSPLITOP_SERIALIZE    0x04    /* apply serializefn to output */
#define AGGSPLITOP_DESERIALIZE  0x08    /* apply deserializefn to input */

/* Supported operating modes (i.e., useful combinations of these options): */
typedef enum AggSplit
{
    /* Basic, non-split aggregation: */
    AGGSPLIT_SIMPLE = 0,
    /* Initial phase of partial aggregation, with serialization: */
    AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
    /* Final phase of partial aggregation, with deserialization: */
    AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;

/* Test whether an AggSplit value selects each primitive option: */
#define DO_AGGSPLIT_COMBINE(as)     (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as)   (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as)   (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)
二、源码解读
ExecAgg接收从outer子计划返回的元组,并在合适的属性上为每一个聚合函数(即出现在节点投影列或过滤条件中的Aggref节点)执行聚合.需要聚合的元组数量取决于选择的是分组聚合还是普通聚合:在分组聚合中,为每一个组产生一个结果行;在普通聚合中,整个查询只有一个结果行.
不管哪种情况,每一个聚合的结果值都会存储在表达式上下文中,供ExecProject计算结果元组时使用.
/*
 * ExecAgg -
 *
 *    Entry point of the Agg node.  Tuples are pulled from the outer subplan
 *    and aggregated for every aggregate function use (Aggref node) that
 *    appears in the node's targetlist or qual.  Grouped aggregation yields
 *    one result row per group; plain aggregation yields a single row for the
 *    whole query.  Either way, each aggregate's value is stored in the
 *    expression context so that ExecProject can use it when it evaluates the
 *    result tuple.
 *
 *    Returns the next result slot, or NULL when the scan is already marked
 *    done or the strategy-specific retrieval produced no tuple.
 */
static TupleTableSlot *
ExecAgg(PlanState *pstate)
{
    AggState   *aggstate = castNode(AggState, pstate);
    TupleTableSlot *slot = NULL;

    CHECK_FOR_INTERRUPTS();

    /* Nothing further to produce once the aggregate scan has finished. */
    if (aggstate->agg_done)
        return NULL;

    /* Dispatch on the strategy of the current phase. */
    switch (aggstate->phase->aggstrategy)
    {
        case AGG_PLAIN:
        case AGG_SORTED:
            slot = agg_retrieve_direct(aggstate);
            break;

        case AGG_HASHED:
            /* Build the hash table on first use, then read from it. */
            if (!aggstate->table_filled)
                agg_fill_hash_table(aggstate);
            /* FALLTHROUGH */
        case AGG_MIXED:
            slot = agg_retrieve_hash_table(aggstate);
            break;
    }

    return TupIsNull(slot) ? NULL : slot;
}
agg_fill_hash_table
读取输入并构建哈希表,逻辑较为简单,详细参考下面源码
/*
 * ExecAgg for hashed case: read input and build hash table
 */
static void
agg_fill_hash_table(AggState *aggstate)
{
    TupleTableSlot *outerslot;
    ExprContext *tmpcontext = aggstate->tmpcontext;

    /*
     * Process each outer-plan tuple, and then fetch the next one, until we
     * exhaust the outer plan.
     */
    for (;;)
    {
        // fetch the next input tuple; an empty slot means input is exhausted
        outerslot = fetch_input_tuple(aggstate);
        if (TupIsNull(outerslot))
            break;

        /* set up for lookup_hash_entries and advance_aggregates */
        // both read the current tuple through tmpcontext->ecxt_outertuple
        tmpcontext->ecxt_outertuple = outerslot;

        /* Find or build hashtable entries */
        lookup_hash_entries(aggstate);

        /* Advance the aggregates (or combine functions) */
        advance_aggregates(aggstate);

        /*
         * Reset per-input-tuple context after each tuple, but note that the
         * hash lookups do this too
         */
        ResetExprContext(aggstate->tmpcontext);
    }

    aggstate->table_filled = true;
    /* Initialize to walk the first hash table */
    select_current_set(aggstate, 0, true);
    ResetTupleHashIterator(aggstate->perhash[0].hashtable,
                           &aggstate->perhash[0].hashiter);
}

/*
 * Advance each aggregate transition state for one input tuple.  The input
 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
 * accessible to ExecEvalExpr.
 *
 * We have two sets of transition states to handle: one for sorted aggregation
 * and one for hashed; we do them both here, to avoid multiple evaluation of
 * the inputs.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
advance_aggregates(AggState *aggstate)
{
    bool        dummynull;

    // the whole transition step is one expression (phase->evaltrans)
    // evaluated in tmpcontext; its result value is ignored
    ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
                              aggstate->tmpcontext,
                              &dummynull);
}

/*
 * ExecEvalExprSwitchContext
 *
 * Same as ExecEvalExpr, but get into the right allocation context explicitly.
 */
#ifndef FRONTEND
static inline Datum
ExecEvalExprSwitchContext(ExprState *state,
                          ExprContext *econtext,
                          bool *isNull)
{
    Datum       retDatum;
    MemoryContext oldContext;

    // evaluate in the per-tuple memory context, then restore the caller's
    oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
    retDatum = state->evalfunc(state, econtext, isNull);
    MemoryContextSwitchTo(oldContext);
    return retDatum;
}
#endif

/*
 * Look up hash entries for the current tuple in all hashed grouping sets,
 * returning an array of pergroup pointers suitable for advance_aggregates.
 * (The function itself returns void: the pointers are stored into
 * aggstate->hash_pergroup, one slot per hashed grouping set.)
 *
 * Be aware that lookup_hash_entry can reset the tmpcontext.
 */
static void
lookup_hash_entries(AggState *aggstate)
{
    // number of hash tables, i.e. hashed grouping sets
    int         numHashes = aggstate->num_hashes;
    AggStatePerGroup *pergroup = aggstate->hash_pergroup;
    int         setno;

    for (setno = 0; setno < numHashes; setno++)
    {
        // lookup_hash_entry works on aggstate->current_set, so select first
        select_current_set(aggstate, setno, true);
        pergroup[setno] = lookup_hash_entry(aggstate)->additional;
    }
}

/*
 * Find or create a hashtable entry for the tuple group containing the current
 * tuple (already set in tmpcontext's outertuple slot), in the current grouping
 * set (which the caller must have selected - note that initialize_aggregate
 * depends on this).
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static TupleHashEntryData *
lookup_hash_entry(AggState *aggstate)
{
    // the input tuple
    TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
    // per-hashtable data of the currently selected grouping set
    AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
    TupleTableSlot *hashslot = perhash->hashslot;
    TupleHashEntryData *entry;
    bool        isnew;
    int         i;

    /* transfer just the needed columns into hashslot */
    slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
    ExecClearTuple(hashslot);

    for (i = 0; i < perhash->numhashGrpCols; i++)
    {
        // hashGrpColIdxInput holds 1-based column numbers of the input tuple
        int         varNumber = perhash->hashGrpColIdxInput[i] - 1;

        hashslot->tts_values[i] = inputslot->tts_values[varNumber];
        hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
    }
    // mark hashslot as holding a virtual tuple built from the copied columns
    ExecStoreVirtualTuple(hashslot);

    /* find or create the hashtable entry using the filtered tuple */
    entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);

    if (isnew)
    {
        AggStatePerGroup pergroup;
        int         transno;

        // one AggStatePerGroupData per transition state, allocated in the
        // hash table's own context (tablecxt)
        pergroup = (AggStatePerGroup)
            MemoryContextAlloc(perhash->hashtable->tablecxt,
                               sizeof(AggStatePerGroupData) * aggstate->numtrans);
        entry->additional = pergroup;

        /*
         * Initialize aggregates for new tuple group, lookup_hash_entries()
         * already has selected the relevant grouping set.
         */
        for (transno = 0; transno < aggstate->numtrans; transno++)
        {
            AggStatePerTrans pertrans = &aggstate->pertrans[transno];
            AggStatePerGroup pergroupstate = &pergroup[transno];

            initialize_aggregate(aggstate, pertrans, pergroupstate);
        }
    }

    return entry;
}

/*
 * Find or create a hashtable entry for the tuple group containing the
 * given tuple.  The tuple must be the same type as the hashtable entries.
 *
 * If isnew is NULL, we do not create new entries; we return NULL if no
 * match is found.
 *
 * If isnew isn't NULL, then a new entry is created if no existing entry
 * matches.  On return, *isnew is true if the entry is newly created,
 * false if it existed already.  ->additional_data in the new entry has
 * been zeroed.
 */
TupleHashEntry
LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot,
                     bool *isnew)
{
    TupleHashEntryData *entry;
    MemoryContext oldContext;
    bool        found;
    MinimalTuple key;

    /* Need to run the hash functions in short-lived context */
    oldContext = MemoryContextSwitchTo(hashtable->tempcxt);

    /* set up data needed by hash and match functions */
    hashtable->inputslot = slot;
    hashtable->in_hash_funcs = hashtable->tab_hash_funcs;
    hashtable->cur_eq_func = hashtable->tab_eq_func;

    key = NULL;                 /* flag to reference inputslot */

    if (isnew)
    {
        // insert mode: find the matching entry or create a new one
        entry = tuplehash_insert(hashtable->hashtab, key, &found);

        if (found)
        {
            /* found pre-existing entry */
            *isnew = false;
        }
        else
        {
            /* created new entry */
            *isnew = true;
            /* zero caller data */
            entry->additional = NULL;
            MemoryContextSwitchTo(hashtable->tablecxt);
            /* Copy the first tuple into the table context */
            entry->firstTuple = ExecCopySlotMinimalTuple(slot);
        }
    }
    else
    {
        // isnew is NULL: lookup only, never create
        entry = tuplehash_lookup(hashtable->hashtab, key);
    }

    // restore the caller's context on every path
    MemoryContextSwitchTo(oldContext);

    return entry;
}

/*
 * (Re)Initialize an individual aggregate.
 *
 * This function handles only one grouping set, already set in
 * aggstate->current_set.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
                     AggStatePerGroup pergroupstate)
{
    /*
     * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
     */
    if (pertrans->numSortCols > 0)
    {
        /*
         * In case of rescan, maybe there could be an uncompleted sort
         * operation?  Clean it up if so.
         */
        if (pertrans->sortstates[aggstate->current_set])
            tuplesort_end(pertrans->sortstates[aggstate->current_set]);

        /*
         * We use a plain Datum sorter when there's a single input column;
         * otherwise sort the full tuple.  (See comments for
         * process_ordered_aggregate_single.)
         */
        if (pertrans->numInputs == 1)
        {
            Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);

            // Datum sorter
            pertrans->sortstates[aggstate->current_set] =
                tuplesort_begin_datum(attr->atttypid,
                                      pertrans->sortOperators[0],
                                      pertrans->sortCollations[0],
                                      pertrans->sortNullsFirst[0],
                                      work_mem, NULL, false);
        }
        else
            // full tuple sorter
            pertrans->sortstates[aggstate->current_set] =
                tuplesort_begin_heap(pertrans->sortdesc,
                                     pertrans->numSortCols,
                                     pertrans->sortColIdx,
                                     pertrans->sortOperators,
                                     pertrans->sortCollations,
                                     pertrans->sortNullsFirst,
                                     work_mem, NULL, false);
    }

    /*
     * (Re)set transValue to the initial value.
     *
     * Note that when the initial value is pass-by-ref, we must copy it (into
     * the aggcontext) since we will pfree the transValue later.
     */
    if (pertrans->initValueIsNull)
        pergroupstate->transValue = pertrans->initValue;
    else
    {
        MemoryContext oldContext;

        // copy in the memory of the currently selected aggcontext
        oldContext = MemoryContextSwitchTo(
                                           aggstate->curaggcontext->ecxt_per_tuple_memory);
        pergroupstate->transValue = datumCopy(pertrans->initValue,
                                              pertrans->transtypeByVal,
                                              pertrans->transtypeLen);
        MemoryContextSwitchTo(oldContext);
    }
    pergroupstate->transValueIsNull = pertrans->initValueIsNull;

    /*
     * If the initial value for the transition state doesn't exist in the
     * pg_aggregate table then we will let the first non-NULL value returned
     * from the outer procNode become the initial value. (This is useful for
     * aggregates like max() and min().) The noTransValue flag signals that we
     * still need to do this.
     */
    pergroupstate->noTransValue = pertrans->initValueIsNull;
}

/*
 * Select the current grouping set; affects current_set and
 * curaggcontext.
 */
static void
select_current_set(AggState *aggstate, int setno, bool is_hash)
{
    /* when changing this, also adapt ExecInterpExpr() and friends */
    // hashed sets share the single hashcontext; other sets each have
    // their own entry in aggcontexts[]
    if (is_hash)
        aggstate->curaggcontext = aggstate->hashcontext;
    else
        aggstate->curaggcontext = aggstate->aggcontexts[setno];

    aggstate->current_set = setno;
}
三、跟踪分析
测试脚本
//禁用并行testdb=# set max_parallel_workers_per_gather=0;SETtestdb=# explain verbose select bh,avg(c1),min(c1),max(c2) from t_agg group by bh; QUERY PLAN --------------------------------------------------------------------------- HashAggregate (cost=13677.00..13677.06 rows=5 width=45) Output: bh, avg(c1), min(c1), max(c2) Group Key: t_agg.bh -> Seq Scan on public.t_agg (cost=0.00..8677.00 rows=500000 width=13) Output: bh, c1, c2, c3, c4, c5, c6(5 rows)
跟踪分析
(gdb) b ExecAggBreakpoint 1 at 0x6ee444: file nodeAgg.c, line 1536.(gdb) cContinuing.Breakpoint 1, ExecAgg (pstate=0x1f895a0) at nodeAgg.c:15361536 AggState *node = castNode(AggState, pstate);(gdb)
输入参数,AggState,在ExecInitAgg函数中初始化
(gdb) p *pstate$1 = {type = T_AggState, plan = 0x1f7b1e0, state = 0x1f89388, ExecProcNode = 0x6ee438 , ExecProcNodeReal = 0x6ee438 , instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0, qual = 0x0, lefttree = 0x1f89b10, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, ps_ResultTupleSlot = 0x1f8a710, ps_ExprContext = 0x1f89a50, ps_ProjInfo = 0x1f8a850, scandesc = 0x1f89e60}
使用Hash实现
(gdb) n1537 TupleTableSlot *result = NULL;(gdb) 1539 CHECK_FOR_INTERRUPTS();(gdb) 1541 if (!node->agg_done)(gdb) 1544 switch (node->phase->aggstrategy)(gdb) p node->phase->aggstrategy$2 = AGG_HASHED(gdb) n1547 if (!node->table_filled)(gdb) 1548 agg_fill_hash_table(node);(gdb)
进入agg_fill_hash_table
(gdb) stepagg_fill_hash_table (aggstate=0x1f895a0) at nodeAgg.c:19151915 ExprContext *tmpcontext = aggstate->tmpcontext;
agg_fill_hash_table->提取输入的元组
(gdb) n1923 outerslot = fetch_input_tuple(aggstate);(gdb) stepfetch_input_tuple (aggstate=0x1f895a0) at nodeAgg.c:396396 if (aggstate->sort_in)(gdb) p aggstate->sort_in$3 = (Tuplesortstate *) 0x0(gdb) n406 slot = ExecProcNode(outerPlanState(aggstate));(gdb) 408 if (!TupIsNull(slot) && aggstate->sort_out)(gdb) p *slot$4 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false, tts_tuple = 0x1fa5998, tts_tupleDescriptor = 0x7ff7dd2d1380, tts_mcxt = 0x1f89270, tts_buffer = 124, tts_nvalid = 0, tts_values = 0x1f89d48, tts_isnull = 0x1f89d80, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = { bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}(gdb) n411 return slot;(gdb) 412 }(gdb) agg_fill_hash_table (aggstate=0x1f895a0) at nodeAgg.c:19241924 if (TupIsNull(outerslot))(gdb)
lookup_hash_entries->进入lookup_hash_entries,为当前元组在所有采用哈希方式的grouping sets中检索hash条目,并把得到的pergroup指针填入aggstate->hash_pergroup数组,供后续的advance_aggregates函数使用.
(gdb) n1928 tmpcontext->ecxt_outertuple = outerslot;(gdb) 1931 lookup_hash_entries(aggstate);(gdb) (gdb) steplookup_hash_entries (aggstate=0x1f895a0) at nodeAgg.c:15091509 int numHashes = aggstate->num_hashes;(gdb) n1510 AggStatePerGroup *pergroup = aggstate->hash_pergroup;(gdb) p numHashes$5 = 1(gdb) n1513 for (setno = 0; setno < numHashes; setno++)(gdb) p *pergroup$6 = (AggStatePerGroup) 0x0(gdb) n1515 select_current_set(aggstate, setno, true);(gdb) stepselect_current_set (aggstate=0x1f895a0, setno=0, is_hash=true) at nodeAgg.c:306306 if (is_hash)(gdb) n307 aggstate->curaggcontext = aggstate->hashcontext;(gdb) 311 aggstate->current_set = setno;(gdb) 312 }(gdb) lookup_hash_entries (aggstate=0x1f895a0) at nodeAgg.c:15161516 pergroup[setno] = lookup_hash_entry(aggstate)->additional;(gdb)
lookup_hash_entry->调用lookup_hash_entry,该函数为包含当前元组的组检索或创建哈希表条目.
(gdb) steplookup_hash_entry (aggstate=0x1f895a0) at nodeAgg.c:14511451 TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;(gdb) n1452 AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];(gdb) p aggstate->current_set$7 = 0(gdb) n1453 TupleTableSlot *hashslot = perhash->hashslot;(gdb) p *perhash$8 = {hashtable = 0x1f9fc98, hashiter = {cur = 0, end = 0, done = false}, hashslot = 0x1f8b198, hashfunctions = 0x1f8b230, eqfuncoids = 0x1f9fc50, numCols = 1, numhashGrpCols = 1, largestGrpColIdx = 1, hashGrpColIdxInput = 0x1f9fbb0, hashGrpColIdxHash = 0x1f9fbd0, aggnode = 0x1f7b1e0}(gdb) n1459 slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);(gdb) 1460 ExecClearTuple(hashslot);(gdb) p *perhash$9 = {hashtable = 0x1f9fc98, hashiter = {cur = 0, end = 0, done = false}, hashslot = 0x1f8b198, hashfunctions = 0x1f8b230, eqfuncoids = 0x1f9fc50, numCols = 1, numhashGrpCols = 1, largestGrpColIdx = 1, hashGrpColIdxInput = 0x1f9fbb0, hashGrpColIdxHash = 0x1f9fbd0, aggnode = 0x1f7b1e0}(gdb) p *perhash->hashslot$10 = {type = T_TupleTableSlot, tts_isempty = true, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false, tts_tuple = 0x0, tts_tupleDescriptor = 0x1f8b080, tts_mcxt = 0x1f89270, tts_buffer = 0, tts_nvalid = 0, tts_values = 0x1f8b1f8, tts_isnull = 0x1f8b200, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = { bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}(gdb) p *perhash->hashfunctions$11 = {fn_addr = 0x4c8a31 , fn_oid = 400, fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002', fn_extra = 0x0, fn_mcxt = 0x1f89270, fn_expr = 0x0}(gdb) p *perhash->eqfuncoids$12 = 67(gdb) p *perhash->hashGrpColIdxInput$13 = 1(gdb) p *perhash->hashGrpColIdxHash$14 = 1(gdb) p *perhash->aggnode$15 = {plan = {type = T_Agg, startup_cost = 13677, total_cost = 13677.0625, plan_rows = 5, plan_width = 45, parallel_aware = false, parallel_safe = false, 
plan_node_id = 0, targetlist = 0x1f84108, qual = 0x0, lefttree = 0x1f83bc8, righttree = 0x0, initPlan = 0x0, extParam = 0x0, allParam = 0x0}, aggstrategy = AGG_HASHED, aggsplit = AGGSPLIT_SIMPLE, numCols = 1, grpColIdx = 0x1f83eb8, grpOperators = 0x1f83e98, numGroups = 5, aggParams = 0x0, groupingSets = 0x0, chain = 0x0}(gdb)
lookup_hash_entry->遍历分组键(这里是bh列)
(gdb) n1462 for (i = 0; i < perhash->numhashGrpCols; i++)(gdb) 1464 int varNumber = perhash->hashGrpColIdxInput[i] - 1;(gdb) 1466 hashslot->tts_values[i] = inputslot->tts_values[varNumber];(gdb) p varNumber$16 = 0(gdb) n1467 hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];(gdb) 1462 for (i = 0; i < perhash->numhashGrpCols; i++)(gdb) p *hashslot$17 = {type = T_TupleTableSlot, tts_isempty = true, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false, tts_tuple = 0x0, tts_tupleDescriptor = 0x1f8b080, tts_mcxt = 0x1f89270, tts_buffer = 0, tts_nvalid = 0, tts_values = 0x1f8b1f8, tts_isnull = 0x1f8b200, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = { bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}(gdb) p *hashslot->tts_values[0]$18 = 811222795(gdb)
lookup_hash_entry->调用LookupTupleHashEntry,该函数使用已过滤的元组检索或者创建哈希表条目
(gdb) stepLookupTupleHashEntry (hashtable=0x1f9fc98, slot=0x1f8b198, isnew=0x7fff7f065e17) at execGrouping.c:290290 oldContext = MemoryContextSwitchTo(hashtable->tempcxt);(gdb) p *hashtable$19 = {hashtab = 0x1f9fd30, numCols = 1, keyColIdx = 0x1f9fbd0, tab_hash_funcs = 0x1f8b230, tab_eq_func = 0x1fa0050, tablecxt = 0x1f91370, tempcxt = 0x1f9d8e0, entrysize = 24, tableslot = 0x1f9ffb8, inputslot = 0x0, in_hash_funcs = 0x0, cur_eq_func = 0x0, hash_iv = 0, exprcontext = 0x1fa0970}(gdb) p *hashtable->hashtab$20 = {size = 8, members = 0, sizemask = 7, grow_threshold = 7, data = 0x1f9fd88, ctx = 0x1f89270, private_data = 0x1f9fc98}(gdb) p *hashtable->keyColIdx$21 = 1(gdb) p *hashtable->tab_hash_funcs$22 = {fn_addr = 0x4c8a31 , fn_oid = 400, fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002', fn_extra = 0x0, fn_mcxt = 0x1f89270, fn_expr = 0x0}(gdb) n293 hashtable->inputslot = slot;(gdb) 294 hashtable->in_hash_funcs = hashtable->tab_hash_funcs;(gdb) 295 hashtable->cur_eq_func = hashtable->tab_eq_func;(gdb) 297 key = NULL; /* flag to reference inputslot */(gdb) 299 if (isnew)(gdb) 301 entry = tuplehash_insert(hashtable->hashtab, key, &found);(gdb) steptuplehash_insert (tb=0x1f9fd30, key=0x0, found=0x7fff7f065dd7) at ../../../src/include/lib/simplehash.h:490490 uint32 hash = SH_HASH_KEY(tb, key);(gdb) finishRun till exit from #0 tuplehash_insert (tb=0x1f9fd30, key=0x0, found=0x7fff7f065dd7) at ../../../src/include/lib/simplehash.h:4900x00000000006d3a1e in LookupTupleHashEntry (hashtable=0x1f9fc98, slot=0x1f8b198, isnew=0x7fff7f065e17) at execGrouping.c:301301 entry = tuplehash_insert(hashtable->hashtab, key, &found);Value returned is $23 = (TupleHashEntryData *) 0x1f9fdb8(gdb) n303 if (found)(gdb) p found$24 = false(gdb)
LookupTupleHashEntry->插入新条目,返回entry
(gdb) n311 *isnew = true;(gdb) 313 entry->additional = NULL;(gdb) 314 MemoryContextSwitchTo(hashtable->tablecxt);(gdb) 316 entry->firstTuple = ExecCopySlotMinimalTuple(slot);(gdb) 324 MemoryContextSwitchTo(oldContext);(gdb) 326 return entry;(gdb) 327 }
lookup_hash_entry->回到lookup_hash_entry
(gdb) lookup_hash_entry (aggstate=0x1f895a0) at nodeAgg.c:14741474 if (isnew)(gdb)
lookup_hash_entry->分配内存,设置条目的额外信息
(gdb) n1481 sizeof(AggStatePerGroupData) * aggstate->numtrans);(gdb) p *entry$25 = {firstTuple = 0x1f91488, additional = 0x0, status = 1, hash = 443809650}(gdb) n1480 MemoryContextAlloc(perhash->hashtable->tablecxt,(gdb) 1479 pergroup = (AggStatePerGroup)(gdb) 1482 entry->additional = pergroup;(gdb)
lookup_hash_entry->为新元组group初始化聚合操作, lookup_hash_entries()已选择了相应的grouping set(这里有3个聚合列)
1488 for (transno = 0; transno < aggstate->numtrans; transno++)(gdb) p aggstate->numtrans$26 = 3(gdb) (gdb) n1490 AggStatePerTrans pertrans = &aggstate->pertrans[transno];(gdb) 1491 AggStatePerGroup pergroupstate = &pergroup[transno];(gdb) p *pertrans$27 = {aggref = 0x1f84650, aggshared = false, numInputs = 1, numTransInputs = 1, transfn_oid = 768, serialfn_oid = 0, deserialfn_oid = 0, aggtranstype = 23, transfn = {fn_addr = 0x93e877 , fn_oid = 768, fn_nargs = 2, fn_strict = true, fn_retset = false, fn_stats = 2 '\002', fn_extra = 0x0, fn_mcxt = 0x1f89270, fn_expr = 0x1fa0b00}, serialfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, deserialfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, aggCollation = 0, numSortCols = 0, numDistinctCols = 0, sortColIdx = 0x0, sortOperators = 0x0, sortCollations = 0x0, sortNullsFirst = 0x0, equalfnOne = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, equalfnMulti = 0x0, initValue = 0, initValueIsNull = true, inputtypeLen = 0, transtypeLen = 4, inputtypeByVal = false, transtypeByVal = true, sortslot = 0x0, uniqslot = 0x0, sortdesc = 0x0, sortstates = 0x1f9fb70, transfn_fcinfo = {flinfo = 0x1f99418, context = 0x1f895a0, resultinfo = 0x0, fncollation = 0, isnull = false, nargs = 2, arg = {0 }, argnull = {false }}, serialfn_fcinfo = {flinfo = 0x0, context = 0x0, resultinfo = 0x0, fncollation = 0, isnull = false, nargs = 0, arg = {0 }, argnull = {false }}, deserialfn_fcinfo = {flinfo = 0x0, context = 0x0, resultinfo = 0x0, fncollation = 0, isnull = false, nargs = 0, arg = { 0 }, argnull = {false }}}(gdb) n1493 initialize_aggregate(aggstate, pertrans, pergroupstate);(gdb) p *pergroupstate$28 = {transValue = 9187201950435737471, 
transValueIsNull = 127, noTransValue = 127}(gdb) n1488 for (transno = 0; transno < aggstate->numtrans; transno++)(gdb) p *aggstate$29 = {ss = {ps = {type = T_AggState, plan = 0x1f7b1e0, state = 0x1f89388, ExecProcNode = 0x6ee438 , ExecProcNodeReal = 0x6ee438 , instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0, qual = 0x0, lefttree = 0x1f89b10, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0, ps_ResultTupleSlot = 0x1f8a710, ps_ExprContext = 0x1f89a50, ps_ProjInfo = 0x1f8a850, scandesc = 0x1f89e60}, ss_currentRelation = 0x0, ss_currentScanDesc = 0x0, ss_ScanTupleSlot = 0x1f8a3b8}, aggs = 0x1f8ad60, numaggs = 3, numtrans = 3, aggstrategy = AGG_HASHED, aggsplit = AGGSPLIT_SIMPLE, phase = 0x1f8ae58, numphases = 1, current_phase = 0, peragg = 0x1f9f930, pertrans = 0x1f993f0, hashcontext = 0x1f89990, aggcontexts = 0x1f897b8, tmpcontext = 0x1f897d8, curaggcontext = 0x1f89990, curperagg = 0x0, curpertrans = 0x0, input_done = false, agg_done = false, projected_set = -1, current_set = 0, grouped_cols = 0x0, all_grouped_cols = 0x1f8aff0, maxsets = 1, phases = 0x1f8ae58, sort_in = 0x0, sort_out = 0x0, sort_slot = 0x0, pergroups = 0x0, grp_firstTuple = 0x0, table_filled = false, num_hashes = 1, perhash = 0x1f8aeb0, hash_pergroup = 0x1f9fb48, all_pergroups = 0x1f9fb48, combinedproj = 0x0}(gdb) n1490 AggStatePerTrans pertrans = &aggstate->pertrans[transno];(gdb) 1491 AggStatePerGroup pergroupstate = &pergroup[transno];(gdb) 1493 initialize_aggregate(aggstate, pertrans, pergroupstate);(gdb) 1488 for (transno = 0; transno < aggstate->numtrans; transno++)(gdb) 1490 AggStatePerTrans pertrans = &aggstate->pertrans[transno];(gdb) 1491 AggStatePerGroup pergroupstate = &pergroup[transno];(gdb) 1493 initialize_aggregate(aggstate, pertrans, pergroupstate);(gdb) 1488 for (transno = 0; transno < aggstate->numtrans; transno++)(gdb) 1497 return entry;(gdb) 1498 }(gdb)
lookup_hash_entries->回到lookup_hash_entries
(gdb) nlookup_hash_entries (aggstate=0x1f895a0) at nodeAgg.c:15131513 for (setno = 0; setno < numHashes; setno++)
agg_fill_hash_table->回到agg_fill_hash_table
(gdb) n1518 }(gdb) agg_fill_hash_table (aggstate=0x1f895a0) at nodeAgg.c:19341934 advance_aggregates(aggstate);(gdb)
advance_aggregates->进入advance_aggregates
(gdb) stepadvance_aggregates (aggstate=0x1f895a0) at nodeAgg.c:680680 ExecEvalExprSwitchContext(aggstate->phase->evaltrans,(gdb) p *aggstate->phase->evaltrans$30 = {tag = {type = T_ExprState}, flags = 6 '\006', resnull = false, resvalue = 0, resultslot = 0x0, steps = 0x1fa10d0, evalfunc = 0x6cd882 , expr = 0x1f895a0, evalfunc_private = 0x6cb43e , steps_len = 16, steps_alloc = 16, parent = 0x1f895a0, ext_params = 0x0, innermost_caseval = 0x0, innermost_casenull = 0x0, innermost_domainval = 0x0, innermost_domainnull = 0x0}(gdb) stepExecEvalExprSwitchContext (state=0x1fa1038, econtext=0x1f897d8, isNull=0x7fff7f065e9f) at ../../../src/include/executor/executor.h:312312 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);(gdb) finishRun till exit from #0 ExecEvalExprSwitchContext (state=0x1fa1038, econtext=0x1f897d8, isNull=0x7fff7f065e9f) at ../../../src/include/executor/executor.h:312advance_aggregates (aggstate=0x1f895a0) at nodeAgg.c:683683 }Value returned is $31 = 0
进入第2轮循环
(gdb) stepagg_fill_hash_table (aggstate=0x1f895a0) at nodeAgg.c:19401940 ResetExprContext(aggstate->tmpcontext);(gdb) n1941 }
查看相关信息
(gdb) n1941 }(gdb) 1923 outerslot = fetch_input_tuple(aggstate);(gdb) 1924 if (TupIsNull(outerslot))(gdb) n1928 tmpcontext->ecxt_outertuple = outerslot;(gdb) 1931 lookup_hash_entries(aggstate);(gdb) 1934 advance_aggregates(aggstate);(gdb) 1940 ResetExprContext(aggstate->tmpcontext);(gdb) p *outerslot$32 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = true, tts_tuple = 0x1fa5998, tts_tupleDescriptor = 0x7ff7dd2d1380, tts_mcxt = 0x1f89270, tts_buffer = 124, tts_nvalid = 3, tts_values = 0x1f89d48, tts_isnull = 0x1f89d80, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = { bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 16, tts_fixedTupleDescriptor = true}(gdb) x/32x outerslot->tts_values0x1f89d48: 0x28 0xf6 0x0b 0xb1 0xf7 0x7f 0x00 0x000x1f89d50: 0x02 0x00 0x00 0x00 0x00 0x00 0x00 0x000x1f89d58: 0x02 0x00 0x00 0x00 0x00 0x00 0x00 0x000x1f89d60: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00
tuple数据
(gdb) x/56x outerslot->tts_tuple->t_data->t_bits0x7ff7b2e1365f: 0x00 0x0b 0x47 0x5a 0x30 0x31 0x00 0x000x7ff7b2e13667: 0x00 0x01 0x00 0x00 0x00 0x01 0x00 0x000x7ff7b2e1366f: 0x00 0x01 0x00 0x00 0x00 0x01 0x00 0x000x7ff7b2e13677: 0x00 0x01 0x00 0x00 0x00 0x01 0x00 0x000x7ff7b2e1367f: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x000x7ff7b2e13687: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x000x7ff7b2e1368f: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00
DONE!
四、参考资料
N/A