PostgreSQL Streaming Replication Details: How the Standby Determines Its Start Position receiveStart

Tech · 2022-08-16

1. Where the Question Came From

Everyone knows PostgreSQL streaming replication well. While reading the documentation today, a question suddenly came to mind.

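When the standby starts its walreceiver process, how does it decide where to start receiving WAL? In what form is that starting point recorded, and where does it come from? Is it a shared-memory struct, a field in the ControlFile, or a dedicated system catalog?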

With these scattered questions in mind, I read through the source code and used gdb to trace how the standby launches walreceiver, and finally figured it out.

Below, this starting point is referred to as receiveStart.

2. Code Walkthrough

Reading the source shows that receiveStart is a member of WalRcvData, the shared-memory struct that the global pointer WalRcv points to.

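Like XLogCtl and ControlFile, WalRcv is set up while the postmaster starts, in the step that creates shared memory. The postmaster creates all shared memory up front and initializes it, then forks the child processes, which inherit the mapping and therefore all see the same shared memory.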
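The create-then-fork pattern can be illustrated outside PostgreSQL. The following is a minimal sketch, not PostgreSQL code: it maps an anonymous MAP_SHARED region (the struct DemoShared and the function value_seen_by_parent are hypothetical names), forks a child that writes into it, and lets the parent read the value back. PostgreSQL's real shared-memory setup is considerably more involved.

```c
/* Hypothetical sketch: shared memory created before fork() is shared with the child. */
#define _DEFAULT_SOURCE                 /* exposes MAP_ANONYMOUS on glibc */
#include <assert.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Stand-in for a shared-memory struct such as WalRcvData */
typedef struct
{
    unsigned long long receive_start;   /* plays the role of receiveStart */
} DemoShared;

/*
 * "Postmaster": map the area, initialize it, fork a "child" that writes
 * child_value into it, then return what the parent reads back.
 * Returns -1 on any system-call failure.
 */
long long value_seen_by_parent(unsigned long long child_value)
{
    DemoShared *shared = mmap(NULL, sizeof(DemoShared), PROT_READ | PROT_WRITE,
                              MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (shared == MAP_FAILED)
        return -1;
    shared->receive_start = 0;          /* initialized before any fork */

    pid_t pid = fork();
    if (pid < 0)
    {
        munmap(shared, sizeof(DemoShared));
        return -1;
    }
    if (pid == 0)
    {
        /* child: inherits the mapping, so this write is visible to the parent */
        shared->receive_start = child_value;
        _exit(0);
    }

    int status;
    long long seen = -1;
    if (waitpid(pid, &status, 0) == pid && WIFEXITED(status))
        seen = (long long) shared->receive_start;
    munmap(shared, sizeof(DemoShared));
    return seen;
}
```

Because the mapping exists before fork(), parent and child address the same physical pages, which is why every child process sees the same WalRcv.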

The definition of the struct (WalRcvData):

```c
/* Shared memory area for management of walreceiver process */
typedef struct
{
    /*
     * PID of currently active walreceiver process, its current state and
     * start time (actually, the time at which it was requested to be
     * started).
     */
    pid_t       pid;
    WalRcvState walRcvState;
    pg_time_t   startTime;

    /*
     * receiveStart and receiveStartTLI indicate the first byte position and
     * timeline that will be received. When startup process starts the
     * walreceiver, it sets these to the point where it wants the streaming to
     * begin.
     */
    XLogRecPtr  receiveStart;
    TimeLineID  receiveStartTLI;

    /*
     * receivedUpto-1 is the last byte position that has already been
     * received, and receivedTLI is the timeline it came from. At the first
     * startup of walreceiver, these are set to receiveStart and
     * receiveStartTLI. After that, walreceiver updates these whenever it
     * flushes the received WAL to disk.
     */
    XLogRecPtr  receivedUpto;
    TimeLineID  receivedTLI;

    /*
     * latestChunkStart is the starting byte position of the current "batch"
     * of received WAL. It's actually the same as the previous value of
     * receivedUpto before the last flush to disk. Startup process can use
     * this to detect whether it's keeping up or not.
     */
    XLogRecPtr  latestChunkStart;

    /*
     * Time of send and receive of any message received.
     */
    TimestampTz lastMsgSendTime;
    TimestampTz lastMsgReceiptTime;

    /*
     * Latest reported end of WAL on the sender
     */
    XLogRecPtr  latestWalEnd;
    TimestampTz latestWalEndTime;

    /*
     * connection string; initially set to connect to the primary, and later
     * clobbered to hide security-sensitive fields.
     */
    char        conninfo[MAXCONNINFO];

    /*
     * Host name (this can be a host name, an IP address, or a directory path)
     * and port number of the active replication connection.
     */
    char        sender_host[NI_MAXHOST];
    int         sender_port;

    /*
     * replication slot name; is also used for walreceiver to connect with the
     * primary
     */
    char        slotname[NAMEDATALEN];

    /* set true once conninfo is ready to display (obfuscated pwds etc) */
    bool        ready_to_display;

    /*
     * Latch used by startup process to wake up walreceiver after telling it
     * where to start streaming (after setting receiveStart and
     * receiveStartTLI), and also to tell it to send apply feedback to the
     * primary whenever specially marked commit records are applied. This is
     * normally mapped to procLatch when walreceiver is running.
     */
    Latch      *latch;

    slock_t     mutex;          /* locks shared variables shown above */

    /*
     * force walreceiver reply? This doesn't need to be locked; memory
     * barriers for ordering are sufficient. But we do need atomic fetch and
     * store semantics, so use sig_atomic_t.
     */
    sig_atomic_t force_reply;   /* used as a bool */
} WalRcvData;

extern WalRcvData *WalRcv;
```

Initialization of WalRcv:

PostmasterMain() --> reset_shared() --> CreateSharedMemoryAndSemaphores() --> WalRcvShmemInit()

```c
/* Allocate and initialize walreceiver-related shared memory */
void
WalRcvShmemInit(void)
{
    bool        found;

    WalRcv = (WalRcvData *)
        ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);

    if (!found)
    {
        /* First time through, so initialize */
        MemSet(WalRcv, 0, WalRcvShmemSize());
        WalRcv->walRcvState = WALRCV_STOPPED;
        SpinLockInit(&WalRcv->mutex);
        WalRcv->latch = NULL;
    }
}
```
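WalRcvShmemInit relies on the found flag from ShmemInitStruct: the struct is looked up by name, and it is zeroed and initialized only the first time. A hypothetical, single-process analogue of that find-or-create idiom might look like this; demo_init_struct stands in for ShmemInitStruct and a static buffer stands in for shared memory, and none of these names exist in PostgreSQL.

```c
/* Hypothetical single-process analogue of the ShmemInitStruct() "found" idiom. */
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define DEMO_POOL_SIZE   4096
#define DEMO_MAX_ENTRIES 8

static _Alignas(max_align_t) char demo_pool[DEMO_POOL_SIZE];  /* fake shared segment */
static size_t demo_pool_used = 0;
static struct { const char *name; void *ptr; } demo_index[DEMO_MAX_ENTRIES];
static int demo_n_entries = 0;

/* Look up a struct by name; allocate it on first use. *found says which. */
void *demo_init_struct(const char *name, size_t size, bool *found)
{
    for (int i = 0; i < demo_n_entries; i++)
        if (strcmp(demo_index[i].name, name) == 0)
        {
            *found = true;
            return demo_index[i].ptr;
        }

    *found = false;
    size_t align = _Alignof(max_align_t);
    size_t rounded = (size + align - 1) / align * align;   /* keep later entries aligned */
    if (demo_n_entries == DEMO_MAX_ENTRIES || rounded > DEMO_POOL_SIZE - demo_pool_used)
        return NULL;

    void *ptr = demo_pool + demo_pool_used;
    demo_pool_used += rounded;
    demo_index[demo_n_entries].name = name;
    demo_index[demo_n_entries].ptr = ptr;
    demo_n_entries++;
    return ptr;
}

typedef enum { DEMO_STOPPED, DEMO_STARTING } DemoRcvState;

typedef struct
{
    DemoRcvState state;
    unsigned long long receive_start;
} DemoWalRcv;

DemoWalRcv *demo_walrcv;

/* Mirrors WalRcvShmemInit(): initialize only when the struct is new. */
void demo_walrcv_shmem_init(void)
{
    bool found;

    demo_walrcv = demo_init_struct("Wal Receiver Ctl", sizeof(DemoWalRcv), &found);
    if (demo_walrcv != NULL && !found)
    {
        memset(demo_walrcv, 0, sizeof(DemoWalRcv));
        demo_walrcv->state = DEMO_STOPPED;
    }
}
```

Calling demo_walrcv_shmem_init() a second time finds the existing entry and leaves its contents untouched, mirroring the if (!found) guard above.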

Assigning WalRcv's members:

The streaming start point receiveStart is set in the following function:

```c
/*
 * Request postmaster to start walreceiver.
 *
 * recptr indicates the position where streaming should begin, conninfo
 * is a libpq connection string to use, and slotname is, optionally, the name
 * of a replication slot to acquire.
 */
void
RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
                     const char *slotname)
{
    /* Get the shared-memory struct WalRcv here */
    WalRcvData *walrcv = WalRcv;
    bool        launch = false;
    pg_time_t   now = (pg_time_t) time(NULL);
    Latch      *latch;

    /*
     * We always start at the beginning of the segment. That prevents a broken
     * segment (i.e., with no records in the first half of a segment) from
     * being created by XLOG streaming, which might cause trouble later on if
     * the segment is e.g archived.
     */
    if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
        recptr -= XLogSegmentOffset(recptr, wal_segment_size);

    SpinLockAcquire(&walrcv->mutex);

    /* It better be stopped if we try to restart it */
    Assert(walrcv->walRcvState == WALRCV_STOPPED ||
           walrcv->walRcvState == WALRCV_WAITING);

    if (conninfo != NULL)
        strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
    else
        walrcv->conninfo[0] = '\0';

    if (slotname != NULL)
        strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
    else
        walrcv->slotname[0] = '\0';

    if (walrcv->walRcvState == WALRCV_STOPPED)
    {
        launch = true;
        walrcv->walRcvState = WALRCV_STARTING;
    }
    else
        walrcv->walRcvState = WALRCV_RESTARTING;
    walrcv->startTime = now;

    /*
     * If this is the first startup of walreceiver (on this timeline),
     * initialize receivedUpto and latestChunkStart to the starting point.
     */
    if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
    {
        walrcv->receivedUpto = recptr;
        walrcv->receivedTLI = tli;
        walrcv->latestChunkStart = recptr;
    }
    /* receiveStart is assigned here, taking the value of recptr */
    walrcv->receiveStart = recptr;
    walrcv->receiveStartTLI = tli;

    latch = walrcv->latch;

    SpinLockRelease(&walrcv->mutex);

    if (launch)
        SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
    else if (latch)
        SetLatch(latch);
}
```
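Stripped of locking, signalling and the connection-string bookkeeping, the receiveStart logic of RequestXLogStreaming reduces to the sketch below. It is a simplified model under stated assumptions: DemoWalRcv and demo_request_streaming are hypothetical names, only a subset of the walreceiver states is modelled, and the segment size is passed in explicitly (like wal_segment_size, it must be a power of two).

```c
/* Simplified, lock-free model of the receiveStart logic in RequestXLogStreaming(). */
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TimeLineID;

/* Subset of the walreceiver states; names are hypothetical */
typedef enum { RCV_STOPPED, RCV_STARTING, RCV_WAITING, RCV_RESTARTING } DemoRcvState;

typedef struct
{
    DemoRcvState state;
    XLogRecPtr   receiveStart;
    TimeLineID   receiveStartTLI;
    XLogRecPtr   receivedUpto;
    TimeLineID   receivedTLI;
    XLogRecPtr   latestChunkStart;
} DemoWalRcv;

/*
 * Returns true if a new walreceiver has to be launched (state was STOPPED),
 * false if an existing, waiting one is merely told to restart.
 * seg_size must be a power of two.
 */
bool demo_request_streaming(DemoWalRcv *w, TimeLineID tli, XLogRecPtr recptr,
                            uint64_t seg_size)
{
    /* Always start at the beginning of a segment (cf. XLogSegmentOffset()) */
    recptr -= recptr & (seg_size - 1);

    bool launch = (w->state == RCV_STOPPED);
    w->state = launch ? RCV_STARTING : RCV_RESTARTING;

    /* First start on this timeline: reset the "received so far" positions */
    if (w->receiveStart == 0 || w->receivedTLI != tli)
    {
        w->receivedUpto = recptr;
        w->receivedTLI = tli;
        w->latestChunkStart = recptr;
    }
    w->receiveStart = recptr;
    w->receiveStartTLI = tli;
    return launch;
}
```

With the values from the trace in the next section, a recptr of 3388998312 and 16 MB segments give a receiveStart of 3388997632; a second request on the same timeline moves receiveStart but leaves receivedUpto alone.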

3. Tracing with gdb

The early part of the trace is tedious, so here is the stack right after entering RequestXLogStreaming:

```
Breakpoint 5, RequestXLogStreaming (tli=1, recptr=3388998312, conninfo=0xd42728 "user=repuser password=repuser host=192.168.92.128 port=6548 sslmode=disable sslcompression=0 target_session_attrs=any", slotname=0x0) at walreceiverfuncs.c:225
225     {
(gdb) bt
#0  RequestXLogStreaming (tli=1, recptr=3388998312, conninfo=0xd42728 "user=repuser password=repuser host=192.168.92.128 port=6548 sslmode=disable sslcompression=0 target_session_attrs=any", slotname=0x0) at walreceiverfuncs.c:228
#1  0x00000000004f6c3e in WaitForWALToBecomeAvailable (tliRecPtr=3388998288, fetching_ckpt=false, randAccess=false, RecPtr=3388998312) at xlog.c:11979
#2  XLogPageRead (xlogreader=0xd427b8, targetPagePtr=<optimized out>, reqLen=680, targetRecPtr=3388998288, readBuf=0xd6fca8 "\230\320\006", readTLI=0xd4306c) at xlog.c:11720
#3  0x0000000000503b83 in ReadPageInternal (state=state@entry=0xd427b8, pageptr=pageptr@entry=3388997632, reqLen=reqLen@entry=680) at xlogreader.c:577
#4  0x0000000000504403 in XLogReadRecord (state=state@entry=0xd427b8, RecPtr=3388998288, RecPtr@entry=0, errormsg=errormsg@entry=0x7fffffffa4f8) at xlogreader.c:278
#5  0x00000000004f4dd8 in ReadRecord (xlogreader=xlogreader@entry=0xd427b8, RecPtr=RecPtr@entry=0, emode=emode@entry=15, fetching_ckpt=fetching_ckpt@entry=false) at xlog.c:4213
#6  0x00000000004fabb1 in StartupXLOG () at xlog.c:7372
#7  0x00000000006bf591 in StartupProcessMain () at startup.c:211
#8  0x0000000000508725 in AuxiliaryProcessMain (argc=argc@entry=2, argv=argv@entry=0x7fffffffe240) at bootstrap.c:441
#9  0x00000000006bc7e9 in StartChildProcess (type=StartupProcess) at postmaster.c:5337
#10 0x00000000006bee55 in PostmasterMain (argc=argc@entry=1, argv=argv@entry=0xd41be0) at postmaster.c:1372
#11 0x000000000047bb91 in main (argc=1, argv=0xd41be0) at main.c:228
(gdb)
```

Stepping through RequestXLogStreaming to follow the assignment of receiveStart:

```
(gdb)
/* get the shared-memory struct WalRcv */
226         WalRcvData *walrcv = WalRcv;
(gdb) n
228         pg_time_t   now = (pg_time_t) time(NULL);
/* receiveStart has not been assigned yet */
(gdb) p walrcv->receiveStart
$4 = 0
(gdb) n
237         if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
(gdb)
/* subtract recptr's offset within its segment */
238             recptr -= XLogSegmentOffset(recptr, wal_segment_size);
(gdb)
240         SpinLockAcquire(&walrcv->mutex);
(gdb)
246         if (conninfo != NULL)
(gdb)
247             strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
(gdb)
251         if (slotname != NULL)
(gdb)
254             walrcv->slotname[0] = '\0';
(gdb)
256         if (walrcv->walRcvState == WALRCV_STOPPED)
(gdb)
259             walrcv->walRcvState = WALRCV_STARTING;
(gdb)
258             launch = true;
(gdb)
269         if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
(gdb)
263         walrcv->startTime = now;
(gdb)
269         if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
(gdb)
271             walrcv->receivedUpto = recptr;
(gdb)
272             walrcv->receivedTLI = tli;
/* recptr is now 3388997632 */
(gdb) p recptr
$5 = 3388997632
(gdb) n
273             walrcv->latestChunkStart = recptr;
(gdb)
275         walrcv->receiveStart = recptr;
(gdb)
276         walrcv->receiveStartTLI = tli;
/* recptr has been assigned to receiveStart */
(gdb) p walrcv->receiveStart
$6 = 3388997632
(gdb)
```

So receiveStart = 3388997632. What is this number, and what is it tied to?

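Reading further, I found that this value is tied to replayEndRecPtr and minRecoveryPoint. Both should look familiar: they are the replay end point and the minimum recovery point, members of the shared-memory structs XLogCtl and ControlFile respectively. Both appear in the startup process's consistency check during redo.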
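The consistency check mentioned here comes down to one condition: recovery is consistent once replay has passed minRecoveryPoint and no base backup is still being restored. The function below is a hypothetical sketch of that condition, modelled on PostgreSQL's CheckRecoveryConsistency(); the parameter names are assumptions, the real code compares against the end of the last fully replayed record, and it also handles the end-of-backup case and signals the postmaster when hot standby can begin.

```c
/* Hypothetical sketch of the core condition in CheckRecoveryConsistency(). */
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define DEMO_INVALID_XLOG_REC_PTR ((XLogRecPtr) 0)

/*
 * Recovery is consistent once everything up to min_recovery_point has been
 * replayed and no base backup is still being restored.
 */
bool demo_reached_consistency(XLogRecPtr min_recovery_point,
                              XLogRecPtr last_replayed_end,
                              XLogRecPtr backup_start_point,
                              bool backup_end_required)
{
    return !backup_end_required &&
           backup_start_point == DEMO_INVALID_XLOG_REC_PTR &&
           min_recovery_point <= last_replayed_end;
}
```

With the values printed below (both positions at 3388998288, no backup in progress), the condition holds.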

Once walreceiver is running, print the two positions:

```
(gdb) bt
#0  0x00007f6102d285e3 in __epoll_wait_nocancel () at ../sysdeps/unix/syscall-template.S:81
#1  0x0000000000707eae in WaitEventSetWaitBlock (nevents=1, occurred_events=0x7ffeeffdff00, cur_timeout=100, set=0x2b069e8) at latch.c:1048
#2  WaitEventSetWait (set=set@entry=0x2b069e8, timeout=timeout@entry=100, occurred_events=occurred_events@entry=0x7ffeeffdff00, nevents=nevents@entry=1, wait_event_info=wait_event_info@entry=83886091) at latch.c:1000
#3  0x00000000007082e7 in WaitLatchOrSocket (latch=0x7f60fc49da94, wakeEvents=wakeEvents@entry=27, sock=3, timeout=timeout@entry=100, wait_event_info=wait_event_info@entry=83886091) at latch.c:385
#4  0x00000000006e49c4 in WalReceiverMain () at walreceiver.c:489
#5  0x000000000050871a in AuxiliaryProcessMain (argc=argc@entry=2, argv=argv@entry=0x7ffeeffe0530) at bootstrap.c:462
#6  0x00000000006bc7e9 in StartChildProcess (type=WalReceiverProcess) at postmaster.c:5337
#7  0x00000000006bcff5 in MaybeStartWalReceiver () at postmaster.c:5499
#8  0x00000000006bde97 in sigusr1_handler (postgres_signal_arg=<optimized out>) at postmaster.c:5134
#9  <signal handler called>
#10 0x00007f6102d1f0d3 in __select_nocancel () at ../sysdeps/unix/syscall-template.S:81
#11 0x000000000047a7f5 in ServerLoop () at postmaster.c:1671
#12 0x00000000006bee79 in PostmasterMain (argc=argc@entry=3, argv=argv@entry=0x2b05c40) at postmaster.c:1380
#13 0x000000000047bb91 in main (argc=3, argv=0x2b05c40) at main.c:228
/* read replayEndRecPtr from the shared-memory struct XLogCtl */
(gdb) p XLogCtl->replayEndRecPtr
$3 = 3388998288
/* read minRecoveryPoint from the shared-memory struct ControlFile */
(gdb) p ControlFile->minRecoveryPoint
$4 = 3388998288
```

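Both positions are 3388998288, and as mentioned they determine receiveStart. The startup process next needs the record at 3388998288 (RecPtr below, corresponding to replayEndRecPtr). XLogReadRecord first computes the start of the page containing it, as shown below, and then asks for the first 680 bytes of that page: the record's 656-byte offset in the page plus its 24-byte header, which is the reqLen=680 in the backtrace. Since that WAL is not available locally, WaitForWALToBecomeAvailable requests streaming from page start + 680 = 3388998312, the recptr seen on entry to RequestXLogStreaming, which then rounds it down to the start of its WAL segment, 3388997632. The page start and the segment start coincide here only because the record lies in the first page of its segment. XLOG_BLCKSZ is 8192 by default; the gdb calculation below uses 1024, which gives the same result because the record is only 656 bytes into its page.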

```c
XLogRecord *
XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
{
    /* ... code omitted ... */
    targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
    /* ... code omitted ... */
}
```

Computing targetPagePtr by hand gives the same value as receiveStart:

```
(gdb) p 3388998288 - (3388998288 % 1024)
$9 = 3388997632
(gdb)
```
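The whole arithmetic chain from the replay position to receiveStart can be checked with a few lines of C. This is a sketch: the helper names are hypothetical, the 24-byte record header (SizeOfXLogRecord) and the 8 KB page / 16 MB segment sizes are assumed defaults, and the request length follows the Min(offset + header, XLOG_BLCKSZ) rule that produced reqLen=680 in the backtrace.

```c
/* Sketch: from the record the startup process needs to the streaming start point. */
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

#define DEMO_SIZE_OF_XLOG_RECORD 24     /* fixed XLogRecord header size (assumed) */

/* Start of the WAL page containing ptr, like targetPagePtr in XLogReadRecord() */
XLogRecPtr demo_page_start(XLogRecPtr ptr, uint32_t blcksz)
{
    return ptr - ptr % blcksz;
}

/* Start of the WAL segment containing ptr, like the rounding in RequestXLogStreaming() */
XLogRecPtr demo_segment_start(XLogRecPtr ptr, uint64_t seg_size)
{
    return ptr - ptr % seg_size;
}

/*
 * Follow the chain seen in the backtrace: page start, then page start + reqLen
 * (what WaitForWALToBecomeAvailable receives), then segment start.
 */
XLogRecPtr demo_stream_start(XLogRecPtr rec_ptr, uint32_t blcksz, uint64_t seg_size)
{
    XLogRecPtr page = demo_page_start(rec_ptr, blcksz);
    uint64_t req_len = (rec_ptr - page) + DEMO_SIZE_OF_XLOG_RECORD;

    if (req_len > blcksz)
        req_len = blcksz;               /* Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ) */
    return demo_segment_start(page + req_len, seg_size);
}
```

For a record further into the segment, for example at 3389006000, the page start (3389005824) and the segment start (3388997632) differ; it is the segment rounding in RequestXLogStreaming that decides receiveStart.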

You can also verify this by logging in to the standby and querying system functions. Two fields matter: min_recovery_end_lsn = 0/CA000290 and receive_start_lsn = 0/CA000000.

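min_recovery_end_lsn corresponds to the minimum recovery point minRecoveryPoint. An LSN is displayed as its high and low 32 bits in hexadecimal, separated by a slash; the high half is 0 here, so the value is simply 0xCA000290, which is 3388998288 in decimal.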

receive_start_lsn corresponds to receiveStart; likewise, 0xCA000000 is 3388997632 in decimal.
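The conversion is mechanical: an LSN is a 64-bit integer printed as its high and low 32-bit halves in hexadecimal (PostgreSQL formats it with "%X/%X"). A minimal sketch of parsing and formatting, with hypothetical function names:

```c
/* Hypothetical helpers: convert between "X/Y" LSN text and the 64-bit value. */
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Parse "X/Y" (both halves hexadecimal). Returns 0 on success, -1 otherwise. */
int demo_parse_lsn(const char *text, uint64_t *lsn)
{
    uint32_t hi, lo;

    if (sscanf(text, "%" SCNx32 "/%" SCNx32, &hi, &lo) != 2)
        return -1;
    *lsn = ((uint64_t) hi << 32) | lo;  /* high half, then low half */
    return 0;
}

/* Format a 64-bit LSN the way PostgreSQL prints it ("%X/%X") */
void demo_format_lsn(uint64_t lsn, char *buf, size_t len)
{
    snprintf(buf, len, "%" PRIX32 "/%" PRIX32,
             (uint32_t) (lsn >> 32), (uint32_t) lsn);
}
```

Parsing "0/CA000290" yields 3388998288 and "0/CA000000" yields 3388997632, matching the gdb values; a non-zero high half such as "1/0" yields 4294967296.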

Readers interested in how these system functions are defined can read the source; I won't go into it here.

```
postgres=# select * from pg_control_recovery();
-[ RECORD 1 ]-----------------+-----------
min_recovery_end_lsn          | 0/CA000290
min_recovery_end_timeline     | 1
backup_start_lsn              | 0/0
backup_end_lsn                | 0/0
end_of_backup_record_required | f

postgres=# select * from pg_stat_get_wal_receiver();
-[ RECORD 1 ]---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
pid                   | 53008
status                | streaming
receive_start_lsn     | 0/CA000000
receive_start_tli     | 1
received_lsn          | 0/CA000290
received_tli          | 1
last_msg_send_time    | 2020-10-05 18:41:38.441149+08
last_msg_receipt_time | 2020-10-05 18:41:38.441273+08
latest_end_lsn        | 0/CA000290
latest_end_time       | 2020-10-05 18:38:38.238186+08
slot_name             |
sender_host           | 192.168.92.128
sender_port           | 6548
conninfo              | user=repuser password=******** dbname=replication host=192.168.92.128 port=6548 fallback_application_name=walreceiver sslmode=disable sslcompression=0 target_session_attrs=any

postgres=# exit
```
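Because receive_start_lsn is segment-aligned, it points at the very first byte of a WAL segment file. The sketch below mirrors the naming rule of PostgreSQL's XLogFileName macro (timeline, then the segment number split into high and low parts, each as 8 hex digits); the function name is hypothetical and the default 16 MB wal_segment_size is an assumption.

```c
/* Sketch of the WAL file naming rule (cf. PostgreSQL's XLogFileName macro). */
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* buf needs room for 24 hex digits plus the terminating NUL */
void demo_wal_file_name(char *buf, size_t len, uint32_t tli, uint64_t lsn,
                        uint64_t seg_size)
{
    uint64_t segno = lsn / seg_size;                            /* cf. XLByteToSeg */
    uint64_t segs_per_id = UINT64_C(0x100000000) / seg_size;    /* segments per 4 GB */

    snprintf(buf, len, "%08X%08X%08X", (unsigned int) tli,
             (unsigned int) (segno / segs_per_id),
             (unsigned int) (segno % segs_per_id));
}
```

Under these assumptions both 0/CA000000 and 0/CA000290 fall into 0000000100000000000000CA on timeline 1, so the standby asks for that segment file from its first byte.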

After that, sending and receiving between walreceiver and walsender works roughly as follows:

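Taking physical replication as an example: on the primary, the walsender enters WalSndLoop with XLogSendPhysical as its callback and starts sending from the receiveStart position the standby requested in its START_REPLICATION command. In each round it sends WAL from sentPtr up to at most the primary's current flush position, then advances sentPtr to the end of what was just sent (endptr), ready for the next round.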
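The walsender side of that loop can be modelled as follows. This is a simplified, hypothetical sketch of the chunking rule in XLogSendPhysical(): each round sends at most max_send bytes starting at sentPtr, stops exactly at the flush position once it is reached, and otherwise cuts the chunk at a page boundary. In PostgreSQL the chunk limit is MAX_SEND_SIZE (16 pages); here it is a parameter, and no WAL is actually read or sent.

```c
/* Simplified model of how XLogSendPhysical() chooses each chunk; nothing is sent. */
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

/* End of the next chunk starting at sent_ptr (hypothetical helper) */
XLogRecPtr demo_next_send_end(XLogRecPtr sent_ptr, XLogRecPtr flush_ptr,
                              uint64_t max_send, uint32_t blcksz)
{
    XLogRecPtr endptr = sent_ptr + max_send;

    if (endptr >= flush_ptr)
        return flush_ptr;               /* caught up: stop at the flush position */
    return endptr - endptr % blcksz;    /* otherwise end on a page boundary */
}

/*
 * Advance *sent_ptr up to flush_ptr, one chunk per round, and return the number
 * of rounds; -1 if a round would make no progress (max_send < blcksz).
 */
int demo_send_all(XLogRecPtr *sent_ptr, XLogRecPtr flush_ptr,
                  uint64_t max_send, uint32_t blcksz)
{
    int rounds = 0;

    while (*sent_ptr < flush_ptr)
    {
        XLogRecPtr end = demo_next_send_end(*sent_ptr, flush_ptr, max_send, blcksz);

        if (end <= *sent_ptr)
            return -1;
        /* real walsender: read WAL [*sent_ptr, end) and send it to the standby */
        *sent_ptr = end;
        rounds++;
    }
    return rounds;
}
```

Starting from receiveStart (3388997632) with the primary flushed to 3388998288, a single round is enough; a larger backlog is split into page-aligned chunks.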

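On the standby, WalReceiverMain's main loop waits for messages from the primary. When WAL arrives, it is flushed to disk and an acknowledgment is sent back, while the startup process replays it.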
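The bookkeeping the standby does after each flush is described by the WalRcvData comments earlier: receivedUpto advances to the flushed position and latestChunkStart keeps its previous value. A hypothetical sketch of that update, modelled on XLogWalRcvFlush(); the real function also wakes the startup process and sends a reply to the primary, which is omitted here.

```c
/* Hypothetical sketch of the shared-position update after a WAL flush. */
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TimeLineID;

typedef struct
{
    XLogRecPtr receivedUpto;      /* end+1 of WAL flushed so far */
    TimeLineID receivedTLI;
    XLogRecPtr latestChunkStart;  /* previous receivedUpto */
} DemoRcvProgress;

/*
 * Called after WAL up to flushed_upto has been fsync'ed.
 * Returns 1 if the positions moved, 0 if there was nothing new.
 */
int demo_after_flush(DemoRcvProgress *p, XLogRecPtr flushed_upto, TimeLineID tli)
{
    if (p->receivedUpto >= flushed_upto)
        return 0;
    p->latestChunkStart = p->receivedUpto;  /* start of the batch just flushed */
    p->receivedUpto = flushed_upto;
    p->receivedTLI = tli;
    return 1;
}
```

Starting from receiveStart and flushing up to 0/CA000290 leaves receivedUpto at 3388998288, which matches received_lsn in the pg_stat_get_wal_receiver() output.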

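When idle, both sides wait on their socket fd with epoll (registered through the latch/WaitEventSet layer, as the walreceiver backtrace above shows); when the wait times out, they send keepalive/status messages to keep the connection alive.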
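The idle wait can be sketched with plain poll() in place of the epoll-based WaitEventSet machinery PostgreSQL uses on Linux. In this hypothetical sketch a timeout means "nothing arrived, send a keepalive"; in PostgreSQL the actual intervals come from settings such as wal_sender_timeout and wal_receiver_status_interval.

```c
/* Hypothetical sketch: wait for data on a socket, or time out and send a keepalive. */
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Returns 1 if sock is readable, 0 if timeout_ms elapsed with nothing to read
 * (the caller would send a keepalive/status message then), -1 on error.
 */
int demo_wait_or_keepalive(int sock, int timeout_ms)
{
    struct pollfd pfd = { .fd = sock, .events = POLLIN };
    int rc;

    do
        rc = poll(&pfd, 1, timeout_ms);
    while (rc < 0 && errno == EINTR);   /* simplification: restarts the full timeout */

    if (rc < 0)
        return -1;
    if (rc == 0)
        return 0;
    return (pfd.revents & POLLIN) ? 1 : -1;
}
```

Using poll() keeps the sketch portable; the real code reaches epoll_wait() through WaitLatchOrSocket(), which is why __epoll_wait_nocancel sits at the top of the walreceiver backtrace.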

4. Summary and Reflections

That basically explains how the streaming start point receiveStart is set. A brief summary:

receiveStart is a member of the shared-memory struct WalRcv, and its value is ultimately tied to the ControlFile (the pg_control file).

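1) When the database starts, it creates shared memory and initializes the shared-memory structs (ControlFile, XLogCtl, WalRcv, etc.);
2) Before the startup process begins recovery, the contents of the pg_control file are loaded into the shared-memory struct ControlFile;
3) The startup process performs redo based on ControlFile; as redo progresses and reaches consistency, positions such as replayEndRecPtr and minRecoveryPoint are updated;
4) When walreceiver is launched, WalRcv is filled in from the position the startup process needs next (derived from minRecoveryPoint, replayEndRecPtr, etc.), rounded down to the start of its WAL segment.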

Once again, this shows how important pg_control is in PostgreSQL.
