PostgreSQL 源码解读(211)- 后台进程#10(checkpointer-BufferSync)
发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,本节介绍了checkpoint中用于刷盘的函数:BufferSync,该函数Write out all dirty buffers in the pool(把缓冲池中所有脏页持久化到物理存储中).值得
千家信息网最后更新 2025年02月01日PostgreSQL 源码解读(211)- 后台进程#10(checkpointer-BufferSync)
本节介绍了checkpoint中用于刷盘的函数:BufferSync,该函数Write out all dirty buffers in the pool(把缓冲池中所有脏页持久化到物理存储中).
值得一提的是:checkpoint只会处理在checkpoint开始时的脏页(标记为BM_CHECKPOINT_NEEDED),而不会处理在checkpoint变脏的page.
一、数据结构
宏定义
checkpoints request flag bits,检查点请求标记位定义.
/* * OR-able request flag bits for checkpoints. The "cause" bits are used only * for logging purposes. Note: the flags must be defined so that it's * sensible to OR together request flags arising from different requestors. *//* These directly affect the behavior of CreateCheckPoint and subsidiaries */#define CHECKPOINT_IS_SHUTDOWN 0x0001 /* Checkpoint is for shutdown */#define CHECKPOINT_END_OF_RECOVERY 0x0002 /* Like shutdown checkpoint, but * issued at end of WAL recovery */#define CHECKPOINT_IMMEDIATE 0x0004 /* Do it without delays */#define CHECKPOINT_FORCE 0x0008 /* Force even if no activity */#define CHECKPOINT_FLUSH_ALL 0x0010 /* Flush all pages, including those * belonging to unlogged tables *//* These are important to RequestCheckpoint */#define CHECKPOINT_WAIT 0x0020 /* Wait for completion */#define CHECKPOINT_REQUESTED 0x0040 /* Checkpoint request has been made *//* These indicate the cause of a checkpoint request */#define CHECKPOINT_CAUSE_XLOG 0x0080 /* XLOG consumption */#define CHECKPOINT_CAUSE_TIME 0x0100 /* Elapsed time */
二、源码解读
BufferSync : 把缓冲池中所有脏页持久化到物理存储中.其主要逻辑如下:
1.执行相关校验,如确保调用SyncOneBuffer函数的正确性等;
2.根据checkpoint标记设置mask标记(如为XX,则unlogged buffer也会flush);
3.遍历缓存,使用BM_CHECKPOINT_NEEDED标记需要刷盘的缓存page;如无需要处理的page,则返回;
4.排序需刷盘的脏页,避免随机IO,提升性能;
5.为每一个需要刷脏页的表空间分配进度状态;
6.在单个标记的写进度上构建最小堆,并计算单个处理缓冲区占比多少;
7.如ts_heap不为空,循环处理
7.1获取buf_id
7.2调用SyncOneBuffer刷盘
7.3调用CheckpointWriteDelay,休眠以控制I/O频率
7.4释放资源,更新统计信息
/* * BufferSync -- Write out all dirty buffers in the pool. * 把缓冲池中所有脏页持久化到物理存储中. * * This is called at checkpoint time to write out all dirty shared buffers. * The checkpoint request flags should be passed in. If CHECKPOINT_IMMEDIATE * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN, * CHECKPOINT_END_OF_RECOVERY or CHECKPOINT_FLUSH_ALL is set, we write even * unlogged buffers, which are otherwise skipped. The remaining flags * currently have no effect here. * 该函数在checkpoint时把缓冲池中所有脏页刷到磁盘上. * 输入参数为checkpoint请求标记. * 如请求标记为CHECKPOINT_IMMEDIATE,在写入期间禁用延迟; * 如为CHECKPOINT_IS_SHUTDOWN/CHECKPOINT_END_OF_RECOVERY/CHECKPOINT_FLUSH_ALL, * 就算正常情况下会忽略的unlogged缓存,也会写入到磁盘上. * 其他标记在这里没有影响. */static voidBufferSync(int flags){ uint32 buf_state; int buf_id; int num_to_scan; int num_spaces; int num_processed; int num_written; CkptTsStatus *per_ts_stat = NULL; Oid last_tsid; binaryheap *ts_heap; int i; int mask = BM_DIRTY; WritebackContext wb_context; /* Make sure we can handle the pin inside SyncOneBuffer */ //确保可以处理在SyncOneBuffer函数中的pin page ResourceOwnerEnlargeBuffers(CurrentResourceOwner); /* * Unless this is a shutdown checkpoint or we have been explicitly told, * we write only permanent, dirty buffers. But at shutdown or end of * recovery, we write all dirty buffers. */ //如为CHECKPOINT_IS_SHUTDOWN/CHECKPOINT_END_OF_RECOVERY/CHECKPOINT_FLUSH_ALL, //就算正常情况下会忽略的unlogged缓存,也会写入到磁盘上. if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY | CHECKPOINT_FLUSH_ALL)))) mask |= BM_PERMANENT; /* * Loop over all buffers, and mark the ones that need to be written with * BM_CHECKPOINT_NEEDED. Count them as we go (num_to_scan), so that we * can estimate how much work needs to be done. * 遍历缓存,使用BM_CHECKPOINT_NEEDED标记需要写入的page. * 对这些pages计数以便估算有多少工作需要完成. * * This allows us to write only those pages that were dirty when the * checkpoint began, and not those that get dirtied while it proceeds. * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us * later in this function, or by normal backends or the bgwriter cleaning * scan, the flag is cleared. Any buffer dirtied after this point won't * have the flag set. * 只需要写在checkpoint开始时的脏页,不需要包括在checkpoint期间变脏的page. * 一旦标记为BM_CHECKPOINT_NEEDED的脏页完成刷盘, * 在这个函数后续处理逻辑或者普通的后台进程/bgwriter进程会重置该标记. * 所有在该时点的脏页不会设置为BM_CHECKPOINT_NEEDED. * * Note that if we fail to write some buffer, we may leave buffers with * BM_CHECKPOINT_NEEDED still set. This is OK since any such buffer would * certainly need to be written for the next checkpoint attempt, too. * 要注意的是脏页刷盘出错,脏页的标记仍为BM_CHECKPOINT_NEEDED,在下次checkpoint是尝试再次刷盘. */ num_to_scan = 0; for (buf_id = 0; buf_id < NBuffers; buf_id++) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); /* * Header spinlock is enough to examine BM_DIRTY, see comment in * SyncOneBuffer. */ buf_state = LockBufHdr(bufHdr); if ((buf_state & mask) == mask) { CkptSortItem *item; buf_state |= BM_CHECKPOINT_NEEDED; item = &CkptBufferIds[num_to_scan++]; item->buf_id = buf_id; item->tsId = bufHdr->tag.rnode.spcNode; item->relNode = bufHdr->tag.rnode.relNode; item->forkNum = bufHdr->tag.forkNum; item->blockNum = bufHdr->tag.blockNum; } UnlockBufHdr(bufHdr, buf_state); } if (num_to_scan == 0) return; /* nothing to do */ WritebackContextInit(&wb_context, &checkpoint_flush_after); TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan); /* * Sort buffers that need to be written to reduce the likelihood of random * IO. The sorting is also important for the implementation of balancing * writes between tablespaces. Without balancing writes we'd potentially * end up writing to the tablespaces one-by-one; possibly overloading the * underlying system. * 排序需刷盘的脏页,用于避免随机IO. */ qsort(CkptBufferIds, num_to_scan, sizeof(CkptSortItem), ckpt_buforder_comparator); num_spaces = 0; /* * Allocate progress status for each tablespace with buffers that need to * be flushed. This requires the to-be-flushed array to be sorted. * 为每一个需要刷脏页的表空间分配进度状态. */ last_tsid = InvalidOid; for (i = 0; i < num_to_scan; i++) { CkptTsStatus *s; Oid cur_tsid; cur_tsid = CkptBufferIds[i].tsId; /* * Grow array of per-tablespace status structs, every time a new * tablespace is found. */ if (last_tsid == InvalidOid || last_tsid != cur_tsid) { Size sz; num_spaces++; /* * Not worth adding grow-by-power-of-2 logic here - even with a * few hundred tablespaces this should be fine. */ sz = sizeof(CkptTsStatus) * num_spaces; if (per_ts_stat == NULL) per_ts_stat = (CkptTsStatus *) palloc(sz); else per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz); s = &per_ts_stat[num_spaces - 1]; memset(s, 0, sizeof(*s)); s->tsId = cur_tsid; /* * The first buffer in this tablespace. As CkptBufferIds is sorted * by tablespace all (s->num_to_scan) buffers in this tablespace * will follow afterwards. */ s->index = i; /* * progress_slice will be determined once we know how many buffers * are in each tablespace, i.e. after this loop. */ last_tsid = cur_tsid; } else { s = &per_ts_stat[num_spaces - 1]; } s->num_to_scan++; } Assert(num_spaces > 0); /* * Build a min-heap over the write-progress in the individual tablespaces, * and compute how large a portion of the total progress a single * processed buffer is. * 在单个标记的写进度上构建最小堆,并计算单个处理缓冲区占比多少. */ ts_heap = binaryheap_allocate(num_spaces, ts_ckpt_progress_comparator, NULL); for (i = 0; i < num_spaces; i++) { CkptTsStatus *ts_stat = &per_ts_stat[i]; ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan; binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat)); } binaryheap_build(ts_heap); /* * Iterate through to-be-checkpointed buffers and write the ones (still) * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between * tablespaces; otherwise the sorting would lead to only one tablespace * receiving writes at a time, making inefficient use of the hardware. * 迭代处理to-be-checkpointed buffers,刷脏页. * 在表空间之间写入是平衡的. */ num_processed = 0; num_written = 0; while (!binaryheap_empty(ts_heap)) { BufferDesc *bufHdr = NULL; CkptTsStatus *ts_stat = (CkptTsStatus *) DatumGetPointer(binaryheap_first(ts_heap)); buf_id = CkptBufferIds[ts_stat->index].buf_id; Assert(buf_id != -1); bufHdr = GetBufferDescriptor(buf_id); num_processed++; /* * We don't need to acquire the lock here, because we're only looking * at a single bit. It's possible that someone else writes the buffer * and clears the flag right after we check, but that doesn't matter * since SyncOneBuffer will then do nothing. However, there is a * further race condition: it's conceivable that between the time we * examine the bit here and the time SyncOneBuffer acquires the lock, * someone else not only wrote the buffer but replaced it with another * page and dirtied it. In that improbable case, SyncOneBuffer will * write the buffer though we didn't need to. It doesn't seem worth * guarding against this, though. */ if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) { //只处理标记为BM_CHECKPOINT_NEEDED的page //调用SyncOneBuffer刷盘(一次一个page) if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) { TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); BgWriterStats.m_buf_written_checkpoints++; num_written++; } } /* * Measure progress independent of actually having to flush the buffer * - otherwise writing become unbalanced. */ ts_stat->progress += ts_stat->progress_slice; ts_stat->num_scanned++; ts_stat->index++; /* Have all the buffers from the tablespace been processed? */ if (ts_stat->num_scanned == ts_stat->num_to_scan) { binaryheap_remove_first(ts_heap); } else { /* update heap with the new progress */ binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat)); } /* * Sleep to throttle our I/O rate. * 休眠 : 控制I/O频率 */ CheckpointWriteDelay(flags, (double) num_processed / num_to_scan); } /* issue all pending flushes */ IssuePendingWritebacks(&wb_context); pfree(per_ts_stat); per_ts_stat = NULL; binaryheap_free(ts_heap); /* * Update checkpoint statistics. As noted above, this doesn't include * buffers written by other backends or bgwriter scan. */ CheckpointStats.ckpt_bufs_written += num_written; TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan);}
三、跟踪分析
测试脚本
testdb=# update t_wal_ckpt set c2 = 'C4#'||substr(c2,4,40);UPDATE 1testdb=# checkpoint;
跟踪分析
(gdb) handle SIGINT print nostop passSIGINT is used by the debugger.Are you sure you want to change it? (y or n) ySignal Stop Print Pass to program DescriptionSIGINT No Yes Yes Interrupt(gdb) b CheckPointGutsBreakpoint 1 at 0x56f0ca: file xlog.c, line 8968.(gdb) cContinuing.Program received signal SIGINT, Interrupt.Breakpoint 1, CheckPointGuts (checkPointRedo=16953420440, flags=108) at xlog.c:89688968 CheckPointCLOG();(gdb) n8969 CheckPointCommitTs();(gdb) 8970 CheckPointSUBTRANS();(gdb) 8971 CheckPointMultiXact();(gdb) 8972 CheckPointPredicate();(gdb) 8973 CheckPointRelationMap();(gdb) 8974 CheckPointReplicationSlots();(gdb) 8975 CheckPointSnapBuild();(gdb) 8976 CheckPointLogicalRewriteHeap();(gdb) 8977 CheckPointBuffers(flags); /* performs all required fsyncs */(gdb) stepCheckPointBuffers (flags=108) at bufmgr.c:25832583 TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);(gdb) n2584 CheckpointStats.ckpt_write_t = GetCurrentTimestamp();(gdb) 2585 BufferSync(flags);(gdb) stepBufferSync (flags=108) at bufmgr.c:17931793 CkptTsStatus *per_ts_stat = NULL;(gdb) p flags$1 = 108(gdb) n1797 int mask = BM_DIRTY;(gdb) 1801 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);(gdb) 1808 if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |(gdb) 1810 mask |= BM_PERMANENT;(gdb) 1828 num_to_scan = 0;(gdb) 1829 for (buf_id = 0; buf_id < NBuffers; buf_id++)(gdb) 1831 BufferDesc *bufHdr = GetBufferDescriptor(buf_id);(gdb) 1837 buf_state = LockBufHdr(bufHdr);(gdb) p buf_id$2 = 0(gdb) p NBuffers$3 = 65536(gdb) n1839 if ((buf_state & mask) == mask)(gdb) 1853 UnlockBufHdr(bufHdr, buf_state);(gdb) 1829 for (buf_id = 0; buf_id < NBuffers; buf_id++)(gdb) 1831 BufferDesc *bufHdr = GetBufferDescriptor(buf_id);(gdb) 1837 buf_state = LockBufHdr(bufHdr);(gdb) 1839 if ((buf_state & mask) == mask)(gdb) 1853 UnlockBufHdr(bufHdr, buf_state);(gdb) 1829 for (buf_id = 0; buf_id < NBuffers; buf_id++)(gdb) b bufmgr.c:1856Breakpoint 2 at 0x8a68b3: file bufmgr.c, line 1856.(gdb) cContinuing.Breakpoint 2, BufferSync (flags=108) at bufmgr.c:18561856 if (num_to_scan == 0)(gdb) p num_to_scan$4 = 1(gdb) n1859 WritebackContextInit(&wb_context, &checkpoint_flush_after);(gdb) 1861 TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);(gdb) 1870 qsort(CkptBufferIds, num_to_scan, sizeof(CkptSortItem),(gdb) 1873 num_spaces = 0;(gdb) 1879 last_tsid = InvalidOid;(gdb) 1880 for (i = 0; i < num_to_scan; i++)(gdb) 1885 cur_tsid = CkptBufferIds[i].tsId;(gdb) 1891 if (last_tsid == InvalidOid || last_tsid != cur_tsid)(gdb) p cur_tsid$5 = 1663(gdb) n1895 num_spaces++;(gdb) 1901 sz = sizeof(CkptTsStatus) * num_spaces;(gdb) 1903 if (per_ts_stat == NULL)(gdb) 1904 per_ts_stat = (CkptTsStatus *) palloc(sz);(gdb) 1908 s = &per_ts_stat[num_spaces - 1];(gdb) p sz$6 = 40(gdb) p num_spaces$7 = 1(gdb) n1909 memset(s, 0, sizeof(*s));(gdb) 1910 s->tsId = cur_tsid;(gdb) 1917 s->index = i;(gdb) 1924 last_tsid = cur_tsid;(gdb) 1892 {(gdb) 1931 s->num_to_scan++;(gdb) 1880 for (i = 0; i < num_to_scan; i++)(gdb) 1934 Assert(num_spaces > 0);(gdb) 1941 ts_heap = binaryheap_allocate(num_spaces,(gdb) 1945 for (i = 0; i < num_spaces; i++)(gdb) 1947 CkptTsStatus *ts_stat = &per_ts_stat[i];(gdb) 1949 ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;(gdb) 1951 binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));(gdb) 1945 for (i = 0; i < num_spaces; i++)(gdb) 1954 binaryheap_build(ts_heap);(gdb) 1962 num_processed = 0;(gdb) p *ts_heap$8 = {bh_size = 1, bh_space = 1, bh_has_heap_property = true, bh_compare = 0x8aa0d8 , bh_arg = 0x0, bh_nodes = 0x2d666d8}(gdb) n1963 num_written = 0;(gdb) 1964 while (!binaryheap_empty(ts_heap))(gdb) 1966 BufferDesc *bufHdr = NULL;(gdb) 1968 DatumGetPointer(binaryheap_first(ts_heap));(gdb) 1967 CkptTsStatus *ts_stat = (CkptTsStatus *)(gdb) 1970 buf_id = CkptBufferIds[ts_stat->index].buf_id;(gdb) 1971 Assert(buf_id != -1);(gdb) p buf_id$9 = 160(gdb) n1973 bufHdr = GetBufferDescriptor(buf_id);(gdb) 1975 num_processed++;(gdb) 1989 if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)(gdb) p *bufHdr$10 = {tag = {rnode = {spcNode = 1663, dbNode = 16384, relNode = 221290}, forkNum = MAIN_FORKNUM, blockNum = 0}, buf_id = 160, state = {value = 3549691904}, wait_backend_pid = 0, freeNext = -2, content_lock = {tranche = 53, state = { value = 536870912}, waiters = {head = 2147483647, tail = 2147483647}}}(gdb) n1991 if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)(gdb) 1993 TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);(gdb) 1994 BgWriterStats.m_buf_written_checkpoints++;(gdb) 1995 num_written++;(gdb) 2003 ts_stat->progress += ts_stat->progress_slice;(gdb) 2004 ts_stat->num_scanned++;(gdb) 2005 ts_stat->index++;(gdb) 2008 if (ts_stat->num_scanned == ts_stat->num_to_scan)(gdb) 2010 binaryheap_remove_first(ts_heap);(gdb) 2021 CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);(gdb) 1964 while (!binaryheap_empty(ts_heap))(gdb) 2025 IssuePendingWritebacks(&wb_context);(gdb) 2027 pfree(per_ts_stat);(gdb) 2028 per_ts_stat = NULL;(gdb) 2029 binaryheap_free(ts_heap);(gdb) 2035 CheckpointStats.ckpt_bufs_written += num_written;(gdb) 2037 TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan);(gdb) 2038 }(gdb) CheckPointBuffers (flags=108) at bufmgr.c:25862586 CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();(gdb)
四、参考资料
PG Source Code
PgSQL · 特性分析 · 谈谈checkpoint的调度
标记
处理
函数
缓冲
缓存
单个
进度
物理
磁盘
空间
分析
存储
进程
最小
情况
状态
缓冲区
逻辑
频率
休眠
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
csc和华为网络安全区别
网络技术会涉及到的英文
运城软件开发价格
携程年度的数据库
连接服务器的网络密码
服务器上有声音但是不显示
公安网络安全日常管理台账
服务器一开防火墙就无法远程连接
q2服务器销售量
视频网络安全知识
dota2吧大数据库
杭州 数据库
网络技术信息咨询费收费明细
软件开发论文怎么写
服务器租用后台管理
微信云数据库管理工具
数据库中学分和先修课的数据类型
奥维互动地图基于什么软件开发
上海亨嘉网络技术最新招聘
万人服务器游戏
网络安全宣传周校园活动
java软件开发的发展前景怎样
知到计算机网络技术
双语数据库
公安行业网络安全产品
ssh连接服务器端口填多少
东莞汇电互联网科技
5ekz服务器指令
平板电脑安装软件开发
阴阳师手游连接服务器失败