mirror of
https://github.com/postgres/postgres.git
synced 2025-11-10 17:42:29 +03:00
Logical replication support for initial data copy
Add functionality for a new subscription to copy the initial data in the tables and then sync with the ongoing apply process. For the copying, add a new internal COPY option to have the COPY source data provided by a callback function. The initial data copy works on the subscriber by receiving COPY data from the publisher and then providing it locally into a COPY that writes to the destination table. A WAL receiver can now execute full SQL commands. This is used here to obtain information about tables and publications. Several new options were added to CREATE and ALTER SUBSCRIPTION to control whether and when initial table syncing happens. Change pg_dump option --no-create-subscription-slots to --no-subscription-connect and use the new CREATE SUBSCRIPTION ... NOCONNECT option for that. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com> Tested-by: Erik Rijkers <er@xs4all.nl>
This commit is contained in:
@@ -22,14 +22,16 @@
|
||||
#include "libpq-fe.h"
|
||||
#include "pqexpbuffer.h"
|
||||
#include "access/xlog.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "funcapi.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
#include "replication/logicalproto.h"
|
||||
#include "replication/walreceiver.h"
|
||||
#include "storage/proc.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/tuplestore.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
@@ -68,10 +70,12 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
|
||||
static char *libpqrcv_create_slot(WalReceiverConn *conn,
|
||||
const char *slotname,
|
||||
bool temporary,
|
||||
bool export_snapshot,
|
||||
CRSSnapshotAction snapshot_action,
|
||||
XLogRecPtr *lsn);
|
||||
static bool libpqrcv_command(WalReceiverConn *conn,
|
||||
const char *cmd, char **err);
|
||||
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
|
||||
const char *query,
|
||||
const int nRetTypes,
|
||||
const Oid *retTypes);
|
||||
static void libpqrcv_disconnect(WalReceiverConn *conn);
|
||||
|
||||
static WalReceiverFunctionsType PQWalReceiverFunctions = {
|
||||
@@ -85,7 +89,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
|
||||
libpqrcv_receive,
|
||||
libpqrcv_send,
|
||||
libpqrcv_create_slot,
|
||||
libpqrcv_command,
|
||||
libpqrcv_exec,
|
||||
libpqrcv_disconnect
|
||||
};
|
||||
|
||||
@@ -431,10 +435,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
|
||||
* next timeline's ID, or just CommandComplete if the server was shut
|
||||
* down.
|
||||
*
|
||||
* If we had not yet received CopyDone from the backend, PGRES_COPY_IN
|
||||
* would also be possible. However, at the moment this function is only
|
||||
* called after receiving CopyDone from the backend - the walreceiver
|
||||
* never terminates replication on its own initiative.
|
||||
* If we had not yet received CopyDone from the backend, PGRES_COPY_OUT
|
||||
* is also possible in case we aborted the copy in mid-stream.
|
||||
*/
|
||||
res = PQgetResult(conn->streamConn);
|
||||
if (PQresultStatus(res) == PGRES_TUPLES_OK)
|
||||
@@ -531,7 +533,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
|
||||
* Windows.
|
||||
*
|
||||
* The function is modeled on PQexec() in libpq, but only implements
|
||||
* those parts that are in use in the walreceiver.
|
||||
* those parts that are in use in the walreceiver api.
|
||||
*
|
||||
* Queries are always executed on the connection in streamConn.
|
||||
*/
|
||||
@@ -543,8 +545,9 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
|
||||
|
||||
/*
|
||||
* PQexec() silently discards any prior query results on the connection.
|
||||
* This is not required for walreceiver since it's expected that walsender
|
||||
* won't generate any such junk results.
|
||||
* This is not required for this function as it's expected that the
|
||||
* caller (which is this library in all cases) will behave correctly and
|
||||
* we don't have to be backwards compatible with old libpq.
|
||||
*/
|
||||
|
||||
/*
|
||||
@@ -593,8 +596,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
|
||||
|
||||
/*
|
||||
* Emulate the PQexec()'s behavior of returning the last result when
|
||||
* there are many. Since walsender will never generate multiple
|
||||
* results, we skip the concatenation of error messages.
|
||||
* there are many. We are fine with returning just last error message.
|
||||
*/
|
||||
result = PQgetResult(streamConn);
|
||||
if (result == NULL)
|
||||
@@ -675,8 +677,19 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
|
||||
PGresult *res;
|
||||
|
||||
res = PQgetResult(conn->streamConn);
|
||||
if (PQresultStatus(res) == PGRES_COMMAND_OK ||
|
||||
PQresultStatus(res) == PGRES_COPY_IN)
|
||||
if (PQresultStatus(res) == PGRES_COMMAND_OK)
|
||||
{
|
||||
PQclear(res);
|
||||
|
||||
/* Verify that there are no more results */
|
||||
res = PQgetResult(conn->streamConn);
|
||||
if (res != NULL)
|
||||
ereport(ERROR,
|
||||
(errmsg("unexpected result after CommandComplete: %s",
|
||||
PQerrorMessage(conn->streamConn))));
|
||||
return -1;
|
||||
}
|
||||
else if (PQresultStatus(res) == PGRES_COPY_IN)
|
||||
{
|
||||
PQclear(res);
|
||||
return -1;
|
||||
@@ -721,7 +734,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
|
||||
*/
|
||||
static char *
|
||||
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
|
||||
bool temporary, bool export_snapshot, XLogRecPtr *lsn)
|
||||
bool temporary, CRSSnapshotAction snapshot_action,
|
||||
XLogRecPtr *lsn)
|
||||
{
|
||||
PGresult *res;
|
||||
StringInfoData cmd;
|
||||
@@ -737,10 +751,18 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
|
||||
if (conn->logical)
|
||||
{
|
||||
appendStringInfo(&cmd, " LOGICAL pgoutput");
|
||||
if (export_snapshot)
|
||||
appendStringInfo(&cmd, " EXPORT_SNAPSHOT");
|
||||
else
|
||||
appendStringInfo(&cmd, " NOEXPORT_SNAPSHOT");
|
||||
switch (snapshot_action)
|
||||
{
|
||||
case CRS_EXPORT_SNAPSHOT:
|
||||
appendStringInfo(&cmd, " EXPORT_SNAPSHOT");
|
||||
break;
|
||||
case CRS_NOEXPORT_SNAPSHOT:
|
||||
appendStringInfo(&cmd, " NOEXPORT_SNAPSHOT");
|
||||
break;
|
||||
case CRS_USE_SNAPSHOT:
|
||||
appendStringInfo(&cmd, " USE_SNAPSHOT");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
|
||||
@@ -767,28 +789,139 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
|
||||
}
|
||||
|
||||
/*
|
||||
* Run command.
|
||||
*
|
||||
* Returns if the command has succeeded and fills the err with palloced
|
||||
* error message if not.
|
||||
* Convert tuple query result to tuplestore.
|
||||
*/
|
||||
static bool
|
||||
libpqrcv_command(WalReceiverConn *conn, const char *cmd, char **err)
|
||||
static void
|
||||
libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
|
||||
const int nRetTypes, const Oid *retTypes)
|
||||
{
|
||||
PGresult *res;
|
||||
int tupn;
|
||||
int coln;
|
||||
int nfields = PQnfields(pgres);
|
||||
HeapTuple tuple;
|
||||
AttInMetadata *attinmeta;
|
||||
MemoryContext rowcontext;
|
||||
MemoryContext oldcontext;
|
||||
|
||||
res = libpqrcv_PQexec(conn->streamConn, cmd);
|
||||
/* No point in doing anything here if there were no tuples returned. */
|
||||
if (PQntuples(pgres) == 0)
|
||||
return;
|
||||
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
/* Make sure we got expected number of fields. */
|
||||
if (nfields != nRetTypes)
|
||||
ereport(ERROR,
|
||||
(errmsg("invalid query responser"),
|
||||
errdetail("Expected %d fields, got %d fields.",
|
||||
nRetTypes, nfields)));
|
||||
|
||||
|
||||
walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
|
||||
|
||||
/* Create tuple descriptor corresponding to expected result. */
|
||||
walres->tupledesc = CreateTemplateTupleDesc(nRetTypes, false);
|
||||
for (coln = 0; coln < nRetTypes; coln++)
|
||||
TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
|
||||
PQfname(pgres, coln), retTypes[coln], -1, 0);
|
||||
attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
|
||||
|
||||
/* Create temporary context for local allocations. */
|
||||
rowcontext = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"libpqrcv query result context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
/* Process returned rows. */
|
||||
for (tupn = 0; tupn < PQntuples(pgres); tupn++)
|
||||
{
|
||||
PQclear(res);
|
||||
*err = pchomp(PQerrorMessage(conn->streamConn));
|
||||
return false;
|
||||
char *cstrs[MaxTupleAttributeNumber];
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Do the allocations in temporary context. */
|
||||
oldcontext = MemoryContextSwitchTo(rowcontext);
|
||||
|
||||
/*
|
||||
* Fill cstrs with null-terminated strings of column values.
|
||||
*/
|
||||
for (coln = 0; coln < nfields; coln++)
|
||||
{
|
||||
if (PQgetisnull(pgres, tupn, coln))
|
||||
cstrs[coln] = NULL;
|
||||
else
|
||||
cstrs[coln] = PQgetvalue(pgres, tupn, coln);
|
||||
}
|
||||
|
||||
/* Convert row to a tuple, and add it to the tuplestore */
|
||||
tuple = BuildTupleFromCStrings(attinmeta, cstrs);
|
||||
tuplestore_puttuple(walres->tuplestore, tuple);
|
||||
|
||||
/* Clean up */
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
MemoryContextReset(rowcontext);
|
||||
}
|
||||
|
||||
PQclear(res);
|
||||
MemoryContextDelete(rowcontext);
|
||||
}
|
||||
|
||||
return true;
|
||||
/*
|
||||
* Public interface for sending generic queries (and commands).
|
||||
*
|
||||
* This can only be called from process connected to database.
|
||||
*/
|
||||
static WalRcvExecResult *
|
||||
libpqrcv_exec(WalReceiverConn *conn, const char *query,
|
||||
const int nRetTypes, const Oid *retTypes)
|
||||
{
|
||||
PGresult *pgres = NULL;
|
||||
WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
|
||||
|
||||
if (MyDatabaseId == InvalidOid)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("the query interface requires a database connection")));
|
||||
|
||||
pgres = libpqrcv_PQexec(conn->streamConn, query);
|
||||
|
||||
switch (PQresultStatus(pgres))
|
||||
{
|
||||
case PGRES_SINGLE_TUPLE:
|
||||
case PGRES_TUPLES_OK:
|
||||
walres->status = WALRCV_OK_TUPLES;
|
||||
libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
|
||||
break;
|
||||
|
||||
case PGRES_COPY_IN:
|
||||
walres->status = WALRCV_OK_COPY_IN;
|
||||
break;
|
||||
|
||||
case PGRES_COPY_OUT:
|
||||
walres->status = WALRCV_OK_COPY_OUT;
|
||||
break;
|
||||
|
||||
case PGRES_COPY_BOTH:
|
||||
walres->status = WALRCV_OK_COPY_BOTH;
|
||||
break;
|
||||
|
||||
case PGRES_COMMAND_OK:
|
||||
walres->status = WALRCV_OK_COMMAND;
|
||||
break;
|
||||
|
||||
/* Empty query is considered error. */
|
||||
case PGRES_EMPTY_QUERY:
|
||||
walres->status = WALRCV_ERROR;
|
||||
walres->err = _("empty query");
|
||||
break;
|
||||
|
||||
case PGRES_NONFATAL_ERROR:
|
||||
case PGRES_FATAL_ERROR:
|
||||
case PGRES_BAD_RESPONSE:
|
||||
walres->status = WALRCV_ERROR;
|
||||
walres->err = pchomp(PQerrorMessage(conn->streamConn));
|
||||
break;
|
||||
}
|
||||
|
||||
PQclear(pgres);
|
||||
|
||||
return walres;
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@@ -15,6 +15,6 @@ include $(top_builddir)/src/Makefile.global
|
||||
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
|
||||
|
||||
OBJS = decode.o launcher.o logical.o logicalfuncs.o message.o origin.o \
|
||||
proto.o relation.o reorderbuffer.o snapbuild.o worker.o
|
||||
proto.o relation.o reorderbuffer.o snapbuild.o tablesync.o worker.o
|
||||
|
||||
include $(top_srcdir)/src/backend/common.mk
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "access/xact.h"
|
||||
|
||||
#include "catalog/pg_subscription.h"
|
||||
#include "catalog/pg_subscription_rel.h"
|
||||
|
||||
#include "libpq/pqsignal.h"
|
||||
|
||||
@@ -56,6 +57,8 @@
|
||||
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
|
||||
|
||||
int max_logical_replication_workers = 4;
|
||||
int max_sync_workers_per_subscription = 2;
|
||||
|
||||
LogicalRepWorker *MyLogicalRepWorker = NULL;
|
||||
|
||||
typedef struct LogicalRepCtxStruct
|
||||
@@ -198,20 +201,22 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
|
||||
|
||||
/*
|
||||
* Walks the workers array and searches for one that matches given
|
||||
* subscription id.
|
||||
* subscription id and relid.
|
||||
*/
|
||||
LogicalRepWorker *
|
||||
logicalrep_worker_find(Oid subid)
|
||||
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
|
||||
{
|
||||
int i;
|
||||
LogicalRepWorker *res = NULL;
|
||||
|
||||
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
|
||||
|
||||
/* Search for attached worker for a given subscription id. */
|
||||
for (i = 0; i < max_logical_replication_workers; i++)
|
||||
{
|
||||
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
|
||||
if (w->subid == subid && w->proc && IsBackendPid(w->proc->pid))
|
||||
if (w->subid == subid && w->relid == relid &&
|
||||
(!only_running || (w->proc && IsBackendPid(w->proc->pid))))
|
||||
{
|
||||
res = w;
|
||||
break;
|
||||
@@ -225,7 +230,8 @@ logicalrep_worker_find(Oid subid)
|
||||
* Start new apply background worker.
|
||||
*/
|
||||
void
|
||||
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
|
||||
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
|
||||
Oid relid)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
BackgroundWorkerHandle *bgw_handle;
|
||||
@@ -270,10 +276,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
|
||||
}
|
||||
|
||||
/* Prepare the worker info. */
|
||||
memset(worker, 0, sizeof(LogicalRepWorker));
|
||||
worker->proc = NULL;
|
||||
worker->dbid = dbid;
|
||||
worker->userid = userid;
|
||||
worker->subid = subid;
|
||||
worker->relid = relid;
|
||||
worker->relstate = SUBREL_STATE_UNKNOWN;
|
||||
worker->relstate_lsn = InvalidXLogRecPtr;
|
||||
worker->last_lsn = InvalidXLogRecPtr;
|
||||
TIMESTAMP_NOBEGIN(worker->last_send_time);
|
||||
TIMESTAMP_NOBEGIN(worker->last_recv_time);
|
||||
worker->reply_lsn = InvalidXLogRecPtr;
|
||||
TIMESTAMP_NOBEGIN(worker->reply_time);
|
||||
|
||||
LWLockRelease(LogicalRepWorkerLock);
|
||||
|
||||
@@ -282,8 +296,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
|
||||
BGWORKER_BACKEND_DATABASE_CONNECTION;
|
||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
bgw.bgw_main = ApplyWorkerMain;
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN,
|
||||
"logical replication worker for subscription %u", subid);
|
||||
if (OidIsValid(relid))
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN,
|
||||
"logical replication worker for subscription %u sync %u", subid, relid);
|
||||
else
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN,
|
||||
"logical replication worker for subscription %u", subid);
|
||||
|
||||
bgw.bgw_restart_time = BGW_NEVER_RESTART;
|
||||
bgw.bgw_notify_pid = MyProcPid;
|
||||
@@ -307,13 +325,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
|
||||
* slot.
|
||||
*/
|
||||
void
|
||||
logicalrep_worker_stop(Oid subid)
|
||||
logicalrep_worker_stop(Oid subid, Oid relid)
|
||||
{
|
||||
LogicalRepWorker *worker;
|
||||
|
||||
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
||||
|
||||
worker = logicalrep_worker_find(subid);
|
||||
worker = logicalrep_worker_find(subid, relid, false);
|
||||
|
||||
/* No worker, nothing to do. */
|
||||
if (!worker)
|
||||
@@ -395,6 +413,31 @@ logicalrep_worker_stop(Oid subid)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Wake up (using latch) the logical replication worker.
|
||||
*/
|
||||
void
|
||||
logicalrep_worker_wakeup(Oid subid, Oid relid)
|
||||
{
|
||||
LogicalRepWorker *worker;
|
||||
|
||||
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
||||
worker = logicalrep_worker_find(subid, relid, true);
|
||||
LWLockRelease(LogicalRepWorkerLock);
|
||||
|
||||
if (worker)
|
||||
logicalrep_worker_wakeup_ptr(worker);
|
||||
}
|
||||
|
||||
/*
|
||||
* Wake up (using latch) the logical replication worker.
|
||||
*/
|
||||
void
|
||||
logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
|
||||
{
|
||||
SetLatch(&worker->proc->procLatch);
|
||||
}
|
||||
|
||||
/*
|
||||
* Attach to a slot.
|
||||
*/
|
||||
@@ -457,6 +500,29 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
|
||||
SetLatch(MyLatch);
|
||||
}
|
||||
|
||||
/*
|
||||
* Count the number of registered (not necessarily running) sync workers
|
||||
* for a subscription.
|
||||
*/
|
||||
int
|
||||
logicalrep_sync_worker_count(Oid subid)
|
||||
{
|
||||
int i;
|
||||
int res = 0;
|
||||
|
||||
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
|
||||
|
||||
/* Search for attached worker for a given subscription id. */
|
||||
for (i = 0; i < max_logical_replication_workers; i++)
|
||||
{
|
||||
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
|
||||
if (w->subid == subid && OidIsValid(w->relid))
|
||||
res++;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
/*
|
||||
* ApplyLauncherShmemSize
|
||||
* Compute space needed for replication launcher shared memory
|
||||
@@ -512,7 +578,20 @@ ApplyLauncherShmemInit(void)
|
||||
&found);
|
||||
|
||||
if (!found)
|
||||
{
|
||||
int slot;
|
||||
|
||||
memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
|
||||
|
||||
/* Initialize memory and spin locks for each worker slot. */
|
||||
for (slot = 0; slot < max_logical_replication_workers; slot++)
|
||||
{
|
||||
LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
|
||||
|
||||
memset(worker, 0, sizeof(LogicalRepWorker));
|
||||
SpinLockInit(&worker->relmutex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -607,12 +686,13 @@ ApplyLauncherMain(Datum main_arg)
|
||||
LogicalRepWorker *w;
|
||||
|
||||
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
||||
w = logicalrep_worker_find(sub->oid);
|
||||
w = logicalrep_worker_find(sub->oid, InvalidOid, false);
|
||||
LWLockRelease(LogicalRepWorkerLock);
|
||||
|
||||
if (sub->enabled && w == NULL)
|
||||
{
|
||||
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner);
|
||||
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
|
||||
sub->owner, InvalidOid);
|
||||
last_start_time = now;
|
||||
wait_time = wal_retrieve_retry_interval;
|
||||
/* Limit to one worker per mainloop cycle. */
|
||||
@@ -664,7 +744,7 @@ ApplyLauncherMain(Datum main_arg)
|
||||
Datum
|
||||
pg_stat_get_subscription(PG_FUNCTION_ARGS)
|
||||
{
|
||||
#define PG_STAT_GET_SUBSCRIPTION_COLS 7
|
||||
#define PG_STAT_GET_SUBSCRIPTION_COLS 8
|
||||
Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
|
||||
int i;
|
||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
@@ -723,27 +803,31 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
|
||||
MemSet(nulls, 0, sizeof(nulls));
|
||||
|
||||
values[0] = ObjectIdGetDatum(worker.subid);
|
||||
values[1] = Int32GetDatum(worker_pid);
|
||||
if (XLogRecPtrIsInvalid(worker.last_lsn))
|
||||
nulls[2] = true;
|
||||
if (OidIsValid(worker.relid))
|
||||
values[1] = ObjectIdGetDatum(worker.relid);
|
||||
else
|
||||
values[2] = LSNGetDatum(worker.last_lsn);
|
||||
if (worker.last_send_time == 0)
|
||||
nulls[1] = true;
|
||||
values[2] = Int32GetDatum(worker_pid);
|
||||
if (XLogRecPtrIsInvalid(worker.last_lsn))
|
||||
nulls[3] = true;
|
||||
else
|
||||
values[3] = TimestampTzGetDatum(worker.last_send_time);
|
||||
if (worker.last_recv_time == 0)
|
||||
values[3] = LSNGetDatum(worker.last_lsn);
|
||||
if (worker.last_send_time == 0)
|
||||
nulls[4] = true;
|
||||
else
|
||||
values[4] = TimestampTzGetDatum(worker.last_recv_time);
|
||||
if (XLogRecPtrIsInvalid(worker.reply_lsn))
|
||||
values[4] = TimestampTzGetDatum(worker.last_send_time);
|
||||
if (worker.last_recv_time == 0)
|
||||
nulls[5] = true;
|
||||
else
|
||||
values[5] = LSNGetDatum(worker.reply_lsn);
|
||||
if (worker.reply_time == 0)
|
||||
values[5] = TimestampTzGetDatum(worker.last_recv_time);
|
||||
if (XLogRecPtrIsInvalid(worker.reply_lsn))
|
||||
nulls[6] = true;
|
||||
else
|
||||
values[6] = TimestampTzGetDatum(worker.reply_time);
|
||||
values[6] = LSNGetDatum(worker.reply_lsn);
|
||||
if (worker.reply_time == 0)
|
||||
nulls[7] = true;
|
||||
else
|
||||
values[7] = TimestampTzGetDatum(worker.reply_time);
|
||||
|
||||
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "access/heapam.h"
|
||||
#include "access/sysattr.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_subscription_rel.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "replication/logicalrelation.h"
|
||||
#include "replication/worker_internal.h"
|
||||
@@ -357,6 +358,12 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
|
||||
else
|
||||
entry->localrel = heap_open(entry->localreloid, lockmode);
|
||||
|
||||
if (entry->state != SUBREL_STATE_READY)
|
||||
entry->state = GetSubscriptionRelState(MySubscription->oid,
|
||||
entry->localreloid,
|
||||
&entry->statelsn,
|
||||
true);
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
|
||||
@@ -499,51 +499,32 @@ SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
|
||||
}
|
||||
|
||||
/*
|
||||
* Export a snapshot so it can be set in another session with SET TRANSACTION
|
||||
* SNAPSHOT.
|
||||
* Build the initial slot snapshot and convert it to normal snapshot that
|
||||
* is understood by HeapTupleSatisfiesMVCC.
|
||||
*
|
||||
* For that we need to start a transaction in the current backend as the
|
||||
* importing side checks whether the source transaction is still open to make
|
||||
* sure the xmin horizon hasn't advanced since then.
|
||||
*
|
||||
* After that we convert a locally built snapshot into the normal variant
|
||||
* understood by HeapTupleSatisfiesMVCC et al.
|
||||
* The snapshot will be usable directly in current transaction or exported
|
||||
* for loading in different transaction.
|
||||
*/
|
||||
const char *
|
||||
SnapBuildExportSnapshot(SnapBuild *builder)
|
||||
Snapshot
|
||||
SnapBuildInitalSnapshot(SnapBuild *builder)
|
||||
{
|
||||
Snapshot snap;
|
||||
char *snapname;
|
||||
TransactionId xid;
|
||||
TransactionId *newxip;
|
||||
int newxcnt = 0;
|
||||
|
||||
Assert(!FirstSnapshotSet);
|
||||
Assert(XactIsoLevel = XACT_REPEATABLE_READ);
|
||||
|
||||
if (builder->state != SNAPBUILD_CONSISTENT)
|
||||
elog(ERROR, "cannot export a snapshot before reaching a consistent state");
|
||||
elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
|
||||
|
||||
if (!builder->committed.includes_all_transactions)
|
||||
elog(ERROR, "cannot export a snapshot, not all transactions are monitored anymore");
|
||||
elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
|
||||
|
||||
/* so we don't overwrite the existing value */
|
||||
if (TransactionIdIsValid(MyPgXact->xmin))
|
||||
elog(ERROR, "cannot export a snapshot when MyPgXact->xmin already is valid");
|
||||
|
||||
if (IsTransactionOrTransactionBlock())
|
||||
elog(ERROR, "cannot export a snapshot from within a transaction");
|
||||
|
||||
if (SavedResourceOwnerDuringExport)
|
||||
elog(ERROR, "can only export one snapshot at a time");
|
||||
|
||||
SavedResourceOwnerDuringExport = CurrentResourceOwner;
|
||||
ExportInProgress = true;
|
||||
|
||||
StartTransactionCommand();
|
||||
|
||||
Assert(!FirstSnapshotSet);
|
||||
|
||||
/* There doesn't seem to a nice API to set these */
|
||||
XactIsoLevel = XACT_REPEATABLE_READ;
|
||||
XactReadOnly = true;
|
||||
elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
|
||||
|
||||
snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
|
||||
|
||||
@@ -578,7 +559,9 @@ SnapBuildExportSnapshot(SnapBuild *builder)
|
||||
if (test == NULL)
|
||||
{
|
||||
if (newxcnt >= GetMaxSnapshotXidCount())
|
||||
elog(ERROR, "snapshot too large");
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("initial slot snapshot too large")));
|
||||
|
||||
newxip[newxcnt++] = xid;
|
||||
}
|
||||
@@ -589,9 +572,43 @@ SnapBuildExportSnapshot(SnapBuild *builder)
|
||||
snap->xcnt = newxcnt;
|
||||
snap->xip = newxip;
|
||||
|
||||
return snap;
|
||||
}
|
||||
|
||||
/*
|
||||
* Export a snapshot so it can be set in another session with SET TRANSACTION
|
||||
* SNAPSHOT.
|
||||
*
|
||||
* For that we need to start a transaction in the current backend as the
|
||||
* importing side checks whether the source transaction is still open to make
|
||||
* sure the xmin horizon hasn't advanced since then.
|
||||
*/
|
||||
const char *
|
||||
SnapBuildExportSnapshot(SnapBuild *builder)
|
||||
{
|
||||
Snapshot snap;
|
||||
char *snapname;
|
||||
|
||||
if (IsTransactionOrTransactionBlock())
|
||||
elog(ERROR, "cannot export a snapshot from within a transaction");
|
||||
|
||||
if (SavedResourceOwnerDuringExport)
|
||||
elog(ERROR, "can only export one snapshot at a time");
|
||||
|
||||
SavedResourceOwnerDuringExport = CurrentResourceOwner;
|
||||
ExportInProgress = true;
|
||||
|
||||
StartTransactionCommand();
|
||||
|
||||
/* There doesn't seem to a nice API to set these */
|
||||
XactIsoLevel = XACT_REPEATABLE_READ;
|
||||
XactReadOnly = true;
|
||||
|
||||
snap = SnapBuildInitalSnapshot(builder);
|
||||
|
||||
/*
|
||||
* now that we've built a plain snapshot, use the normal mechanisms for
|
||||
* exporting it
|
||||
* now that we've built a plain snapshot, make it active and use the
|
||||
* normal mechanisms for exporting it
|
||||
*/
|
||||
snapname = ExportSnapshot(snap);
|
||||
|
||||
|
||||
840
src/backend/replication/logical/tablesync.c
Normal file
840
src/backend/replication/logical/tablesync.c
Normal file
@@ -0,0 +1,840 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
* tablesync.c
|
||||
* PostgreSQL logical replication
|
||||
*
|
||||
* Copyright (c) 2012-2016, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/backend/replication/logical/tablesync.c
|
||||
*
|
||||
* NOTES
|
||||
* This file contains code for initial table data synchronization for
|
||||
* logical replication.
|
||||
*
|
||||
* The initial data synchronization is done separately for each table,
|
||||
* in separate apply worker that only fetches the initial snapshot data
|
||||
* from the publisher and then synchronizes the position in stream with
|
||||
* the main apply worker.
|
||||
*
|
||||
* The are several reasons for doing the synchronization this way:
|
||||
* - It allows us to parallelize the initial data synchronization
|
||||
* which lowers the time needed for it to happen.
|
||||
* - The initial synchronization does not have to hold the xid and LSN
|
||||
* for the time it takes to copy data of all tables, causing less
|
||||
* bloat and lower disk consumption compared to doing the
|
||||
* synchronization in single process for whole database.
|
||||
* - It allows us to synchronize the tables added after the initial
|
||||
* synchronization has finished.
|
||||
*
|
||||
* The stream position synchronization works in multiple steps.
|
||||
* - Sync finishes copy and sets table state as SYNCWAIT and waits
|
||||
* for state to change in a loop.
|
||||
* - Apply periodically checks tables that are synchronizing for SYNCWAIT.
|
||||
* When the desired state appears it will compare its position in the
|
||||
* stream with the SYNCWAIT position and based on that changes the
|
||||
* state to based on following rules:
|
||||
* - if the apply is in front of the sync in the wal stream the new
|
||||
* state is set to CATCHUP and apply loops until the sync process
|
||||
* catches up to the same LSN as apply
|
||||
* - if the sync is in front of the apply in the wal stream the new
|
||||
* state is set to SYNCDONE
|
||||
* - if both apply and sync are at the same position in the wal stream
|
||||
* the state of the table is set to READY
|
||||
* - If the state was set to CATCHUP sync will read the stream and
|
||||
* apply changes until it catches up to the specified stream
|
||||
* position and then sets state to READY and signals apply that it
|
||||
* can stop waiting and exits, if the state was set to something
|
||||
* else than CATCHUP the sync process will simply end.
|
||||
* - If the state was set to SYNCDONE by apply, the apply will
|
||||
* continue tracking the table until it reaches the SYNCDONE stream
|
||||
* position at which point it sets state to READY and stops tracking.
|
||||
*
|
||||
* The catalog pg_subscription_rel is used to keep information about
|
||||
* subscribed tables and their state and some transient state during
|
||||
* data synchronization is kept in shared memory.
|
||||
*
|
||||
* Example flows look like this:
|
||||
* - Apply is in front:
|
||||
* sync:8
|
||||
* -> set SYNCWAIT
|
||||
* apply:10
|
||||
* -> set CATCHUP
|
||||
* -> enter wait-loop
|
||||
* sync:10
|
||||
* -> set READY
|
||||
* -> exit
|
||||
* apply:10
|
||||
* -> exit wait-loop
|
||||
* -> continue rep
|
||||
* - Sync in front:
|
||||
* sync:10
|
||||
* -> set SYNCWAIT
|
||||
* apply:8
|
||||
* -> set SYNCDONE
|
||||
* -> continue per-table filtering
|
||||
* sync:10
|
||||
* -> exit
|
||||
* apply:10
|
||||
* -> set READY
|
||||
* -> stop per-table filtering
|
||||
* -> continue rep
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
|
||||
#include "access/xact.h"
|
||||
|
||||
#include "catalog/pg_subscription_rel.h"
|
||||
#include "catalog/pg_type.h"
|
||||
|
||||
#include "commands/copy.h"
|
||||
|
||||
#include "replication/logicallauncher.h"
|
||||
#include "replication/logicalrelation.h"
|
||||
#include "replication/walreceiver.h"
|
||||
#include "replication/worker_internal.h"
|
||||
|
||||
#include "storage/ipc.h"
|
||||
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
static bool table_states_valid = false;
|
||||
|
||||
StringInfo copybuf = NULL;
|
||||
|
||||
/*
|
||||
* Exit routine for synchronization worker.
|
||||
*/
|
||||
static void pg_attribute_noreturn()
|
||||
finish_sync_worker(void)
|
||||
{
|
||||
/* Commit any outstanding transaction. */
|
||||
if (IsTransactionState())
|
||||
CommitTransactionCommand();
|
||||
|
||||
/* And flush all writes. */
|
||||
XLogFlush(GetXLogWriteRecPtr());
|
||||
|
||||
/* Find the main apply worker and signal it. */
|
||||
logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
|
||||
|
||||
ereport(LOG,
|
||||
(errmsg("logical replication synchronization worker finished processing")));
|
||||
|
||||
/* Stop gracefully */
|
||||
walrcv_disconnect(wrconn);
|
||||
proc_exit(0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait until the table synchronization change.
|
||||
*
|
||||
* Returns false if the relation subscription state disappeared.
|
||||
*/
|
||||
static bool
|
||||
wait_for_sync_status_change(Oid relid, char origstate)
|
||||
{
|
||||
int rc;
|
||||
char state = origstate;
|
||||
|
||||
while (!got_SIGTERM)
|
||||
{
|
||||
LogicalRepWorker *worker;
|
||||
|
||||
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
||||
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
|
||||
relid, false);
|
||||
if (!worker)
|
||||
{
|
||||
LWLockRelease(LogicalRepWorkerLock);
|
||||
return false;
|
||||
}
|
||||
state = worker->relstate;
|
||||
LWLockRelease(LogicalRepWorkerLock);
|
||||
|
||||
if (state == SUBREL_STATE_UNKNOWN)
|
||||
return false;
|
||||
|
||||
if (state != origstate)
|
||||
return true;
|
||||
|
||||
rc = WaitLatch(&MyProc->procLatch,
|
||||
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
|
||||
10000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
|
||||
|
||||
/* emergency bailout if postmaster has died */
|
||||
if (rc & WL_POSTMASTER_DEATH)
|
||||
proc_exit(1);
|
||||
|
||||
ResetLatch(&MyProc->procLatch);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Callback from syscache invalidation.
|
||||
*/
|
||||
void
|
||||
invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
|
||||
{
|
||||
table_states_valid = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle table synchronization cooperation from the synchronization
|
||||
* worker.
|
||||
*
|
||||
* If the sync worker is in catch up mode and reached the predetermined
|
||||
* synchronization point in the WAL stream, mark the table as READY and
|
||||
* finish. If it caught up too far, set to SYNCDONE and finish. Things will
|
||||
* then proceed in the "sync in front" scenario.
|
||||
*/
|
||||
static void
|
||||
process_syncing_tables_for_sync(XLogRecPtr current_lsn)
|
||||
{
|
||||
Assert(IsTransactionState());
|
||||
|
||||
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
|
||||
|
||||
if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
|
||||
current_lsn >= MyLogicalRepWorker->relstate_lsn)
|
||||
{
|
||||
TimeLineID tli;
|
||||
|
||||
MyLogicalRepWorker->relstate =
|
||||
(current_lsn == MyLogicalRepWorker->relstate_lsn)
|
||||
? SUBREL_STATE_READY
|
||||
: SUBREL_STATE_SYNCDONE;
|
||||
MyLogicalRepWorker->relstate_lsn = current_lsn;
|
||||
|
||||
SpinLockRelease(&MyLogicalRepWorker->relmutex);
|
||||
|
||||
SetSubscriptionRelState(MyLogicalRepWorker->subid,
|
||||
MyLogicalRepWorker->relid,
|
||||
MyLogicalRepWorker->relstate,
|
||||
MyLogicalRepWorker->relstate_lsn);
|
||||
|
||||
walrcv_endstreaming(wrconn, &tli);
|
||||
finish_sync_worker();
|
||||
}
|
||||
else
|
||||
SpinLockRelease(&MyLogicalRepWorker->relmutex);
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle table synchronization cooperation from the apply worker.
|
||||
*
|
||||
* Walk over all subscription tables that are individually tracked by the
|
||||
* apply process (currently, all that have state other than
|
||||
* SUBREL_STATE_READY) and manage synchronization for them.
|
||||
*
|
||||
* If there are tables that need synchronizing and are not being synchronized
|
||||
* yet, start sync workers for them (if there are free slots for sync
|
||||
* workers).
|
||||
*
|
||||
* For tables that are being synchronized already, check if sync workers
|
||||
* either need action from the apply worker or have finished.
|
||||
*
|
||||
* The usual scenario is that the apply got ahead of the sync while the sync
|
||||
* ran, and then the action needed by apply is to mark a table for CATCHUP and
|
||||
* wait for the catchup to happen. In the less common case that sync worker
|
||||
* got in front of the apply worker, the table is marked as SYNCDONE but not
|
||||
* ready yet, as it needs to be tracked until apply reaches the same position
|
||||
* to which it was synced.
|
||||
*
|
||||
* If the synchronization position is reached, then the table can be marked as
|
||||
* READY and is no longer tracked.
|
||||
*/
|
||||
static void
|
||||
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
|
||||
{
|
||||
static List *table_states = NIL;
|
||||
ListCell *lc;
|
||||
|
||||
Assert(!IsTransactionState());
|
||||
|
||||
/* We need up to date sync state info for subscription tables here. */
|
||||
if (!table_states_valid)
|
||||
{
|
||||
MemoryContext oldctx;
|
||||
List *rstates;
|
||||
ListCell *lc;
|
||||
SubscriptionRelState *rstate;
|
||||
|
||||
/* Clean the old list. */
|
||||
list_free_deep(table_states);
|
||||
table_states = NIL;
|
||||
|
||||
StartTransactionCommand();
|
||||
|
||||
/* Fetch all non-ready tables. */
|
||||
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
|
||||
|
||||
/* Allocate the tracking info in a permanent memory context. */
|
||||
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
|
||||
foreach(lc, rstates)
|
||||
{
|
||||
rstate = palloc(sizeof(SubscriptionRelState));
|
||||
memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
|
||||
table_states = lappend(table_states, rstate);
|
||||
}
|
||||
MemoryContextSwitchTo(oldctx);
|
||||
|
||||
CommitTransactionCommand();
|
||||
|
||||
table_states_valid = true;
|
||||
}
|
||||
|
||||
/* Process all tables that are being synchronized. */
|
||||
foreach(lc, table_states)
|
||||
{
|
||||
SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc);
|
||||
|
||||
if (rstate->state == SUBREL_STATE_SYNCDONE)
|
||||
{
|
||||
/*
|
||||
* Apply has caught up to the position where the table sync
|
||||
* has finished. Time to mark the table as ready so that
|
||||
* apply will just continue to replicate it normally.
|
||||
*/
|
||||
if (current_lsn >= rstate->lsn)
|
||||
{
|
||||
rstate->state = SUBREL_STATE_READY;
|
||||
rstate->lsn = current_lsn;
|
||||
StartTransactionCommand();
|
||||
SetSubscriptionRelState(MyLogicalRepWorker->subid,
|
||||
rstate->relid, rstate->state,
|
||||
rstate->lsn);
|
||||
CommitTransactionCommand();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LogicalRepWorker *syncworker;
|
||||
int nsyncworkers = 0;
|
||||
|
||||
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
||||
syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
|
||||
rstate->relid, false);
|
||||
if (syncworker)
|
||||
{
|
||||
SpinLockAcquire(&syncworker->relmutex);
|
||||
rstate->state = syncworker->relstate;
|
||||
rstate->lsn = syncworker->relstate_lsn;
|
||||
SpinLockRelease(&syncworker->relmutex);
|
||||
}
|
||||
else
|
||||
/*
|
||||
* If no sync worker for this table yet, could running sync
|
||||
* workers for this subscription, while we have the lock, for
|
||||
* later.
|
||||
*/
|
||||
nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
|
||||
LWLockRelease(LogicalRepWorkerLock);
|
||||
|
||||
/*
|
||||
* There is a worker synchronizing the relation and waiting for
|
||||
* apply to do something.
|
||||
*/
|
||||
if (syncworker && rstate->state == SUBREL_STATE_SYNCWAIT)
|
||||
{
|
||||
/*
|
||||
* There are three possible synchronization situations here.
|
||||
*
|
||||
* a) Apply is in front of the table sync: We tell the table
|
||||
* sync to CATCHUP.
|
||||
*
|
||||
* b) Apply is behind the table sync: We tell the table sync
|
||||
* to mark the table as SYNCDONE and finish.
|
||||
|
||||
* c) Apply and table sync are at the same position: We tell
|
||||
* table sync to mark the table as READY and finish.
|
||||
*
|
||||
* In any case we'll need to wait for table sync to change
|
||||
* the state in catalog and only then continue ourselves.
|
||||
*/
|
||||
if (current_lsn > rstate->lsn)
|
||||
{
|
||||
rstate->state = SUBREL_STATE_CATCHUP;
|
||||
rstate->lsn = current_lsn;
|
||||
}
|
||||
else if (current_lsn == rstate->lsn)
|
||||
{
|
||||
rstate->state = SUBREL_STATE_READY;
|
||||
rstate->lsn = current_lsn;
|
||||
}
|
||||
else
|
||||
rstate->state = SUBREL_STATE_SYNCDONE;
|
||||
|
||||
SpinLockAcquire(&syncworker->relmutex);
|
||||
syncworker->relstate = rstate->state;
|
||||
syncworker->relstate_lsn = rstate->lsn;
|
||||
SpinLockRelease(&syncworker->relmutex);
|
||||
|
||||
/* Signal the sync worker, as it may be waiting for us. */
|
||||
logicalrep_worker_wakeup_ptr(syncworker);
|
||||
|
||||
/*
|
||||
* Enter busy loop and wait for synchronization status
|
||||
* change.
|
||||
*/
|
||||
wait_for_sync_status_change(rstate->relid, rstate->state);
|
||||
}
|
||||
|
||||
/*
|
||||
* If there is no sync worker registered for the table and
|
||||
* there is some free sync worker slot, start new sync worker
|
||||
* for the table.
|
||||
*/
|
||||
else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
|
||||
{
|
||||
logicalrep_worker_launch(MyLogicalRepWorker->dbid,
|
||||
MySubscription->oid,
|
||||
MySubscription->name,
|
||||
MyLogicalRepWorker->userid,
|
||||
rstate->relid);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Process state possible change(s) of tables that are being synchronized.
|
||||
*/
|
||||
void
|
||||
process_syncing_tables(XLogRecPtr current_lsn)
|
||||
{
|
||||
if (am_tablesync_worker())
|
||||
process_syncing_tables_for_sync(current_lsn);
|
||||
else
|
||||
process_syncing_tables_for_apply(current_lsn);
|
||||
}
|
||||
|
||||
/*
|
||||
* Create list of columns for COPY based on logical relation mapping.
|
||||
*/
|
||||
static List *
|
||||
make_copy_attnamelist(LogicalRepRelMapEntry *rel)
|
||||
{
|
||||
List *attnamelist = NIL;
|
||||
TupleDesc desc = RelationGetDescr(rel->localrel);
|
||||
int i;
|
||||
|
||||
for (i = 0; i < desc->natts; i++)
|
||||
{
|
||||
int remoteattnum = rel->attrmap[i];
|
||||
|
||||
/* Skip dropped attributes. */
|
||||
if (desc->attrs[i]->attisdropped)
|
||||
continue;
|
||||
|
||||
/* Skip attributes that are missing on remote side. */
|
||||
if (remoteattnum < 0)
|
||||
continue;
|
||||
|
||||
attnamelist = lappend(attnamelist,
|
||||
makeString(rel->remoterel.attnames[remoteattnum]));
|
||||
}
|
||||
|
||||
return attnamelist;
|
||||
}
|
||||
|
||||
/*
|
||||
* Data source callback for the COPY FROM, which reads from the remote
|
||||
* connection and passes the data back to our local COPY.
|
||||
*/
|
||||
static int
|
||||
copy_read_data(void *outbuf, int minread, int maxread)
|
||||
{
|
||||
int bytesread = 0;
|
||||
int avail;
|
||||
|
||||
/* If there are some leftover data from previous read, use them. */
|
||||
avail = copybuf->len - copybuf->cursor;
|
||||
if (avail)
|
||||
{
|
||||
if (avail > maxread)
|
||||
avail = maxread;
|
||||
memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
|
||||
copybuf->cursor += avail;
|
||||
maxread -= avail;
|
||||
bytesread += avail;
|
||||
}
|
||||
|
||||
while (!got_SIGTERM && maxread > 0 && bytesread < minread)
|
||||
{
|
||||
pgsocket fd = PGINVALID_SOCKET;
|
||||
int rc;
|
||||
int len;
|
||||
char *buf = NULL;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
/* Try read the data. */
|
||||
len = walrcv_receive(wrconn, &buf, &fd);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
if (len == 0)
|
||||
break;
|
||||
else if (len < 0)
|
||||
return bytesread;
|
||||
else
|
||||
{
|
||||
/* Process the data */
|
||||
copybuf->data = buf;
|
||||
copybuf->len = len;
|
||||
copybuf->cursor = 0;
|
||||
|
||||
avail = copybuf->len - copybuf->cursor;
|
||||
if (avail > maxread)
|
||||
avail = maxread;
|
||||
memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
|
||||
outbuf = (void *) ((char *) outbuf + avail);
|
||||
copybuf->cursor += avail;
|
||||
maxread -= avail;
|
||||
bytesread += avail;
|
||||
}
|
||||
|
||||
if (maxread <= 0 || bytesread >= minread)
|
||||
return bytesread;
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait for more data or latch.
|
||||
*/
|
||||
rc = WaitLatchOrSocket(&MyProc->procLatch,
|
||||
WL_SOCKET_READABLE | WL_LATCH_SET |
|
||||
WL_TIMEOUT | WL_POSTMASTER_DEATH,
|
||||
fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
|
||||
|
||||
/* Emergency bailout if postmaster has died */
|
||||
if (rc & WL_POSTMASTER_DEATH)
|
||||
proc_exit(1);
|
||||
|
||||
ResetLatch(&MyProc->procLatch);
|
||||
}
|
||||
|
||||
/* Check for exit condition. */
|
||||
if (got_SIGTERM)
|
||||
proc_exit(0);
|
||||
|
||||
return bytesread;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Get information about remote relation in similar fashion the RELATION
|
||||
* message provides during replication.
|
||||
*/
|
||||
static void
|
||||
fetch_remote_table_info(char *nspname, char *relname,
|
||||
LogicalRepRelation *lrel)
|
||||
{
|
||||
WalRcvExecResult *res;
|
||||
StringInfoData cmd;
|
||||
TupleTableSlot *slot;
|
||||
Oid tableRow[2] = {OIDOID, CHAROID};
|
||||
Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
|
||||
bool isnull;
|
||||
int natt;
|
||||
|
||||
lrel->nspname = nspname;
|
||||
lrel->relname = relname;
|
||||
|
||||
/* First fetch Oid and replica identity. */
|
||||
initStringInfo(&cmd);
|
||||
appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
|
||||
" FROM pg_catalog.pg_class c,"
|
||||
" pg_catalog.pg_namespace n"
|
||||
" WHERE n.nspname = %s"
|
||||
" AND c.relname = %s"
|
||||
" AND c.relkind = 'r'",
|
||||
quote_literal_cstr(nspname),
|
||||
quote_literal_cstr(relname));
|
||||
res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
|
||||
|
||||
if (res->status != WALRCV_OK_TUPLES)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
|
||||
nspname, relname, res->err)));
|
||||
|
||||
slot = MakeSingleTupleTableSlot(res->tupledesc);
|
||||
if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
|
||||
ereport(ERROR,
|
||||
(errmsg("table \"%s.%s\" not found on publisher",
|
||||
nspname, relname)));
|
||||
|
||||
lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
|
||||
Assert(!isnull);
|
||||
lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
|
||||
Assert(!isnull);
|
||||
|
||||
ExecDropSingleTupleTableSlot(slot);
|
||||
walrcv_clear_result(res);
|
||||
|
||||
/* Now fetch columns. */
|
||||
resetStringInfo(&cmd);
|
||||
appendStringInfo(&cmd,
|
||||
"SELECT a.attname,"
|
||||
" a.atttypid,"
|
||||
" a.atttypmod,"
|
||||
" a.attnum = ANY(i.indkey)"
|
||||
" FROM pg_catalog.pg_attribute a"
|
||||
" LEFT JOIN pg_catalog.pg_index i"
|
||||
" ON (i.indexrelid = pg_get_replica_identity_index(%u))"
|
||||
" WHERE a.attnum > 0::pg_catalog.int2"
|
||||
" AND NOT a.attisdropped"
|
||||
" AND a.attrelid = %u"
|
||||
" ORDER BY a.attnum",
|
||||
lrel->remoteid, lrel->remoteid);
|
||||
res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
|
||||
|
||||
if (res->status != WALRCV_OK_TUPLES)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not fetch table info for table \"%s.%s\": %s",
|
||||
nspname, relname, res->err)));
|
||||
|
||||
/* We don't know number of rows coming, so allocate enough space. */
|
||||
lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
|
||||
lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
|
||||
lrel->attkeys = NULL;
|
||||
|
||||
natt = 0;
|
||||
slot = MakeSingleTupleTableSlot(res->tupledesc);
|
||||
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
|
||||
{
|
||||
lrel->attnames[natt] =
|
||||
pstrdup(TextDatumGetCString(slot_getattr(slot, 1, &isnull)));
|
||||
Assert(!isnull);
|
||||
lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
|
||||
Assert(!isnull);
|
||||
if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
|
||||
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
|
||||
|
||||
/* Should never happen. */
|
||||
if (++natt >= MaxTupleAttributeNumber)
|
||||
elog(ERROR, "too many columns in remote table \"%s.%s\"",
|
||||
nspname, relname);
|
||||
|
||||
ExecClearTuple(slot);
|
||||
}
|
||||
ExecDropSingleTupleTableSlot(slot);
|
||||
|
||||
lrel->natts = natt;
|
||||
|
||||
walrcv_clear_result(res);
|
||||
pfree(cmd.data);
|
||||
}
|
||||
|
||||
/*
|
||||
* Copy existing data of a table from publisher.
|
||||
*
|
||||
* Caller is responsible for locking the local relation.
|
||||
*/
|
||||
static void
|
||||
copy_table(Relation rel)
|
||||
{
|
||||
LogicalRepRelMapEntry *relmapentry;
|
||||
LogicalRepRelation lrel;
|
||||
WalRcvExecResult *res;
|
||||
StringInfoData cmd;
|
||||
CopyState cstate;
|
||||
List *attnamelist;
|
||||
|
||||
/* Get the publisher relation info. */
|
||||
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
|
||||
RelationGetRelationName(rel), &lrel);
|
||||
|
||||
/* Put the relation into relmap. */
|
||||
logicalrep_relmap_update(&lrel);
|
||||
|
||||
/* Map the publisher relation to local one. */
|
||||
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
|
||||
Assert(rel == relmapentry->localrel);
|
||||
|
||||
/* Start copy on the publisher. */
|
||||
initStringInfo(&cmd);
|
||||
appendStringInfo(&cmd, "COPY %s TO STDOUT",
|
||||
quote_qualified_identifier(lrel.nspname, lrel.relname));
|
||||
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
|
||||
pfree(cmd.data);
|
||||
if (res->status != WALRCV_OK_COPY_OUT)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not start initial contents copy for table \"%s.%s\": %s",
|
||||
lrel.nspname, lrel.relname, res->err)));
|
||||
walrcv_clear_result(res);
|
||||
|
||||
copybuf = makeStringInfo();
|
||||
|
||||
/* Create CopyState for ingestion of the data from publisher. */
|
||||
attnamelist = make_copy_attnamelist(relmapentry);
|
||||
cstate = BeginCopyFrom(NULL, rel, NULL, false, copy_read_data, attnamelist, NIL);
|
||||
|
||||
/* Do the copy */
|
||||
(void) CopyFrom(cstate);
|
||||
|
||||
logicalrep_rel_close(relmapentry, NoLock);
|
||||
}
|
||||
|
||||
/*
|
||||
* Start syncing the table in the sync worker.
|
||||
*
|
||||
* The returned slot name is palloced in current memory context.
|
||||
*/
|
||||
char *
|
||||
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
|
||||
{
|
||||
char *slotname;
|
||||
char *err;
|
||||
|
||||
/* Check the state of the table synchronization. */
|
||||
StartTransactionCommand();
|
||||
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
|
||||
MyLogicalRepWorker->relstate =
|
||||
GetSubscriptionRelState(MyLogicalRepWorker->subid,
|
||||
MyLogicalRepWorker->relid,
|
||||
&MyLogicalRepWorker->relstate_lsn,
|
||||
false);
|
||||
SpinLockRelease(&MyLogicalRepWorker->relmutex);
|
||||
CommitTransactionCommand();
|
||||
|
||||
/*
|
||||
* To build a slot name for the sync work, we are limited to NAMEDATALEN -
|
||||
* 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars
|
||||
* and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the
|
||||
* NAMEDATALEN on the remote that matters, but this scheme will also work
|
||||
* reasonably if that is different.)
|
||||
*/
|
||||
StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
|
||||
slotname = psprintf("%.*s_%u_sync_%u",
|
||||
NAMEDATALEN - 28,
|
||||
MySubscription->slotname,
|
||||
MySubscription->oid,
|
||||
MyLogicalRepWorker->relid);
|
||||
|
||||
wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
|
||||
if (wrconn == NULL)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not connect to the publisher: %s", err)));
|
||||
|
||||
switch (MyLogicalRepWorker->relstate)
|
||||
{
|
||||
case SUBREL_STATE_INIT:
|
||||
case SUBREL_STATE_DATASYNC:
|
||||
{
|
||||
Relation rel;
|
||||
WalRcvExecResult *res;
|
||||
|
||||
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
|
||||
MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
|
||||
MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
|
||||
SpinLockRelease(&MyLogicalRepWorker->relmutex);
|
||||
|
||||
/* Update the state and make it visible to others. */
|
||||
StartTransactionCommand();
|
||||
SetSubscriptionRelState(MyLogicalRepWorker->subid,
|
||||
MyLogicalRepWorker->relid,
|
||||
MyLogicalRepWorker->relstate,
|
||||
MyLogicalRepWorker->relstate_lsn);
|
||||
CommitTransactionCommand();
|
||||
|
||||
/*
|
||||
* We want to do the table data sync in single
|
||||
* transaction.
|
||||
*/
|
||||
StartTransactionCommand();
|
||||
|
||||
/*
|
||||
* Use standard write lock here. It might be better to
|
||||
* disallow access to table while it's being synchronized.
|
||||
* But we don't want to block the main apply process from
|
||||
* working and it has to open relation in RowExclusiveLock
|
||||
* when remapping remote relation id to local one.
|
||||
*/
|
||||
rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock);
|
||||
|
||||
/*
|
||||
* Create temporary slot for the sync process.
|
||||
* We do this inside transaction so that we can use the
|
||||
* snapshot made by the slot to get existing data.
|
||||
*/
|
||||
res = walrcv_exec(wrconn,
|
||||
"BEGIN READ ONLY ISOLATION LEVEL "
|
||||
"REPEATABLE READ", 0, NULL);
|
||||
if (res->status != WALRCV_OK_COMMAND)
|
||||
ereport(ERROR,
|
||||
(errmsg("table copy could not start transaction on publisher"),
|
||||
errdetail("The error was: %s", res->err)));
|
||||
walrcv_clear_result(res);
|
||||
|
||||
/*
|
||||
* Create new temporary logical decoding slot.
|
||||
*
|
||||
* We'll use slot for data copy so make sure the snapshot
|
||||
* is used for the transaction, that way the COPY will get
|
||||
* data that is consistent with the lsn used by the slot
|
||||
* to start decoding.
|
||||
*/
|
||||
walrcv_create_slot(wrconn, slotname, true,
|
||||
CRS_USE_SNAPSHOT, origin_startpos);
|
||||
|
||||
copy_table(rel);
|
||||
|
||||
res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
|
||||
if (res->status != WALRCV_OK_COMMAND)
|
||||
ereport(ERROR,
|
||||
(errmsg("table copy could not finish transaction on publisher"),
|
||||
errdetail("The error was: %s", res->err)));
|
||||
walrcv_clear_result(res);
|
||||
|
||||
heap_close(rel, NoLock);
|
||||
|
||||
/* Make the copy visible. */
|
||||
CommandCounterIncrement();
|
||||
|
||||
/*
|
||||
* We are done with the initial data synchronization,
|
||||
* update the state.
|
||||
*/
|
||||
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
|
||||
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
|
||||
MyLogicalRepWorker->relstate_lsn = *origin_startpos;
|
||||
SpinLockRelease(&MyLogicalRepWorker->relmutex);
|
||||
|
||||
/*
|
||||
* Wait for main apply worker to either tell us to
|
||||
* catchup or that we are done.
|
||||
*/
|
||||
wait_for_sync_status_change(MyLogicalRepWorker->relid,
|
||||
MyLogicalRepWorker->relstate);
|
||||
if (MyLogicalRepWorker->relstate != SUBREL_STATE_CATCHUP)
|
||||
{
|
||||
/* Update the new state. */
|
||||
SetSubscriptionRelState(MyLogicalRepWorker->subid,
|
||||
MyLogicalRepWorker->relid,
|
||||
MyLogicalRepWorker->relstate,
|
||||
MyLogicalRepWorker->relstate_lsn);
|
||||
finish_sync_worker();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case SUBREL_STATE_SYNCDONE:
|
||||
case SUBREL_STATE_READY:
|
||||
/* Nothing to do here but finish. */
|
||||
finish_sync_worker();
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown relation state \"%c\"",
|
||||
MyLogicalRepWorker->relstate);
|
||||
}
|
||||
|
||||
return slotname;
|
||||
}
|
||||
@@ -32,6 +32,7 @@
|
||||
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_subscription.h"
|
||||
#include "catalog/pg_subscription_rel.h"
|
||||
|
||||
#include "commands/trigger.h"
|
||||
|
||||
@@ -101,7 +102,7 @@ typedef struct SlotErrCallbackArg
|
||||
} SlotErrCallbackArg;
|
||||
|
||||
static MemoryContext ApplyContext = NULL;
|
||||
static MemoryContext ApplyCacheContext = NULL;
|
||||
MemoryContext ApplyCacheContext = NULL;
|
||||
|
||||
WalReceiverConn *wrconn = NULL;
|
||||
|
||||
@@ -109,6 +110,7 @@ Subscription *MySubscription = NULL;
|
||||
bool MySubscriptionValid = false;
|
||||
|
||||
bool in_remote_transaction = false;
|
||||
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
|
||||
|
||||
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
|
||||
|
||||
@@ -116,6 +118,30 @@ static void store_flush_position(XLogRecPtr remote_lsn);
|
||||
|
||||
static void reread_subscription(void);
|
||||
|
||||
/*
|
||||
* Should this worker apply changes for given relation.
|
||||
*
|
||||
* This is mainly needed for initial relation data sync as that runs in
|
||||
* separate worker process running in parallel and we need some way to skip
|
||||
* changes coming to the main apply worker during the sync of a table.
|
||||
*
|
||||
* Note we need to do smaller or equals comparison for SYNCDONE state because
|
||||
* it might hold position of end of intitial slot consistent point WAL
|
||||
* record + 1 (ie start of next record) and next record can be COMMIT of
|
||||
* transaction we are now processing (which is what we set remote_final_lsn
|
||||
* to in apply_handle_begin).
|
||||
*/
|
||||
static bool
|
||||
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
|
||||
{
|
||||
if (am_tablesync_worker())
|
||||
return MyLogicalRepWorker->relid == rel->localreloid;
|
||||
else
|
||||
return (rel->state == SUBREL_STATE_READY ||
|
||||
(rel->state == SUBREL_STATE_SYNCDONE &&
|
||||
rel->statelsn <= remote_final_lsn));
|
||||
}
|
||||
|
||||
/*
|
||||
* Make sure that we started local transaction.
|
||||
*
|
||||
@@ -398,6 +424,8 @@ apply_handle_begin(StringInfo s)
|
||||
replorigin_session_origin_timestamp = begin_data.committime;
|
||||
replorigin_session_origin_lsn = begin_data.final_lsn;
|
||||
|
||||
remote_final_lsn = begin_data.final_lsn;
|
||||
|
||||
in_remote_transaction = true;
|
||||
|
||||
pgstat_report_activity(STATE_RUNNING, NULL);
|
||||
@@ -418,7 +446,10 @@ apply_handle_commit(StringInfo s)
|
||||
Assert(commit_data.commit_lsn == replorigin_session_origin_lsn);
|
||||
Assert(commit_data.committime == replorigin_session_origin_timestamp);
|
||||
|
||||
if (IsTransactionState())
|
||||
Assert(commit_data.commit_lsn == remote_final_lsn);
|
||||
|
||||
/* The synchronization worker runs in single transaction. */
|
||||
if (IsTransactionState() && !am_tablesync_worker())
|
||||
{
|
||||
CommitTransactionCommand();
|
||||
|
||||
@@ -427,6 +458,9 @@ apply_handle_commit(StringInfo s)
|
||||
|
||||
in_remote_transaction = false;
|
||||
|
||||
/* Process any tables that are being synchronized in parallel. */
|
||||
process_syncing_tables(commit_data.end_lsn);
|
||||
|
||||
pgstat_report_activity(STATE_IDLE, NULL);
|
||||
}
|
||||
|
||||
@@ -442,7 +476,8 @@ apply_handle_origin(StringInfo s)
|
||||
* ORIGIN message can only come inside remote transaction and before
|
||||
* any actual writes.
|
||||
*/
|
||||
if (!in_remote_transaction || IsTransactionState())
|
||||
if (!in_remote_transaction ||
|
||||
(IsTransactionState() && !am_tablesync_worker()))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||
errmsg("ORIGIN message sent out of order")));
|
||||
@@ -515,6 +550,15 @@ apply_handle_insert(StringInfo s)
|
||||
|
||||
relid = logicalrep_read_insert(s, &newtup);
|
||||
rel = logicalrep_rel_open(relid, RowExclusiveLock);
|
||||
if (!should_apply_changes_for_rel(rel))
|
||||
{
|
||||
/*
|
||||
* The relation can't become interesting in the middle of the
|
||||
* transaction so it's safe to unlock it.
|
||||
*/
|
||||
logicalrep_rel_close(rel, RowExclusiveLock);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Initialize the executor state. */
|
||||
estate = create_estate_for_relation(rel);
|
||||
@@ -607,6 +651,15 @@ apply_handle_update(StringInfo s)
|
||||
relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
|
||||
&newtup);
|
||||
rel = logicalrep_rel_open(relid, RowExclusiveLock);
|
||||
if (!should_apply_changes_for_rel(rel))
|
||||
{
|
||||
/*
|
||||
* The relation can't become interesting in the middle of the
|
||||
* transaction so it's safe to unlock it.
|
||||
*/
|
||||
logicalrep_rel_close(rel, RowExclusiveLock);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Check if we can do the update. */
|
||||
check_relation_updatable(rel);
|
||||
@@ -716,6 +769,15 @@ apply_handle_delete(StringInfo s)
|
||||
|
||||
relid = logicalrep_read_delete(s, &oldtup);
|
||||
rel = logicalrep_rel_open(relid, RowExclusiveLock);
|
||||
if (!should_apply_changes_for_rel(rel))
|
||||
{
|
||||
/*
|
||||
* The relation can't become interesting in the middle of the
|
||||
* transaction so it's safe to unlock it.
|
||||
*/
|
||||
logicalrep_rel_close(rel, RowExclusiveLock);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Check if we can do the delete. */
|
||||
check_relation_updatable(rel);
|
||||
@@ -927,10 +989,8 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
|
||||
* Apply main loop.
|
||||
*/
|
||||
static void
|
||||
ApplyLoop(void)
|
||||
LogicalRepApplyLoop(XLogRecPtr last_received)
|
||||
{
|
||||
XLogRecPtr last_received = InvalidXLogRecPtr;
|
||||
|
||||
/* Init the ApplyContext which we use for easier cleanup. */
|
||||
ApplyContext = AllocSetContextCreate(TopMemoryContext,
|
||||
"ApplyContext",
|
||||
@@ -1014,15 +1074,18 @@ ApplyLoop(void)
|
||||
}
|
||||
else if (c == 'k')
|
||||
{
|
||||
XLogRecPtr endpos;
|
||||
XLogRecPtr end_lsn;
|
||||
TimestampTz timestamp;
|
||||
bool reply_requested;
|
||||
|
||||
endpos = pq_getmsgint64(&s);
|
||||
end_lsn = pq_getmsgint64(&s);
|
||||
timestamp = pq_getmsgint64(&s);
|
||||
reply_requested = pq_getmsgbyte(&s);
|
||||
|
||||
send_feedback(endpos, reply_requested, false);
|
||||
if (last_received < end_lsn)
|
||||
last_received = end_lsn;
|
||||
|
||||
send_feedback(last_received, reply_requested, false);
|
||||
UpdateWorkerStats(last_received, timestamp, true);
|
||||
}
|
||||
/* other message types are purposefully ignored */
|
||||
@@ -1030,6 +1093,9 @@ ApplyLoop(void)
|
||||
|
||||
len = walrcv_receive(wrconn, &buf, &fd);
|
||||
}
|
||||
|
||||
/* confirm all writes at once */
|
||||
send_feedback(last_received, false, false);
|
||||
}
|
||||
|
||||
if (!in_remote_transaction)
|
||||
@@ -1038,15 +1104,13 @@ ApplyLoop(void)
|
||||
* If we didn't get any transactions for a while there might be
|
||||
* unconsumed invalidation messages in the queue, consume them now.
|
||||
*/
|
||||
StartTransactionCommand();
|
||||
/* Check for subscription change */
|
||||
AcceptInvalidationMessages();
|
||||
if (!MySubscriptionValid)
|
||||
reread_subscription();
|
||||
CommitTransactionCommand();
|
||||
}
|
||||
|
||||
/* confirm all writes at once */
|
||||
send_feedback(last_received, false, false);
|
||||
/* Process any table synchronization changes. */
|
||||
process_syncing_tables(last_received);
|
||||
}
|
||||
|
||||
/* Cleanup the memory. */
|
||||
MemoryContextResetAndDeleteChildren(ApplyContext);
|
||||
@@ -1054,7 +1118,11 @@ ApplyLoop(void)
|
||||
|
||||
/* Check if we need to exit the streaming loop. */
|
||||
if (endofstream)
|
||||
{
|
||||
TimeLineID tli;
|
||||
walrcv_endstreaming(wrconn, &tli);
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait for more data or latch.
|
||||
@@ -1222,6 +1290,14 @@ reread_subscription(void)
|
||||
{
|
||||
MemoryContext oldctx;
|
||||
Subscription *newsub;
|
||||
bool started_tx = false;
|
||||
|
||||
/* This function might be called inside or outside of transaction. */
|
||||
if (!IsTransactionState())
|
||||
{
|
||||
StartTransactionCommand();
|
||||
started_tx = true;
|
||||
}
|
||||
|
||||
/* Ensure allocations in permanent context. */
|
||||
oldctx = MemoryContextSwitchTo(ApplyCacheContext);
|
||||
@@ -1319,6 +1395,9 @@ reread_subscription(void)
|
||||
|
||||
MemoryContextSwitchTo(oldctx);
|
||||
|
||||
if (started_tx)
|
||||
CommitTransactionCommand();
|
||||
|
||||
MySubscriptionValid = true;
|
||||
}
|
||||
|
||||
@@ -1339,11 +1418,8 @@ ApplyWorkerMain(Datum main_arg)
|
||||
int worker_slot = DatumGetObjectId(main_arg);
|
||||
MemoryContext oldctx;
|
||||
char originname[NAMEDATALEN];
|
||||
RepOriginId originid;
|
||||
XLogRecPtr origin_startpos;
|
||||
char *err;
|
||||
int server_version;
|
||||
TimeLineID startpointTLI;
|
||||
char *myslotname;
|
||||
WalRcvStreamOptions options;
|
||||
|
||||
/* Attach to slot */
|
||||
@@ -1402,49 +1478,90 @@ ApplyWorkerMain(Datum main_arg)
|
||||
subscription_change_cb,
|
||||
(Datum) 0);
|
||||
|
||||
ereport(LOG,
|
||||
(errmsg("logical replication apply for subscription \"%s\" has started",
|
||||
MySubscription->name)));
|
||||
|
||||
/* Setup replication origin tracking. */
|
||||
snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
|
||||
originid = replorigin_by_name(originname, true);
|
||||
if (!OidIsValid(originid))
|
||||
originid = replorigin_create(originname);
|
||||
replorigin_session_setup(originid);
|
||||
replorigin_session_origin = originid;
|
||||
origin_startpos = replorigin_session_get_progress(false);
|
||||
if (am_tablesync_worker())
|
||||
elog(LOG, "logical replication sync for subscription %s, table %s started",
|
||||
MySubscription->name, get_rel_name(MyLogicalRepWorker->relid));
|
||||
else
|
||||
elog(LOG, "logical replication apply for subscription %s started",
|
||||
MySubscription->name);
|
||||
|
||||
CommitTransactionCommand();
|
||||
|
||||
/* Connect to the origin and start the replication. */
|
||||
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
|
||||
MySubscription->conninfo);
|
||||
wrconn = walrcv_connect(MySubscription->conninfo, true,
|
||||
MySubscription->name, &err);
|
||||
if (wrconn == NULL)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not connect to the publisher: %s", err)));
|
||||
|
||||
if (am_tablesync_worker())
|
||||
{
|
||||
char *syncslotname;
|
||||
|
||||
/* This is table synchroniation worker, call initial sync. */
|
||||
syncslotname = LogicalRepSyncTableStart(&origin_startpos);
|
||||
|
||||
/* The slot name needs to be allocated in permanent memory context. */
|
||||
oldctx = MemoryContextSwitchTo(ApplyCacheContext);
|
||||
myslotname = pstrdup(syncslotname);
|
||||
MemoryContextSwitchTo(oldctx);
|
||||
|
||||
pfree(syncslotname);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* This is main apply worker */
|
||||
RepOriginId originid;
|
||||
TimeLineID startpointTLI;
|
||||
char *err;
|
||||
int server_version;
|
||||
|
||||
myslotname = MySubscription->slotname;
|
||||
|
||||
/* Setup replication origin tracking. */
|
||||
StartTransactionCommand();
|
||||
snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
|
||||
originid = replorigin_by_name(originname, true);
|
||||
if (!OidIsValid(originid))
|
||||
originid = replorigin_create(originname);
|
||||
replorigin_session_setup(originid);
|
||||
replorigin_session_origin = originid;
|
||||
origin_startpos = replorigin_session_get_progress(false);
|
||||
CommitTransactionCommand();
|
||||
|
||||
wrconn = walrcv_connect(MySubscription->conninfo, true, myslotname,
|
||||
&err);
|
||||
if (wrconn == NULL)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not connect to the publisher: %s", err)));
|
||||
|
||||
/*
|
||||
* We don't really use the output identify_system for anything
|
||||
* but it does some initializations on the upstream so let's still
|
||||
* call it.
|
||||
*/
|
||||
(void) walrcv_identify_system(wrconn, &startpointTLI,
|
||||
&server_version);
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* We don't really use the output identify_system for anything
|
||||
* but it does some initializations on the upstream so let's still
|
||||
* call it.
|
||||
* Setup callback for syscache so that we know when something
|
||||
* changes in the subscription relation state.
|
||||
*/
|
||||
(void) walrcv_identify_system(wrconn, &startpointTLI, &server_version);
|
||||
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
|
||||
invalidate_syncing_table_states,
|
||||
(Datum) 0);
|
||||
|
||||
/* Build logical replication streaming options. */
|
||||
options.logical = true;
|
||||
options.startpoint = origin_startpos;
|
||||
options.slotname = MySubscription->slotname;
|
||||
options.slotname = myslotname;
|
||||
options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
|
||||
options.proto.logical.publication_names = MySubscription->publications;
|
||||
|
||||
/* Start streaming from the slot. */
|
||||
/* Start normal logical streaming replication. */
|
||||
walrcv_startstreaming(wrconn, &options);
|
||||
|
||||
/* Run the main loop. */
|
||||
ApplyLoop();
|
||||
LogicalRepApplyLoop(origin_startpos);
|
||||
|
||||
walrcv_disconnect(wrconn);
|
||||
|
||||
|
||||
@@ -25,6 +25,8 @@
|
||||
/* Result of the parsing is returned here */
|
||||
Node *replication_parse_result;
|
||||
|
||||
static SQLCmd *make_sqlcmd(void);
|
||||
|
||||
|
||||
/*
|
||||
* Bison doesn't allocate anything that needs to live across parser calls,
|
||||
@@ -57,6 +59,7 @@ Node *replication_parse_result;
|
||||
%token <str> SCONST IDENT
|
||||
%token <uintval> UCONST
|
||||
%token <recptr> RECPTR
|
||||
%token T_WORD
|
||||
|
||||
/* Keyword tokens. */
|
||||
%token K_BASE_BACKUP
|
||||
@@ -81,11 +84,12 @@ Node *replication_parse_result;
|
||||
%token K_TEMPORARY
|
||||
%token K_EXPORT_SNAPSHOT
|
||||
%token K_NOEXPORT_SNAPSHOT
|
||||
%token K_USE_SNAPSHOT
|
||||
|
||||
%type <node> command
|
||||
%type <node> base_backup start_replication start_logical_replication
|
||||
create_replication_slot drop_replication_slot identify_system
|
||||
timeline_history show
|
||||
timeline_history show sql_cmd
|
||||
%type <list> base_backup_opt_list
|
||||
%type <defelt> base_backup_opt
|
||||
%type <uintval> opt_timeline
|
||||
@@ -118,6 +122,7 @@ command:
|
||||
| drop_replication_slot
|
||||
| timeline_history
|
||||
| show
|
||||
| sql_cmd
|
||||
;
|
||||
|
||||
/*
|
||||
@@ -248,6 +253,11 @@ create_slot_opt:
|
||||
$$ = makeDefElem("export_snapshot",
|
||||
(Node *)makeInteger(FALSE), -1);
|
||||
}
|
||||
| K_USE_SNAPSHOT
|
||||
{
|
||||
$$ = makeDefElem("use_snapshot",
|
||||
(Node *)makeInteger(TRUE), -1);
|
||||
}
|
||||
| K_RESERVE_WAL
|
||||
{
|
||||
$$ = makeDefElem("reserve_wal",
|
||||
@@ -373,6 +383,26 @@ plugin_opt_arg:
|
||||
SCONST { $$ = (Node *) makeString($1); }
|
||||
| /* EMPTY */ { $$ = NULL; }
|
||||
;
|
||||
|
||||
sql_cmd:
|
||||
IDENT { $$ = (Node *) make_sqlcmd(); }
|
||||
;
|
||||
%%
|
||||
|
||||
static SQLCmd *
|
||||
make_sqlcmd(void)
|
||||
{
|
||||
SQLCmd *cmd = makeNode(SQLCmd);
|
||||
int tok;
|
||||
|
||||
/* Just move lexer to the end of command. */
|
||||
for (;;)
|
||||
{
|
||||
tok = yylex();
|
||||
if (tok == ';' || tok == 0)
|
||||
break;
|
||||
}
|
||||
return cmd;
|
||||
}
|
||||
|
||||
#include "repl_scanner.c"
|
||||
|
||||
@@ -102,6 +102,7 @@ SLOT { return K_SLOT; }
|
||||
TEMPORARY { return K_TEMPORARY; }
|
||||
EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
|
||||
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
|
||||
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
|
||||
|
||||
"," { return ','; }
|
||||
";" { return ';'; }
|
||||
@@ -180,9 +181,7 @@ NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
|
||||
}
|
||||
|
||||
. {
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("syntax error: unexpected character \"%s\"", yytext)));
|
||||
return T_WORD;
|
||||
}
|
||||
%%
|
||||
|
||||
|
||||
@@ -753,7 +753,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
|
||||
static void
|
||||
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
|
||||
bool *reserve_wal,
|
||||
bool *export_snapshot)
|
||||
CRSSnapshotAction *snapshot_action)
|
||||
{
|
||||
ListCell *lc;
|
||||
bool snapshot_action_given = false;
|
||||
@@ -772,7 +772,18 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
|
||||
errmsg("conflicting or redundant options")));
|
||||
|
||||
snapshot_action_given = true;
|
||||
*export_snapshot = defGetBoolean(defel);
|
||||
*snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
|
||||
CRS_NOEXPORT_SNAPSHOT;
|
||||
}
|
||||
else if (strcmp(defel->defname, "use_snapshot") == 0)
|
||||
{
|
||||
if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
|
||||
snapshot_action_given = true;
|
||||
*snapshot_action = CRS_USE_SNAPSHOT;
|
||||
}
|
||||
else if (strcmp(defel->defname, "reserve_wal") == 0)
|
||||
{
|
||||
@@ -799,7 +810,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||
char xpos[MAXFNAMELEN];
|
||||
char *slot_name;
|
||||
bool reserve_wal = false;
|
||||
bool export_snapshot = true;
|
||||
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
|
||||
DestReceiver *dest;
|
||||
TupOutputState *tstate;
|
||||
TupleDesc tupdesc;
|
||||
@@ -808,7 +819,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||
|
||||
Assert(!MyReplicationSlot);
|
||||
|
||||
parseCreateReplSlotOptions(cmd, &reserve_wal, &export_snapshot);
|
||||
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
|
||||
|
||||
/* setup state for XLogReadPage */
|
||||
sendTimeLineIsHistoric = false;
|
||||
@@ -838,6 +849,40 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||
{
|
||||
LogicalDecodingContext *ctx;
|
||||
|
||||
/*
|
||||
* Do options check early so that we can bail before calling the
|
||||
* DecodingContextFindStartpoint which can take long time.
|
||||
*/
|
||||
if (snapshot_action == CRS_EXPORT_SNAPSHOT)
|
||||
{
|
||||
if (IsTransactionBlock())
|
||||
ereport(ERROR,
|
||||
(errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
|
||||
"must not be called inside a transaction")));
|
||||
}
|
||||
else if (snapshot_action == CRS_USE_SNAPSHOT)
|
||||
{
|
||||
if (!IsTransactionBlock())
|
||||
ereport(ERROR,
|
||||
(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
|
||||
"must be called inside a transaction")));
|
||||
|
||||
if (XactIsoLevel != XACT_REPEATABLE_READ)
|
||||
ereport(ERROR,
|
||||
(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
|
||||
"must be called in REPEATABLE READ isolation mode transaction")));
|
||||
|
||||
if (FirstSnapshotSet)
|
||||
ereport(ERROR,
|
||||
(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
|
||||
"must be called before any query")));
|
||||
|
||||
if (IsSubTransaction())
|
||||
ereport(ERROR,
|
||||
(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
|
||||
"must not be called in a subtransaction")));
|
||||
}
|
||||
|
||||
ctx = CreateInitDecodingContext(cmd->plugin, NIL,
|
||||
logical_read_xlog_page,
|
||||
WalSndPrepareWrite, WalSndWriteData);
|
||||
@@ -855,13 +900,22 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||
DecodingContextFindStartpoint(ctx);
|
||||
|
||||
/*
|
||||
* Export the snapshot if we've been asked to do so.
|
||||
* Export or use the snapshot if we've been asked to do so.
|
||||
*
|
||||
* NB. We will convert the snapbuild.c kind of snapshot to normal
|
||||
* snapshot when doing this.
|
||||
*/
|
||||
if (export_snapshot)
|
||||
if (snapshot_action == CRS_EXPORT_SNAPSHOT)
|
||||
{
|
||||
snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
|
||||
}
|
||||
else if (snapshot_action == CRS_USE_SNAPSHOT)
|
||||
{
|
||||
Snapshot snap;
|
||||
|
||||
snap = SnapBuildInitalSnapshot(ctx->snapshot_builder);
|
||||
RestoreTransactionSnapshot(snap, MyProc);
|
||||
}
|
||||
|
||||
/* don't need the decoding context anymore */
|
||||
FreeDecodingContext(ctx);
|
||||
@@ -1277,8 +1331,11 @@ WalSndWaitForWal(XLogRecPtr loc)
|
||||
|
||||
/*
|
||||
* Execute an incoming replication command.
|
||||
*
|
||||
* Returns true if the cmd_string was recognized as WalSender command, false
|
||||
* if not.
|
||||
*/
|
||||
void
|
||||
bool
|
||||
exec_replication_command(const char *cmd_string)
|
||||
{
|
||||
int parse_rc;
|
||||
@@ -1317,6 +1374,25 @@ exec_replication_command(const char *cmd_string)
|
||||
|
||||
cmd_node = replication_parse_result;
|
||||
|
||||
/*
|
||||
* CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
|
||||
* called outside of transaction the snapshot should be cleared here.
|
||||
*/
|
||||
if (!IsTransactionBlock())
|
||||
SnapBuildClearExportedSnapshot();
|
||||
|
||||
/*
|
||||
* For aborted transactions, don't allow anything except pure SQL,
|
||||
* the exec_simple_query() will handle it correctly.
|
||||
*/
|
||||
if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
|
||||
errmsg("current transaction is aborted, "
|
||||
"commands ignored until end of transaction block")));
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/*
|
||||
* Allocate buffers that will be used for each outgoing and incoming
|
||||
* message. We do this just once per command to reduce palloc overhead.
|
||||
@@ -1332,6 +1408,7 @@ exec_replication_command(const char *cmd_string)
|
||||
break;
|
||||
|
||||
case T_BaseBackupCmd:
|
||||
PreventTransactionChain(true, "BASE_BACKUP");
|
||||
SendBaseBackup((BaseBackupCmd *) cmd_node);
|
||||
break;
|
||||
|
||||
@@ -1347,6 +1424,8 @@ exec_replication_command(const char *cmd_string)
|
||||
{
|
||||
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
|
||||
|
||||
PreventTransactionChain(true, "START_REPLICATION");
|
||||
|
||||
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
|
||||
StartReplication(cmd);
|
||||
else
|
||||
@@ -1355,6 +1434,7 @@ exec_replication_command(const char *cmd_string)
|
||||
}
|
||||
|
||||
case T_TimeLineHistoryCmd:
|
||||
PreventTransactionChain(true, "TIMELINE_HISTORY");
|
||||
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
|
||||
break;
|
||||
|
||||
@@ -1367,6 +1447,14 @@ exec_replication_command(const char *cmd_string)
|
||||
}
|
||||
break;
|
||||
|
||||
case T_SQLCmd:
|
||||
if (MyDatabaseId == InvalidOid)
|
||||
ereport(ERROR,
|
||||
(errmsg("not connected to database")));
|
||||
|
||||
/* Tell the caller that this wasn't a WalSender command. */
|
||||
return false;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unrecognized replication command node tag: %u",
|
||||
cmd_node->type);
|
||||
@@ -1378,6 +1466,8 @@ exec_replication_command(const char *cmd_string)
|
||||
|
||||
/* Send CommandComplete message */
|
||||
EndCommand("SELECT", DestRemote);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Reference in New Issue
Block a user