mirror of
https://github.com/postgres/postgres.git
synced 2025-11-26 23:43:30 +03:00
Efficient transaction-controlled synchronous replication.
If a standby is broadcasting reply messages and we have named one or more standbys in synchronous_standby_names, then allow users who set synchronous_replication to wait for commit, which then provides strict data integrity guarantees. The design avoids sending and receiving transaction state information, so it minimises bookkeeping overheads. We synchronize with the highest-priority standby that is connected and ready to synchronize. Other standbys can be defined to take over in case of standby failure. This version has very strict behaviour; more relaxed options may be added at a later date. Simon Riggs and Fujii Masao, with reviews by Yeb Havinga, Jaime Casanova, Heikki Linnakangas and Robert Haas, plus the assistance of many other design reviewers.
This commit is contained in:
@@ -56,6 +56,7 @@
|
||||
#include "pg_trace.h"
|
||||
#include "pgstat.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "replication/syncrep.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/predicate.h"
|
||||
#include "storage/procarray.h"
|
||||
@@ -1071,6 +1072,14 @@ EndPrepare(GlobalTransaction gxact)
|
||||
|
||||
END_CRIT_SECTION();
|
||||
|
||||
/*
|
||||
* Wait for synchronous replication, if required.
|
||||
*
|
||||
* Note that at this stage we have marked the prepare, but still show as
|
||||
* running in the procarray (twice!) and continue to hold locks.
|
||||
*/
|
||||
SyncRepWaitForLSN(gxact->prepare_lsn);
|
||||
|
||||
records.tail = records.head = NULL;
|
||||
}
|
||||
|
||||
@@ -2030,6 +2039,14 @@ RecordTransactionCommitPrepared(TransactionId xid,
|
||||
MyProc->inCommit = false;
|
||||
|
||||
END_CRIT_SECTION();
|
||||
|
||||
/*
|
||||
* Wait for synchronous replication, if required.
|
||||
*
|
||||
* Note that at this stage we have marked clog, but still show as
|
||||
* running in the procarray and continue to hold locks.
|
||||
*/
|
||||
SyncRepWaitForLSN(recptr);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -2109,4 +2126,12 @@ RecordTransactionAbortPrepared(TransactionId xid,
|
||||
TransactionIdAbortTree(xid, nchildren, children);
|
||||
|
||||
END_CRIT_SECTION();
|
||||
|
||||
/*
|
||||
* Wait for synchronous replication, if required.
|
||||
*
|
||||
* Note that at this stage we have marked clog, but still show as
|
||||
* running in the procarray and continue to hold locks.
|
||||
*/
|
||||
SyncRepWaitForLSN(recptr);
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "replication/syncrep.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/lmgr.h"
|
||||
@@ -1055,7 +1056,7 @@ RecordTransactionCommit(void)
|
||||
* if all to-be-deleted tables are temporary though, since they are lost
|
||||
* anyway if we crash.)
|
||||
*/
|
||||
if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0)
|
||||
if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0 || SyncRepRequested())
|
||||
{
|
||||
/*
|
||||
* Synchronous commit case:
|
||||
@@ -1125,6 +1126,14 @@ RecordTransactionCommit(void)
|
||||
/* Compute latestXid while we have the child XIDs handy */
|
||||
latestXid = TransactionIdLatest(xid, nchildren, children);
|
||||
|
||||
/*
|
||||
* Wait for synchronous replication, if required.
|
||||
*
|
||||
* Note that at this stage we have marked clog, but still show as
|
||||
* running in the procarray and continue to hold locks.
|
||||
*/
|
||||
SyncRepWaitForLSN(XactLastRecEnd);
|
||||
|
||||
/* Reset XactLastRecEnd until the next transaction writes something */
|
||||
XactLastRecEnd.xrecoff = 0;
|
||||
|
||||
|
||||
@@ -520,7 +520,9 @@ CREATE VIEW pg_stat_replication AS
|
||||
W.sent_location,
|
||||
W.write_location,
|
||||
W.flush_location,
|
||||
W.replay_location
|
||||
W.replay_location,
|
||||
W.sync_priority,
|
||||
W.sync_state
|
||||
FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
|
||||
pg_stat_get_wal_senders() AS W
|
||||
WHERE S.usesysid = U.oid AND
|
||||
|
||||
@@ -1526,6 +1526,13 @@ AutoVacWorkerMain(int argc, char *argv[])
|
||||
*/
|
||||
SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
|
||||
|
||||
/*
|
||||
* Force synchronous replication off to allow regular maintenance even
|
||||
* if we are waiting for standbys to connect. This is important to
|
||||
* ensure we aren't blocked from performing anti-wraparound tasks.
|
||||
*/
|
||||
SetConfigOption("synchronous_replication", "off", PGC_SUSET, PGC_S_OVERRIDE);
|
||||
|
||||
/*
|
||||
* Get the info about the database we're going to work on.
|
||||
*/
|
||||
|
||||
@@ -735,6 +735,9 @@ PostmasterMain(int argc, char *argv[])
|
||||
if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL)
|
||||
ereport(ERROR,
|
||||
(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\"")));
|
||||
if (strlen(SyncRepStandbyNames) > 0 && max_wal_senders == 0)
|
||||
ereport(ERROR,
|
||||
(errmsg("Synchronous replication requires WAL streaming (max_wal_senders > 0)")));
|
||||
|
||||
/*
|
||||
* Other one-time internal sanity checks can go here, if they are fast.
|
||||
|
||||
@@ -13,7 +13,7 @@ top_builddir = ../../..
|
||||
include $(top_builddir)/src/Makefile.global
|
||||
|
||||
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
|
||||
repl_gram.o
|
||||
repl_gram.o syncrep.o
|
||||
|
||||
include $(top_srcdir)/src/backend/common.mk
|
||||
|
||||
|
||||
@@ -317,13 +317,9 @@ WalReceiverMain(void)
|
||||
while (walrcv_receive(0, &type, &buf, &len))
|
||||
XLogWalRcvProcessMsg(type, buf, len);
|
||||
|
||||
/* Let the master know that we received some data. */
|
||||
XLogWalRcvSendReply();
|
||||
XLogWalRcvSendHSFeedback();
|
||||
|
||||
/*
|
||||
* If we've written some records, flush them to disk and let the
|
||||
* startup process know about them.
|
||||
* startup process and primary server know about them.
|
||||
*/
|
||||
XLogWalRcvFlush(false);
|
||||
}
|
||||
@@ -581,7 +577,10 @@ XLogWalRcvFlush(bool dying)
|
||||
|
||||
/* Also let the master know that we made some progress */
|
||||
if (!dying)
|
||||
{
|
||||
XLogWalRcvSendReply();
|
||||
XLogWalRcvSendHSFeedback();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -66,7 +66,7 @@
|
||||
WalSndCtlData *WalSndCtl = NULL;
|
||||
|
||||
/* My slot in the shared memory array */
|
||||
static WalSnd *MyWalSnd = NULL;
|
||||
WalSnd *MyWalSnd = NULL;
|
||||
|
||||
/* Global state */
|
||||
bool am_walsender = false; /* Am I a walsender process ? */
|
||||
@@ -174,6 +174,8 @@ WalSenderMain(void)
|
||||
SpinLockRelease(&walsnd->mutex);
|
||||
}
|
||||
|
||||
SyncRepInitConfig();
|
||||
|
||||
/* Main loop of walsender */
|
||||
return WalSndLoop();
|
||||
}
|
||||
@@ -584,6 +586,8 @@ ProcessStandbyReplyMessage(void)
|
||||
walsnd->apply = reply.apply;
|
||||
SpinLockRelease(&walsnd->mutex);
|
||||
}
|
||||
|
||||
SyncRepReleaseWaiters();
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -700,6 +704,7 @@ WalSndLoop(void)
|
||||
{
|
||||
got_SIGHUP = false;
|
||||
ProcessConfigFile(PGC_SIGHUP);
|
||||
SyncRepInitConfig();
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -771,7 +776,12 @@ WalSndLoop(void)
|
||||
* that point might wait for some time.
|
||||
*/
|
||||
if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup)
|
||||
{
|
||||
ereport(DEBUG1,
|
||||
(errmsg("standby \"%s\" has now caught up with primary",
|
||||
application_name)));
|
||||
WalSndSetState(WALSNDSTATE_STREAMING);
|
||||
}
|
||||
|
||||
ProcessRepliesIfAny();
|
||||
}
|
||||
@@ -1238,6 +1248,8 @@ WalSndShmemInit(void)
|
||||
/* First time through, so initialize */
|
||||
MemSet(WalSndCtl, 0, WalSndShmemSize());
|
||||
|
||||
SHMQueueInit(&(WalSndCtl->SyncRepQueue));
|
||||
|
||||
for (i = 0; i < max_wal_senders; i++)
|
||||
{
|
||||
WalSnd *walsnd = &WalSndCtl->walsnds[i];
|
||||
@@ -1304,12 +1316,15 @@ WalSndGetStateString(WalSndState state)
|
||||
Datum
|
||||
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
|
||||
{
|
||||
#define PG_STAT_GET_WAL_SENDERS_COLS 6
|
||||
#define PG_STAT_GET_WAL_SENDERS_COLS 8
|
||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
TupleDesc tupdesc;
|
||||
Tuplestorestate *tupstore;
|
||||
MemoryContext per_query_ctx;
|
||||
MemoryContext oldcontext;
|
||||
int sync_priority[max_wal_senders];
|
||||
int priority = 0;
|
||||
int sync_standby = -1;
|
||||
int i;
|
||||
|
||||
/* check to see if caller supports us returning a tuplestore */
|
||||
@@ -1337,6 +1352,33 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
/*
|
||||
* Get the priorities of sync standbys all in one go, to minimise
|
||||
* lock acquisitions and to allow us to evaluate who is the current
|
||||
* sync standby. This code must match the code in SyncRepReleaseWaiters().
|
||||
*/
|
||||
LWLockAcquire(SyncRepLock, LW_SHARED);
|
||||
for (i = 0; i < max_wal_senders; i++)
|
||||
{
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
|
||||
|
||||
if (walsnd->pid != 0)
|
||||
{
|
||||
sync_priority[i] = walsnd->sync_standby_priority;
|
||||
|
||||
if (walsnd->state == WALSNDSTATE_STREAMING &&
|
||||
walsnd->sync_standby_priority > 0 &&
|
||||
(priority == 0 ||
|
||||
priority > walsnd->sync_standby_priority))
|
||||
{
|
||||
priority = walsnd->sync_standby_priority;
|
||||
sync_standby = i;
|
||||
}
|
||||
}
|
||||
}
|
||||
LWLockRelease(SyncRepLock);
|
||||
|
||||
for (i = 0; i < max_wal_senders; i++)
|
||||
{
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
@@ -1370,11 +1412,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
|
||||
* Only superusers can see details. Other users only get
|
||||
* the pid value to know it's a walsender, but no details.
|
||||
*/
|
||||
nulls[1] = true;
|
||||
nulls[2] = true;
|
||||
nulls[3] = true;
|
||||
nulls[4] = true;
|
||||
nulls[5] = true;
|
||||
MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1401,6 +1439,19 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
|
||||
snprintf(location, sizeof(location), "%X/%X",
|
||||
apply.xlogid, apply.xrecoff);
|
||||
values[5] = CStringGetTextDatum(location);
|
||||
|
||||
values[6] = Int32GetDatum(sync_priority[i]);
|
||||
|
||||
/*
|
||||
* More easily understood version of standby state.
|
||||
* This is purely informational, not different from priority.
|
||||
*/
|
||||
if (sync_priority[i] == 0)
|
||||
values[7] = CStringGetTextDatum("ASYNC");
|
||||
else if (i == sync_standby)
|
||||
values[7] = CStringGetTextDatum("SYNC");
|
||||
else
|
||||
values[7] = CStringGetTextDatum("POTENTIAL");
|
||||
}
|
||||
|
||||
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
|
||||
|
||||
@@ -104,7 +104,6 @@ SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem)
|
||||
* element. Inserting "after" the queue head puts the elem
|
||||
* at the head of the queue.
|
||||
*/
|
||||
#ifdef NOT_USED
|
||||
void
|
||||
SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
|
||||
{
|
||||
@@ -118,7 +117,6 @@ SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
|
||||
queue->next = elem;
|
||||
nextPtr->prev = elem;
|
||||
}
|
||||
#endif /* NOT_USED */
|
||||
|
||||
/*--------------------
|
||||
* SHMQueueNext -- Get the next element from a queue
|
||||
@@ -156,6 +154,25 @@ SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset)
|
||||
return (Pointer) (((char *) elemPtr) - linkOffset);
|
||||
}
|
||||
|
||||
/*--------------------
|
||||
* SHMQueuePrev -- Get the previous element from a queue
|
||||
*
|
||||
* Same as SHMQueueNext, but starting at the tail and moving towards the head.
|
||||
* All other comments and usage notes for SHMQueueNext apply here too.
|
||||
*
* Follows curElem->prev. Returns NULL when that link is the queue head
* itself; otherwise returns the link's address minus linkOffset
* (presumably the offset of the SHM_QUEUE link inside the containing
* struct -- confirm against callers).
*/
|
||||
Pointer
|
||||
SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset)
|
||||
{
|
||||
/* Step one link backwards from the current element. */
SHM_QUEUE *elemPtr = curElem->prev;
|
||||
|
||||
/*
* NOTE(review): curElem has already been dereferenced by the initializer
* above by the time this Assert checks it -- confirm the ordering is
* intentional.
*/
Assert(ShmemAddrIsValid(curElem));
|
||||
|
||||
if (elemPtr == queue) /* back to the queue head? */
|
||||
return NULL;
|
||||
|
||||
/* Convert the link address into the pointer handed back to the caller. */
return (Pointer) (((char *) elemPtr) - linkOffset);
|
||||
}
|
||||
|
||||
/*
|
||||
* SHMQueueEmpty -- TRUE if queue head is only element, FALSE otherwise
|
||||
*/
|
||||
|
||||
@@ -39,6 +39,7 @@
|
||||
#include "access/xact.h"
|
||||
#include "miscadmin.h"
|
||||
#include "postmaster/autovacuum.h"
|
||||
#include "replication/syncrep.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "storage/pmsignal.h"
|
||||
@@ -196,6 +197,7 @@ InitProcGlobal(void)
|
||||
PGSemaphoreCreate(&(procs[i].sem));
|
||||
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
|
||||
ProcGlobal->freeProcs = &procs[i];
|
||||
InitSharedLatch(&procs[i].waitLatch);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -214,6 +216,7 @@ InitProcGlobal(void)
|
||||
PGSemaphoreCreate(&(procs[i].sem));
|
||||
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
|
||||
ProcGlobal->autovacFreeProcs = &procs[i];
|
||||
InitSharedLatch(&procs[i].waitLatch);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -224,6 +227,7 @@ InitProcGlobal(void)
|
||||
{
|
||||
AuxiliaryProcs[i].pid = 0; /* marks auxiliary proc as not in use */
|
||||
PGSemaphoreCreate(&(AuxiliaryProcs[i].sem));
|
||||
InitSharedLatch(&procs[i].waitLatch);
|
||||
}
|
||||
|
||||
/* Create ProcStructLock spinlock, too */
|
||||
@@ -326,6 +330,13 @@ InitProcess(void)
|
||||
SHMQueueInit(&(MyProc->myProcLocks[i]));
|
||||
MyProc->recoveryConflictPending = false;
|
||||
|
||||
/* Initialise for sync rep */
|
||||
MyProc->waitLSN.xlogid = 0;
|
||||
MyProc->waitLSN.xrecoff = 0;
|
||||
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
|
||||
SHMQueueElemInit(&(MyProc->syncRepLinks));
|
||||
OwnLatch((Latch *) &MyProc->waitLatch);
|
||||
|
||||
/*
|
||||
* We might be reusing a semaphore that belonged to a failed process. So
|
||||
* be careful and reinitialize its value here. (This is not strictly
|
||||
@@ -365,6 +376,7 @@ InitProcessPhase2(void)
|
||||
/*
|
||||
* Arrange to clean that up at backend exit.
|
||||
*/
|
||||
on_shmem_exit(SyncRepCleanupAtProcExit, 0);
|
||||
on_shmem_exit(RemoveProcFromArray, 0);
|
||||
}
|
||||
|
||||
|
||||
@@ -55,6 +55,7 @@
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "postmaster/syslogger.h"
|
||||
#include "postmaster/walwriter.h"
|
||||
#include "replication/syncrep.h"
|
||||
#include "replication/walreceiver.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/bufmgr.h"
|
||||
@@ -753,6 +754,14 @@ static struct config_bool ConfigureNamesBool[] =
|
||||
&XactSyncCommit,
|
||||
true, NULL, NULL
|
||||
},
|
||||
{
|
||||
{"synchronous_replication", PGC_USERSET, WAL_REPLICATION,
|
||||
gettext_noop("Requests synchronous replication."),
|
||||
NULL
|
||||
},
|
||||
&sync_rep_mode,
|
||||
false, NULL, NULL
|
||||
},
|
||||
{
|
||||
{"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS,
|
||||
gettext_noop("Continues processing past damaged page headers."),
|
||||
@@ -2716,6 +2725,16 @@ static struct config_string ConfigureNamesString[] =
|
||||
"pg_stat_tmp", assign_pgstat_temp_directory, NULL
|
||||
},
|
||||
|
||||
{
|
||||
{"synchronous_standby_names", PGC_SIGHUP, WAL_REPLICATION,
|
||||
gettext_noop("List of potential standby names to synchronise with."),
|
||||
NULL,
|
||||
GUC_LIST_INPUT
|
||||
},
|
||||
&SyncRepStandbyNames,
|
||||
"", assign_synchronous_standby_names, NULL
|
||||
},
|
||||
|
||||
{
|
||||
{"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE,
|
||||
gettext_noop("Sets default text search configuration."),
|
||||
|
||||
@@ -184,7 +184,16 @@
|
||||
#archive_timeout = 0 # force a logfile segment switch after this
|
||||
# number of seconds; 0 disables
|
||||
|
||||
# - Streaming Replication -
|
||||
# - Replication - User Settings
|
||||
|
||||
#synchronous_replication = off # does commit wait for reply from standby
|
||||
|
||||
# - Streaming Replication - Server Settings
|
||||
|
||||
#synchronous_standby_names = '' # standby servers that provide sync rep
|
||||
# comma-separated list of application_name from standby(s);
|
||||
# '*' = all
|
||||
|
||||
|
||||
#max_wal_senders = 0 # max number of walsender processes
|
||||
# (change requires restart)
|
||||
|
||||
Reference in New Issue
Block a user