1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-17 06:41:09 +03:00

Allow multiple xacts during table sync in logical replication.

For the initial table data synchronization in logical replication, we use
a single transaction to copy the entire table and then synchronize the
position in the stream with the main apply worker.

There are multiple downsides of this approach: (a) We have to perform the
entire copy operation again if there is any error (network breakdown,
error in the database operation, etc.) while we synchronize the WAL
position between tablesync worker and apply worker; this will be onerous
especially for large copies, (b) Using a single transaction in the
synchronization-phase (where we can receive WAL from multiple
transactions) will have the risk of exceeding the CID limit, (c) The slot
will hold the WAL till the entire sync is complete because we never commit
till the end.

This patch solves all the above downsides by allowing multiple
transactions during the tablesync phase. The initial copy is done in a
single transaction and after that, we commit each transaction as we
receive. To allow recovery after any error or crash, we use a permanent
slot and origin to track the progress. The slot and origin will be removed
once we finish the synchronization of the table. We also remove slot and
origin of tablesync workers if the user performs DROP SUBSCRIPTION .. or
ALTER SUBSCRIPTION .. REFERESH and some of the table syncs are still not
finished.

The commands ALTER SUBSCRIPTION ... REFRESH PUBLICATION and
ALTER SUBSCRIPTION ... SET PUBLICATION ... with refresh option as true
cannot be executed inside a transaction block because they can now drop
the slots for which we have no provision to rollback.

This will also open up the path for logical replication of 2PC
transactions on the subscriber side. Previously, we can't do that because
of the requirement of maintaining a single transaction in tablesync
workers.

Bump catalog version due to change of state in the catalog
(pg_subscription_rel).

Author: Peter Smith, Amit Kapila, and Takamichi Osumi
Reviewed-by: Ajin Cherian, Petr Jelinek, Hou Zhijie and Amit Kapila
Discussion: https://postgr.es/m/CAA4eK1KHJxaZS-fod-0fey=0tq3=Gkn4ho=8N4-5HWiCfu0H1A@mail.gmail.com
This commit is contained in:
Amit Kapila
2021-02-12 07:41:51 +05:30
parent 3063eb1759
commit ce0fdbfe97
23 changed files with 779 additions and 338 deletions

View File

@ -31,8 +31,11 @@
* table state to INIT.
* - Tablesync worker starts; changes table state from INIT to DATASYNC while
* copying.
* - Tablesync worker finishes the copy and sets table state to SYNCWAIT;
* waits for state change.
* - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
* worker specific) state to indicate when the copy phase has completed, so
* if the worker crashes with this (non-memory) state then the copy will not
* be re-attempted.
* - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
* - Apply worker periodically checks for tables in SYNCWAIT state. When
* any appear, it sets the table state to CATCHUP and starts loop-waiting
* until either the table state is set to SYNCDONE or the sync worker
@ -48,8 +51,8 @@
* point it sets state to READY and stops tracking. Again, there might
* be zero changes in between.
*
* So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
* CATCHUP -> SYNCDONE -> READY.
* So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
* -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
*
* The catalog pg_subscription_rel is used to keep information about
* subscribed tables and their state. The catalog holds all states
@ -58,6 +61,7 @@
* Example flows look like this:
* - Apply is in front:
* sync:8
* -> set in catalog FINISHEDCOPY
* -> set in memory SYNCWAIT
* apply:10
* -> set in memory CATCHUP
@ -73,6 +77,7 @@
*
* - Sync is in front:
* sync:10
* -> set in catalog FINISHEDCOPY
* -> set in memory SYNCWAIT
* apply:8
* -> set in memory CATCHUP
@ -101,7 +106,10 @@
#include "replication/logicalrelation.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "replication/slot.h"
#include "replication/origin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@ -269,26 +277,52 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
static void
process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{
Assert(IsTransactionState());
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
current_lsn >= MyLogicalRepWorker->relstate_lsn)
{
TimeLineID tli;
char syncslotname[NAMEDATALEN] = {0};
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
MyLogicalRepWorker->relstate_lsn = current_lsn;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
* UpdateSubscriptionRelState must be called within a transaction.
* That transaction will be ended within the finish_sync_worker().
*/
if (!IsTransactionState())
StartTransactionCommand();
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
/* End wal streaming so wrconn can be re-used to drop the slot. */
walrcv_endstreaming(wrconn, &tli);
/*
* Cleanup the tablesync slot.
*
* This has to be done after updating the state because otherwise if
* there is an error while doing the database operations we won't be
* able to rollback dropped slot.
*/
ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
syncslotname);
/*
* It is important to give an error if we are unable to drop the slot,
* otherwise, it won't be dropped till the corresponding subscription
* is dropped. So passing missing_ok = false.
*/
ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
finish_sync_worker();
}
else
@ -403,6 +437,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
if (current_lsn >= rstate->lsn)
{
char originname[NAMEDATALEN];
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
if (!started_tx)
@ -411,6 +447,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
started_tx = true;
}
/*
* Remove the tablesync origin tracking if exists.
*
* The normal case origin drop is done here instead of in the
* process_syncing_tables_for_sync function because we don't
* allow to drop the origin till the process owning the origin
* is alive.
*
* There is a chance that the user is concurrently performing
* refresh for the subscription where we remove the table
* state and its origin and by this time the origin might be
* already removed. So passing missing_ok = true.
*/
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
rstate->relid,
originname);
replorigin_drop_by_name(originname, true, false);
/*
* Update the state to READY only after the origin cleanup.
*/
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
rstate->lsn);
@ -805,6 +862,50 @@ copy_table(Relation rel)
logicalrep_rel_close(relmapentry, NoLock);
}
/*
* Determine the tablesync slot name.
*
* The name must not exceed NAMEDATALEN - 1 because of remote node constraints
* on slot name length. We append system_identifier to avoid slot_name
* collision with subscriptions in other clusters. With the current scheme
* pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
* length of slot_name will be 50.
*
* The returned slot name is either:
* - stored in the supplied buffer (syncslotname), or
* - palloc'ed in current memory context (if syncslotname = NULL).
*
* Note: We don't use the subscription slot name as part of tablesync slot name
* because we are responsible for cleaning up these slots and it could become
* impossible to recalculate what name to cleanup if the subscription slot name
* had changed.
*/
char *
ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
char syncslotname[NAMEDATALEN])
{
if (syncslotname)
sprintf(syncslotname, "pg_%u_sync_%u_" UINT64_FORMAT, suboid, relid,
GetSystemIdentifier());
else
syncslotname = psprintf("pg_%u_sync_%u_" UINT64_FORMAT, suboid, relid,
GetSystemIdentifier());
return syncslotname;
}
/*
* Form the origin name for tablesync.
*
* Return the name in the supplied buffer.
*/
void
ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
char originname[NAMEDATALEN])
{
snprintf(originname, NAMEDATALEN, "pg_%u_%u", suboid, relid);
}
/*
* Start syncing the table in the sync worker.
*
@ -822,6 +923,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
XLogRecPtr relstate_lsn;
Relation rel;
WalRcvExecResult *res;
char originname[NAMEDATALEN];
RepOriginId originid;
/* Check the state of the table synchronization. */
StartTransactionCommand();
@ -847,19 +950,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
finish_sync_worker(); /* doesn't return */
}
/*
* To build a slot name for the sync work, we are limited to NAMEDATALEN -
* 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars
* and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the
* NAMEDATALEN on the remote that matters, but this scheme will also work
* reasonably if that is different.)
*/
StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
slotname = psprintf("%.*s_%u_sync_%u",
NAMEDATALEN - 28,
MySubscription->slotname,
MySubscription->oid,
MyLogicalRepWorker->relid);
/* Calculate the name of the tablesync slot. */
slotname = ReplicationSlotNameForTablesync(MySubscription->oid,
MyLogicalRepWorker->relid,
NULL /* use palloc */ );
/*
* Here we use the slot name instead of the subscription name as the
@ -872,7 +966,50 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
(errmsg("could not connect to the publisher: %s", err)));
Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
/* Assign the origin tracking record name. */
ReplicationOriginNameForTablesync(MySubscription->oid,
MyLogicalRepWorker->relid,
originname);
if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
{
/*
* We have previously errored out before finishing the copy so the
* replication slot might exist. We want to remove the slot if it
* already exists and proceed.
*
* XXX We could also instead try to drop the slot, last time we failed
* but for that, we might need to clean up the copy state as it might
* be in the middle of fetching the rows. Also, if there is a network
* breakdown then it wouldn't have succeeded so trying it next time
* seems like a better bet.
*/
ReplicationSlotDropAtPubNode(wrconn, slotname, true);
}
else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
{
/*
* The COPY phase was previously done, but tablesync then crashed
* before it was able to finish normally.
*/
StartTransactionCommand();
/*
* The origin tracking name must already exist. It was created first
* time this tablesync was launched.
*/
originid = replorigin_by_name(originname, false);
replorigin_session_setup(originid);
replorigin_session_origin = originid;
*origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
goto copy_table_done;
}
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@ -888,9 +1025,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
CommitTransactionCommand();
pgstat_report_stat(false);
/*
* We want to do the table data sync in a single transaction.
*/
StartTransactionCommand();
/*
@ -916,13 +1050,46 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
walrcv_clear_result(res);
/*
* Create a new temporary logical decoding slot. This slot will be used
* Create a new permanent logical decoding slot. This slot will be used
* for the catchup phase after COPY is done, so tell it to use the
* snapshot to make the final data consistent.
*/
walrcv_create_slot(wrconn, slotname, true,
walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
CRS_USE_SNAPSHOT, origin_startpos);
/*
* Setup replication origin tracking. The purpose of doing this before the
* copy is to avoid doing the copy again due to any error in setting up
* origin tracking.
*/
originid = replorigin_by_name(originname, true);
if (!OidIsValid(originid))
{
/*
* Origin tracking does not exist, so create it now.
*
* Then advance to the LSN got from walrcv_create_slot. This is WAL
* logged for the purpose of recovery. Locks are to prevent the
* replication origin from vanishing while advancing.
*/
originid = replorigin_create(originname);
LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
true /* go backward */ , true /* WAL log */ );
UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
replorigin_session_setup(originid);
replorigin_session_origin = originid;
}
else
{
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("replication origin \"%s\" already exists",
originname)));
}
/* Now do the initial data copy */
PushActiveSnapshot(GetTransactionSnapshot());
copy_table(rel);
@ -940,6 +1107,25 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/* Make the copy visible. */
CommandCounterIncrement();
/*
* Update the persisted state to indicate the COPY phase is done; make it
* visible to others.
*/
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
SUBREL_STATE_FINISHEDCOPY,
MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
copy_table_done:
elog(DEBUG1,
"LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
originname,
(uint32) (*origin_startpos >> 32),
(uint32) *origin_startpos);
/*
* We are done with the initial data synchronization, update the state.
*/