1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

Replace XLogRecPtr struct with a 64-bit integer.

This simplifies code that needs to do arithmetic on XLogRecPtrs.

To avoid changing on-disk format of data pages, the LSN on data pages is
still stored in the old format. That should keep pg_upgrade happy. However,
we have XLogRecPtrs embedded in the control file, and in the structs that
are sent over the replication protocol, so this changes breaks compatibility
of pg_basebackup and server. I didn't do anything about this in this patch,
per discussion on -hackers, the right thing to do would to be to change the
replication protocol to be architecture-independent, so that you could use
a newer version of pg_receivexlog, for example, against an older server
version.
This commit is contained in:
Heikki Linnakangas
2012-06-24 18:51:37 +03:00
parent 061e7efb1b
commit 0ab9d1c4b3
28 changed files with 279 additions and 357 deletions

View File

@@ -497,7 +497,7 @@ SendXlogRecPtrResult(XLogRecPtr ptr)
StringInfoData buf;
char str[MAXFNAMELEN];
snprintf(str, sizeof(str), "%X/%X", ptr.xlogid, ptr.xrecoff);
snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
pq_beginmessage(&buf, 'T'); /* RowDescription */
pq_sendint(&buf, 1, 2); /* 1 field */

View File

@@ -156,7 +156,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
(uint32) (startpoint >> 32), (uint32) startpoint);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{

View File

@@ -72,8 +72,11 @@ START_REPLICATION { return K_START_REPLICATION; }
" " ;
{hexdigit}+\/{hexdigit}+ {
if (sscanf(yytext, "%X/%X", &yylval.recptr.xlogid, &yylval.recptr.xrecoff) != 2)
uint32 hi,
lo;
if (sscanf(yytext, "%X/%X", &hi, &lo) != 2)
yyerror("invalid streaming start location");
yylval.recptr = ((uint64) hi) << 32 | lo;
return RECPTR;
}

View File

@@ -145,7 +145,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
new_status = (char *) palloc(len + 32 + 1);
memcpy(new_status, old_status, len);
sprintf(new_status + len, " waiting for %X/%X",
XactCommitLSN.xlogid, XactCommitLSN.xrecoff);
(uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
set_ps_display(new_status, false);
new_status[len] = '\0'; /* truncate off " waiting ..." */
}
@@ -255,8 +255,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
*/
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
MyProc->waitLSN.xlogid = 0;
MyProc->waitLSN.xrecoff = 0;
MyProc->waitLSN = 0;
if (new_status)
{
@@ -440,12 +439,8 @@ SyncRepReleaseWaiters(void)
LWLockRelease(SyncRepLock);
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
numwrite,
MyWalSnd->write.xlogid,
MyWalSnd->write.xrecoff,
numflush,
MyWalSnd->flush.xlogid,
MyWalSnd->flush.xrecoff);
numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
/*
* If we are managing the highest priority standby, though we weren't
@@ -630,8 +625,7 @@ SyncRepQueueIsOrderedByLSN(int mode)
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
lastLSN.xlogid = 0;
lastLSN.xrecoff = 0;
lastLSN = 0;
proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
&(WalSndCtl->SyncRepQueue[mode]),

View File

@@ -516,7 +516,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
}
/* Calculate the start offset of the received logs */
startoff = recptr.xrecoff % XLogSegSize;
startoff = recptr % XLogSegSize;
if (startoff + nbytes > XLogSegSize)
segbytes = XLogSegSize - startoff;
@@ -601,8 +601,8 @@ XLogWalRcvFlush(bool dying)
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
LogstreamResult.Write.xlogid,
LogstreamResult.Write.xrecoff);
(uint32) (LogstreamResult.Write >> 32),
(uint32) LogstreamResult.Write);
set_ps_display(activitymsg, false);
}
@@ -657,9 +657,9 @@ XLogWalRcvSendReply(void)
reply_message.sendTime = now;
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
reply_message.write.xlogid, reply_message.write.xrecoff,
reply_message.flush.xlogid, reply_message.flush.xrecoff,
reply_message.apply.xlogid, reply_message.apply.xrecoff);
(uint32) (reply_message.write >> 32), (uint32) reply_message.write,
(uint32) (reply_message.flush >> 32), (uint32) reply_message.flush,
(uint32) (reply_message.apply >> 32), (uint32) reply_message.apply);
/* Prepend with the message type and send it. */
buf[0] = 'r';

View File

@@ -185,8 +185,8 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
* being created by XLOG streaming, which might cause trouble later on if
* the segment is e.g archived.
*/
if (recptr.xrecoff % XLogSegSize != 0)
recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
if (recptr % XLogSegSize != 0)
recptr -= recptr % XLogSegSize;
SpinLockAcquire(&walrcv->mutex);
@@ -204,8 +204,7 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
* If this is the first startup of walreceiver, we initialize receivedUpto
* and latestChunkStart to receiveStart.
*/
if (walrcv->receiveStart.xlogid == 0 &&
walrcv->receiveStart.xrecoff == 0)
if (walrcv->receiveStart == 0)
{
walrcv->receivedUpto = recptr;
walrcv->latestChunkStart = recptr;

View File

@@ -94,7 +94,7 @@ static uint32 sendOff = 0;
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
static XLogRecPtr sentPtr = {0, 0};
static XLogRecPtr sentPtr = 0;
/*
* Buffer for processing reply messages.
@@ -300,8 +300,7 @@ IdentifySystem(void)
logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
snprintf(xpos, sizeof(xpos), "%X/%X",
logptr.xlogid, logptr.xrecoff);
snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
/* Send a RowDescription message */
pq_beginmessage(&buf, 'T');
@@ -613,9 +612,9 @@ ProcessStandbyReplyMessage(void)
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
reply.write.xlogid, reply.write.xrecoff,
reply.flush.xlogid, reply.flush.xrecoff,
reply.apply.xlogid, reply.apply.xrecoff);
(uint32) (reply.write << 32), (uint32) reply.write,
(uint32) (reply.flush << 32), (uint32) reply.flush,
(uint32) (reply.apply << 32), (uint32) reply.apply);
/*
* Update shared state for this WalSender process based on reply data from
@@ -990,7 +989,7 @@ retry:
int segbytes;
int readbytes;
startoff = recptr.xrecoff % XLogSegSize;
startoff = recptr % XLogSegSize;
if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
{
@@ -1156,12 +1155,6 @@ XLogSend(char *msgbuf, bool *caughtup)
startptr = sentPtr;
endptr = startptr;
XLByteAdvance(endptr, MAX_SEND_SIZE);
if (endptr.xlogid != startptr.xlogid)
{
/* Don't cross a logfile boundary within one message */
Assert(endptr.xlogid == startptr.xlogid + 1);
endptr.xrecoff = 0;
}
/* if we went beyond SendRqstPtr, back off */
if (XLByteLE(SendRqstPtr, endptr))
@@ -1172,14 +1165,11 @@ XLogSend(char *msgbuf, bool *caughtup)
else
{
/* round down to page boundary. */
endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
endptr -= (endptr % XLOG_BLCKSZ);
*caughtup = false;
}
if (endptr.xrecoff == 0)
nbytes = 0x100000000L - (uint64) startptr.xrecoff;
else
nbytes = endptr.xrecoff - startptr.xrecoff;
nbytes = endptr - startptr;
Assert(nbytes <= MAX_SEND_SIZE);
/*
@@ -1223,7 +1213,7 @@ XLogSend(char *msgbuf, bool *caughtup)
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
sentPtr.xlogid, sentPtr.xrecoff);
(uint32) (sentPtr >> 32), (uint32) sentPtr);
set_ps_display(activitymsg, false);
}
@@ -1565,25 +1555,25 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
values[1] = CStringGetTextDatum(WalSndGetStateString(state));
snprintf(location, sizeof(location), "%X/%X",
sentPtr.xlogid, sentPtr.xrecoff);
(uint32) (sentPtr >> 32), (uint32) sentPtr);
values[2] = CStringGetTextDatum(location);
if (write.xlogid == 0 && write.xrecoff == 0)
if (write == 0)
nulls[3] = true;
snprintf(location, sizeof(location), "%X/%X",
write.xlogid, write.xrecoff);
(uint32) (write >> 32), (uint32) write);
values[3] = CStringGetTextDatum(location);
if (flush.xlogid == 0 && flush.xrecoff == 0)
if (flush == 0)
nulls[4] = true;
snprintf(location, sizeof(location), "%X/%X",
flush.xlogid, flush.xrecoff);
(uint32) (flush >> 32), (uint32) flush);
values[4] = CStringGetTextDatum(location);
if (apply.xlogid == 0 && apply.xrecoff == 0)
if (apply == 0)
nulls[5] = true;
snprintf(location, sizeof(location), "%X/%X",
apply.xlogid, apply.xrecoff);
(uint32) (apply >> 32), (uint32) apply);
values[5] = CStringGetTextDatum(location);
values[6] = Int32GetDatum(sync_priority[i]);