1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

Logical replication

- Add PUBLICATION catalogs and DDL
- Add SUBSCRIPTION catalog and DDL
- Define logical replication protocol and output plugin
- Add logical replication workers

From: Petr Jelinek <petr@2ndquadrant.com>
Reviewed-by: Steve Singer <steve@ssinger.info>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Erik Rijkers <er@xs4all.nl>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
This commit is contained in:
Peter Eisentraut
2017-01-19 12:00:00 -05:00
parent ba61a04bc7
commit 665d1fad99
119 changed files with 13354 additions and 95 deletions

View File

@@ -24,9 +24,11 @@
#include "access/xlog.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logicalproto.h"
#include "replication/walreceiver.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
PG_MODULE_MAGIC;
@@ -44,26 +46,35 @@ struct WalReceiverConn
/* Prototypes for interface functions */
static WalReceiverConn *libpqrcv_connect(const char *conninfo,
bool logical, const char *appname);
bool logical, const char *appname,
char **err);
static void libpqrcv_check_conninfo(const char *conninfo);
static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
static char *libpqrcv_identify_system(WalReceiverConn *conn,
TimeLineID *primary_tli);
TimeLineID *primary_tli,
int *server_version);
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
TimeLineID tli, char **filename,
char **content, int *len);
static bool libpqrcv_startstreaming(WalReceiverConn *conn,
TimeLineID tli, XLogRecPtr startpoint,
const char *slotname);
const WalRcvStreamOptions *options);
static void libpqrcv_endstreaming(WalReceiverConn *conn,
TimeLineID *next_tli);
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
pgsocket *wait_fd);
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
int nbytes);
static char *libpqrcv_create_slot(WalReceiverConn *conn,
const char *slotname,
bool temporary,
XLogRecPtr *lsn);
static bool libpqrcv_command(WalReceiverConn *conn,
const char *cmd, char **err);
static void libpqrcv_disconnect(WalReceiverConn *conn);
static WalReceiverFunctionsType PQWalReceiverFunctions = {
libpqrcv_connect,
libpqrcv_check_conninfo,
libpqrcv_get_conninfo,
libpqrcv_identify_system,
libpqrcv_readtimelinehistoryfile,
@@ -71,11 +82,14 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
libpqrcv_endstreaming,
libpqrcv_receive,
libpqrcv_send,
libpqrcv_create_slot,
libpqrcv_command,
libpqrcv_disconnect
};
/* Prototypes for private functions */
static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
/*
* Module initialization function
@@ -90,9 +104,12 @@ _PG_init(void)
/*
* Establish the connection to the primary server for XLOG streaming
*
* Returns NULL on error and fills the err with palloc'ed error message.
*/
static WalReceiverConn *
libpqrcv_connect(const char *conninfo, bool logical, const char *appname)
libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
char **err)
{
WalReceiverConn *conn;
const char *keys[5];
@@ -123,14 +140,34 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname)
conn = palloc0(sizeof(WalReceiverConn));
conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
if (PQstatus(conn->streamConn) != CONNECTION_OK)
ereport(ERROR,
(errmsg("could not connect to the primary server: %s",
PQerrorMessage(conn->streamConn))));
{
*err = pstrdup(PQerrorMessage(conn->streamConn));
return NULL;
}
conn->logical = logical;
return conn;
}
/*
* Validate connection info string (just try to parse it)
*/
static void
libpqrcv_check_conninfo(const char *conninfo)
{
PQconninfoOption *opts = NULL;
char *err = NULL;
opts = PQconninfoParse(conninfo, &err);
if (opts == NULL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("invalid connection string syntax: %s", err)));
PQconninfoFree(opts);
}
/*
* Return a user-displayable conninfo string. Any security-sensitive fields
* are obfuscated.
@@ -185,7 +222,8 @@ libpqrcv_get_conninfo(WalReceiverConn *conn)
* timeline ID of the primary.
*/
static char *
libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli,
int *server_version)
{
PGresult *res;
char *primary_sysid;
@@ -218,11 +256,13 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
*primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
PQclear(res);
*server_version = PQserverVersion(conn->streamConn);
return primary_sysid;
}
/*
* Start streaming WAL data from given startpoint and timeline.
* Start streaming WAL data from given streaming options.
*
* Returns true if we switched successfully to copy-both mode. False
* means the server received the command and executed it successfully, but
@@ -233,27 +273,54 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
*/
static bool
libpqrcv_startstreaming(WalReceiverConn *conn,
TimeLineID tli, XLogRecPtr startpoint,
const char *slotname)
const WalRcvStreamOptions *options)
{
StringInfoData cmd;
PGresult *res;
Assert(!conn->logical);
Assert(options->logical == conn->logical);
Assert(options->slotname || !options->logical);
initStringInfo(&cmd);
/* Start streaming from the point requested by startup process */
if (slotname != NULL)
appendStringInfo(&cmd,
"START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u",
slotname,
(uint32) (startpoint >> 32), (uint32) startpoint,
tli);
/* Build the command. */
appendStringInfoString(&cmd, "START_REPLICATION");
if (options->slotname != NULL)
appendStringInfo(&cmd, " SLOT \"%s\"",
options->slotname);
if (options->logical)
appendStringInfo(&cmd, " LOGICAL");
appendStringInfo(&cmd, " %X/%X",
(uint32) (options->startpoint >> 32),
(uint32) options->startpoint);
/*
* Additional options are different depending on if we are doing logical
* or physical replication.
*/
if (options->logical)
{
char *pubnames_str;
List *pubnames;
appendStringInfoString(&cmd, " (");
appendStringInfo(&cmd, "proto_version '%u'",
options->proto.logical.proto_version);
pubnames = options->proto.logical.publication_names;
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
appendStringInfo(&cmd, ", publication_names %s",
PQescapeLiteral(conn->streamConn, pubnames_str,
strlen(pubnames_str)));
appendStringInfoChar(&cmd, ')');
pfree(pubnames_str);
}
else
appendStringInfo(&cmd, "START_REPLICATION %X/%X TIMELINE %u",
(uint32) (startpoint >> 32), (uint32) startpoint,
tli);
appendStringInfo(&cmd, " TIMELINE %u",
options->proto.physical.startpointTLI);
/* Start streaming. */
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);
@@ -577,3 +644,107 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
(errmsg("could not send data to WAL stream: %s",
PQerrorMessage(conn->streamConn))));
}
/*
* Create new replication slot.
* Returns the name of the exported snapshot for logical slot or NULL for
* physical slot.
*/
static char *
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
bool temporary, XLogRecPtr *lsn)
{
PGresult *res;
StringInfoData cmd;
char *snapshot;
initStringInfo(&cmd);
appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname);
if (temporary)
appendStringInfo(&cmd, "TEMPORARY ");
if (conn->logical)
appendStringInfo(&cmd, "LOGICAL pgoutput");
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
ereport(ERROR,
(errmsg("could not create replication slot \"%s\": %s",
slotname, PQerrorMessage(conn->streamConn))));
}
*lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
CStringGetDatum(PQgetvalue(res, 0, 1))));
if (!PQgetisnull(res, 0, 2))
snapshot = pstrdup(PQgetvalue(res, 0, 2));
else
snapshot = NULL;
PQclear(res);
return snapshot;
}
/*
* Run command.
*
* Returns if the command has succeeded and fills the err with palloced
* error message if not.
*/
static bool
libpqrcv_command(WalReceiverConn *conn, const char *cmd, char **err)
{
PGresult *res;
res = libpqrcv_PQexec(conn->streamConn, cmd);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
PQclear(res);
*err = pstrdup(PQerrorMessage(conn->streamConn));
return false;
}
PQclear(res);
return true;
}
/*
* Given a List of strings, return it as single comma separated
* string, quoting identifiers as needed.
*
* This is essentially the reverse of SplitIdentifierString.
*
* The caller should free the result.
*/
static char *
stringlist_to_identifierstr(PGconn *conn, List *strings)
{
ListCell *lc;
StringInfoData res;
bool first = true;
initStringInfo(&res);
foreach (lc, strings)
{
char *val = strVal(lfirst(lc));
if (first)
first = false;
else
appendStringInfoChar(&res, ',');
appendStringInfoString(&res,
PQescapeIdentifier(conn, val, strlen(val)));
}
return res.data;
}

View File

@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = decode.o logical.o logicalfuncs.o message.o origin.o reorderbuffer.o \
snapbuild.o
OBJS = decode.o launcher.o logical.o logicalfuncs.o message.o origin.o \
proto.o relation.o reorderbuffer.o snapbuild.o worker.o
include $(top_srcdir)/src/backend/common.mk

View File

@@ -0,0 +1,759 @@
/*-------------------------------------------------------------------------
* launcher.c
* PostgreSQL logical replication worker launcher process
*
* Copyright (c) 2012-2016, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/logical/launcher.c
*
* NOTES
* This module contains the logical replication worker launcher which
* uses the background worker infrastructure to start the logical
* replication workers for every enabled subscription.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_subscription.h"
#include "libpq/pqsignal.h"
#include "postmaster/bgworker.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/slot.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/snapmgr.h"
/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
int max_logical_replication_workers = 4;
LogicalRepWorker *MyLogicalRepWorker = NULL;
typedef struct LogicalRepCtxStruct
{
/* Supervisor process. */
pid_t launcher_pid;
/* Background workers. */
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;
LogicalRepCtxStruct *LogicalRepCtx;
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
bool got_SIGTERM = false;
static bool on_commit_laucher_wakeup = false;
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
/*
* Load the list of subscriptions.
*
* Only the fields interesting for worker start/stop functions are filled for
* each subscription.
*/
static List *
get_subscription_list(void)
{
List *res = NIL;
Relation rel;
HeapScanDesc scan;
HeapTuple tup;
MemoryContext resultcxt;
/* This is the context that we will allocate our output data in */
resultcxt = CurrentMemoryContext;
/*
* Start a transaction so we can access pg_database, and get a snapshot.
* We don't have a use for the snapshot itself, but we're interested in
* the secondary effect that it sets RecentGlobalXmin. (This is critical
* for anything that reads heap pages, because HOT may decide to prune
* them even if the process doesn't attempt to modify any tuples.)
*/
StartTransactionCommand();
(void) GetTransactionSnapshot();
rel = heap_open(SubscriptionRelationId, AccessShareLock);
scan = heap_beginscan_catalog(rel, 0, NULL);
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
{
Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
Subscription *sub;
MemoryContext oldcxt;
/*
* Allocate our results in the caller's context, not the
* transaction's. We do this inside the loop, and restore the original
* context at the end, so that leaky things like heap_getnext() are
* not called in a potentially long-lived context.
*/
oldcxt = MemoryContextSwitchTo(resultcxt);
sub = (Subscription *) palloc(sizeof(Subscription));
sub->oid = HeapTupleGetOid(tup);
sub->dbid = subform->subdbid;
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->name = pstrdup(NameStr(subform->subname));
/* We don't fill fields we are not interested in. */
sub->conninfo = NULL;
sub->slotname = NULL;
sub->publications = NIL;
res = lappend(res, sub);
MemoryContextSwitchTo(oldcxt);
}
heap_endscan(scan);
heap_close(rel, AccessShareLock);
CommitTransactionCommand();
return res;
}
/*
* Wait for a background worker to start up and attach to the shmem context.
*
* This is like WaitForBackgroundWorkerStartup(), except that we wait for
* attaching, not just start and we also just exit if postmaster died.
*/
static bool
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
BackgroundWorkerHandle *handle)
{
BgwHandleStatus status;
int rc;
for (;;)
{
pid_t pid;
CHECK_FOR_INTERRUPTS();
status = GetBackgroundWorkerPid(handle, &pid);
/*
* Worker started and attached to our shmem. This check is safe
* because only laucher ever starts the workers, so nobody can steal
* the worker slot.
*/
if (status == BGWH_STARTED && worker->proc)
return true;
/* Worker didn't start or died before attaching to our shmem. */
if (status == BGWH_STOPPED)
return false;
/*
* We need timeout because we generaly don't get notified via latch
* about the worker attach.
*/
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_BGWORKER_STARTUP);
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
ResetLatch(MyLatch);
}
return false;
}
/*
* Walks the workers array and searches for one that matches given
* subscription id.
*/
LogicalRepWorker *
logicalrep_worker_find(Oid subid)
{
int i;
LogicalRepWorker *res = NULL;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/* Search for attached worker for a given subscription id. */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
if (w->subid == subid && w->proc && IsBackendPid(w->proc->pid))
{
res = w;
break;
}
}
return res;
}
/*
* Start new apply background worker.
*/
void
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
int slot;
LogicalRepWorker *worker = NULL;
ereport(LOG,
(errmsg("starting logical replication worker for subscription \"%s\"",
subname)));
/* Report this after the initial starting message for consistency. */
if (max_replication_slots == 0)
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("cannot start logical replication workers when max_replication_slots = 0")));
/*
* We need to do the modification of the shared memory under lock so that
* we have consistent view.
*/
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
/* Find unused worker slot. */
for (slot = 0; slot < max_logical_replication_workers; slot++)
{
if (!LogicalRepCtx->workers[slot].proc)
{
worker = &LogicalRepCtx->workers[slot];
break;
}
}
/* Bail if not found */
if (worker == NULL)
{
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of logical replication workers slots"),
errhint("You might need to increase max_logical_replication_workers.")));
return;
}
/* Prepare the worker info. */
memset(worker, 0, sizeof(LogicalRepWorker));
worker->dbid = dbid;
worker->userid = userid;
worker->subid = subid;
LWLockRelease(LogicalRepWorkerLock);
/* Register the new dynamic worker. */
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
bgw.bgw_main = ApplyWorkerMain;
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u", subid);
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
bgw.bgw_main_arg = slot;
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background workers slots"),
errhint("You might need to increase max_worker_processes.")));
return;
}
/* Now wait until it attaches. */
WaitForReplicationWorkerAttach(worker, bgw_handle);
}
/*
* Stop the logical replication worker and wait until it detaches from the
* slot.
*
* The caller must hold LogicalRepLauncherLock to ensure that new workers are
* not being started during this function call.
*/
void
logicalrep_worker_stop(Oid subid)
{
LogicalRepWorker *worker;
Assert(LWLockHeldByMe(LogicalRepLauncherLock));
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(subid);
/* No worker, nothing to do. */
if (!worker)
{
LWLockRelease(LogicalRepWorkerLock);
return;
}
/*
* If we found worker but it does not have proc set it is starting up,
* wait for it to finish and then kill it.
*/
while (worker && !worker->proc)
{
int rc;
LWLockRelease(LogicalRepWorkerLock);
CHECK_FOR_INTERRUPTS();
/* Wait for signal. */
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_BGWORKER_STARTUP);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
ResetLatch(&MyProc->procLatch);
/* Check if the worker has started. */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(subid);
if (!worker || worker->proc)
break;
}
/* Now terminate the worker ... */
kill(worker->proc->pid, SIGTERM);
LWLockRelease(LogicalRepWorkerLock);
/* ... and wait for it to die. */
for (;;)
{
int rc;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
if (!worker->proc)
{
LWLockRelease(LogicalRepWorkerLock);
break;
}
LWLockRelease(LogicalRepWorkerLock);
CHECK_FOR_INTERRUPTS();
/* Wait for more work. */
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_BGWORKER_SHUTDOWN);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
ResetLatch(&MyProc->procLatch);
}
}
/*
* Attach to a slot.
*/
void
logicalrep_worker_attach(int slot)
{
/* Block concurrent access. */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
Assert(slot >= 0 && slot < max_logical_replication_workers);
MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
if (MyLogicalRepWorker->proc)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication worker slot %d already used by "
"another worker", slot)));
MyLogicalRepWorker->proc = MyProc;
before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
LWLockRelease(LogicalRepWorkerLock);
}
/*
* Detach the worker (cleans up the worker info).
*/
static void
logicalrep_worker_detach(void)
{
/* Block concurrent access. */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
MyLogicalRepWorker->dbid = InvalidOid;
MyLogicalRepWorker->userid = InvalidOid;
MyLogicalRepWorker->subid = InvalidOid;
MyLogicalRepWorker->proc = NULL;
LWLockRelease(LogicalRepWorkerLock);
}
/*
* Cleanup function.
*
* Called on logical replication worker exit.
*/
static void
logicalrep_worker_onexit(int code, Datum arg)
{
logicalrep_worker_detach();
}
/* SIGTERM: set flag to exit at next convenient time */
void
logicalrep_worker_sigterm(SIGNAL_ARGS)
{
got_SIGTERM = true;
/* Waken anything waiting on the process latch */
SetLatch(MyLatch);
}
/*
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
*/
Size
ApplyLauncherShmemSize(void)
{
Size size;
/*
* Need the fixed struct and the array of LogicalRepWorker.
*/
size = sizeof(LogicalRepCtxStruct);
size = MAXALIGN(size);
size = add_size(size, mul_size(max_logical_replication_workers,
sizeof(LogicalRepWorker)));
return size;
}
void
ApplyLauncherRegister(void)
{
BackgroundWorker bgw;
if (max_logical_replication_workers == 0)
return;
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
bgw.bgw_main = ApplyLauncherMain;
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication launcher");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
}
/*
* ApplyLauncherShmemInit
* Allocate and initialize replication launcher shared memory
*/
void
ApplyLauncherShmemInit(void)
{
bool found;
LogicalRepCtx = (LogicalRepCtxStruct *)
ShmemInitStruct("Logical Replication Launcher Data",
ApplyLauncherShmemSize(),
&found);
if (!found)
memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
}
/*
* Wakeup the launcher on commit if requested.
*/
void
AtCommit_ApplyLauncher(void)
{
if (on_commit_laucher_wakeup)
ApplyLauncherWakeup();
}
/*
* Request wakeup of the launcher on commit of the transaction.
*
* This is used to send launcher signal to stop sleeping and proccess the
* subscriptions when current transaction commits. Should be used when new
* tuple was added to the pg_subscription catalog.
*/
void
ApplyLauncherWakeupAtCommit(void)
{
if (!on_commit_laucher_wakeup)
on_commit_laucher_wakeup = true;
}
void
ApplyLauncherWakeup(void)
{
if (IsBackendPid(LogicalRepCtx->launcher_pid))
kill(LogicalRepCtx->launcher_pid, SIGUSR1);
}
/*
* Main loop for the apply launcher process.
*/
void
ApplyLauncherMain(Datum main_arg)
{
ereport(LOG,
(errmsg("logical replication launcher started")));
/* Establish signal handlers. */
pqsignal(SIGTERM, logicalrep_worker_sigterm);
BackgroundWorkerUnblockSignals();
/* Make it easy to identify our processes. */
SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
PGC_USERSET, PGC_S_SESSION);
LogicalRepCtx->launcher_pid = MyProcPid;
/*
* Establish connection to nailed catalogs (we only ever access
* pg_subscription).
*/
BackgroundWorkerInitializeConnection(NULL, NULL);
/* Enter main loop */
while (!got_SIGTERM)
{
int rc;
List *sublist;
ListCell *lc;
MemoryContext subctx;
MemoryContext oldctx;
TimestampTz now;
TimestampTz last_start_time = 0;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
now = GetCurrentTimestamp();
/* Limit the start retry to once a wal_retrieve_retry_interval */
if (TimestampDifferenceExceeds(last_start_time, now,
wal_retrieve_retry_interval))
{
/* Use temporary context for the database list and worker info. */
subctx = AllocSetContextCreate(TopMemoryContext,
"Logical Replication Launcher sublist",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
oldctx = MemoryContextSwitchTo(subctx);
/* Block any concurrent DROP SUBSCRIPTION. */
LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
/* search for subscriptions to start or stop. */
sublist = get_subscription_list();
/* Start the missing workers for enabled subscriptions. */
foreach(lc, sublist)
{
Subscription *sub = (Subscription *) lfirst(lc);
LogicalRepWorker *w;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
w = logicalrep_worker_find(sub->oid);
LWLockRelease(LogicalRepWorkerLock);
if (sub->enabled && w == NULL)
{
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner);
last_start_time = now;
wait_time = wal_retrieve_retry_interval;
/* Limit to one worker per mainloop cycle. */
break;
}
}
LWLockRelease(LogicalRepLauncherLock);
/* Switch back to original memory context. */
MemoryContextSwitchTo(oldctx);
/* Clean the temporary memory. */
MemoryContextDelete(subctx);
}
else
{
/*
* The wait in previous cycle was interruped in less than
* wal_retrieve_retry_interval since last worker was started,
* this usually means crash of the worker, so we should retry
* in wal_retrieve_retry_interval again.
*/
wait_time = wal_retrieve_retry_interval;
}
/* Wait for more work. */
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
wait_time,
WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
ResetLatch(&MyProc->procLatch);
}
LogicalRepCtx->launcher_pid = 0;
/* ... and if it returns, we're done */
ereport(LOG,
(errmsg("logical replication launcher shutting down")));
proc_exit(0);
}
/*
* Returns state of the subscriptions.
*/
Datum
pg_stat_get_subscription(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_SUBSCRIPTION_COLS 7
Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
int i;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));
if (!(rsinfo->allowedModes & SFRM_Materialize))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
/* Make sure we get consistent view of the workers. */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
for (i = 0; i <= max_logical_replication_workers; i++)
{
/* for each row */
Datum values[PG_STAT_GET_SUBSCRIPTION_COLS];
bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
int worker_pid;
LogicalRepWorker worker;
memcpy(&worker, &LogicalRepCtx->workers[i],
sizeof(LogicalRepWorker));
if (!worker.proc || !IsBackendPid(worker.proc->pid))
continue;
if (OidIsValid(subid) && worker.subid != subid)
continue;
worker_pid = worker.proc->pid;
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
values[0] = ObjectIdGetDatum(worker.subid);
values[1] = Int32GetDatum(worker_pid);
if (XLogRecPtrIsInvalid(worker.last_lsn))
nulls[2] = true;
else
values[2] = LSNGetDatum(worker.last_lsn);
if (worker.last_send_time == 0)
nulls[3] = true;
else
values[3] = TimestampTzGetDatum(worker.last_send_time);
if (worker.last_recv_time == 0)
nulls[4] = true;
else
values[4] = TimestampTzGetDatum(worker.last_recv_time);
if (XLogRecPtrIsInvalid(worker.reply_lsn))
nulls[5] = true;
else
values[5] = LSNGetDatum(worker.reply_lsn);
if (worker.reply_time == 0)
nulls[6] = true;
else
values[6] = TimestampTzGetDatum(worker.reply_time);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
/* If only a single subscription was requested, and we found it, break. */
if (OidIsValid(subid))
break;
}
LWLockRelease(LogicalRepWorkerLock);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
return (Datum) 0;
}

View File

@@ -0,0 +1,637 @@
/*-------------------------------------------------------------------------
*
* proto.c
* logical replication protocol functions
*
* Copyright (c) 2015, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/logical/proto.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/sysattr.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "replication/logicalproto.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
/*
* Protocol message flags.
*/
#define LOGICALREP_IS_REPLICA_IDENTITY 1
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
HeapTuple tuple);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
static void logicalrep_write_namespace(StringInfo out, Oid nspid);
static const char *logicalrep_read_namespace(StringInfo in);
/*
* Write BEGIN to the output stream.
*/
void
logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
{
pq_sendbyte(out, 'B'); /* BEGIN */
/* fixed fields */
pq_sendint64(out, txn->final_lsn);
pq_sendint64(out, txn->commit_time);
pq_sendint(out, txn->xid, 4);
}
/*
* Read transaction BEGIN from the stream.
*/
void
logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
{
/* read fields */
begin_data->final_lsn = pq_getmsgint64(in);
if (begin_data->final_lsn == InvalidXLogRecPtr)
elog(ERROR, "final_lsn not set in begin message");
begin_data->committime = pq_getmsgint64(in);
begin_data->xid = pq_getmsgint(in, 4);
}
/*
* Write COMMIT to the output stream.
*/
void
logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
uint8 flags = 0;
pq_sendbyte(out, 'C'); /* sending COMMIT */
/* send the flags field (unused for now) */
pq_sendbyte(out, flags);
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
pq_sendint64(out, txn->commit_time);
}
/*
* Read transaction COMMIT from the stream.
*/
void
logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
{
/* read flags (unused for now) */
uint8 flags = pq_getmsgbyte(in);
if (flags != 0)
elog(ERROR, "unknown flags %u in commit message", flags);
/* read fields */
commit_data->commit_lsn = pq_getmsgint64(in);
commit_data->end_lsn = pq_getmsgint64(in);
commit_data->committime = pq_getmsgint64(in);
}
/*
* Write ORIGIN to the output stream.
*/
void
logicalrep_write_origin(StringInfo out, const char *origin,
XLogRecPtr origin_lsn)
{
pq_sendbyte(out, 'O'); /* ORIGIN */
/* fixed fields */
pq_sendint64(out, origin_lsn);
/* origin string */
pq_sendstring(out, origin);
}
/*
* Read ORIGIN from the output stream.
*/
char *
logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
{
/* fixed fields */
*origin_lsn = pq_getmsgint64(in);
/* return origin */
return pstrdup(pq_getmsgstring(in));
}
/*
* Write INSERT to the output stream.
*/
void
logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
{
pq_sendbyte(out, 'I'); /* action INSERT */
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
/* use Oid as relation identifier */
pq_sendint(out, RelationGetRelid(rel), 4);
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newtuple);
}
/*
* Read INSERT from stream.
*
* Fills the new tuple.
*/
LogicalRepRelId
logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
{
char action;
LogicalRepRelId relid;
/* read the relation id */
relid = pq_getmsgint(in, 4);
action = pq_getmsgbyte(in);
if (action != 'N')
elog(ERROR, "expected new tuple but got %d",
action);
logicalrep_read_tuple(in, newtup);
return relid;
}
/*
* Write UPDATE to the output stream.
*/
void
logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
HeapTuple newtuple)
{
pq_sendbyte(out, 'U'); /* action UPDATE */
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
/* use Oid as relation identifier */
pq_sendint(out, RelationGetRelid(rel), 4);
if (oldtuple != NULL)
{
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldtuple);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
logicalrep_write_tuple(out, rel, newtuple);
}
/*
* Read UPDATE from stream.
*/
LogicalRepRelId
logicalrep_read_update(StringInfo in, bool *has_oldtuple,
LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup)
{
char action;
LogicalRepRelId relid;
/* read the relation id */
relid = pq_getmsgint(in, 4);
/* read and verify action */
action = pq_getmsgbyte(in);
if (action != 'K' && action != 'O' && action != 'N')
elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
action);
/* check for old tuple */
if (action == 'K' || action == 'O')
{
logicalrep_read_tuple(in, oldtup);
*has_oldtuple = true;
action = pq_getmsgbyte(in);
}
else
*has_oldtuple = false;
/* check for new tuple */
if (action != 'N')
elog(ERROR, "expected action 'N', got %c",
action);
logicalrep_read_tuple(in, newtup);
return relid;
}
/*
* Write DELETE to the output stream.
*/
void
logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
pq_sendbyte(out, 'D'); /* action DELETE */
/* use Oid as relation identifier */
pq_sendint(out, RelationGetRelid(rel), 4);
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldtuple);
}
/*
* Read DELETE from stream.
*
* Fills the old tuple.
*/
LogicalRepRelId
logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
{
char action;
LogicalRepRelId relid;
/* read the relation id */
relid = pq_getmsgint(in, 4);
/* read and verify action */
action = pq_getmsgbyte(in);
if (action != 'K' && action != 'O')
elog(ERROR, "expected action 'O' or 'K', got %c", action);
logicalrep_read_tuple(in, oldtup);
return relid;
}
/*
* Write relation description to the output stream.
*/
void
logicalrep_write_rel(StringInfo out, Relation rel)
{
char *relname;
pq_sendbyte(out, 'R'); /* sending RELATION */
/* use Oid as relation identifier */
pq_sendint(out, RelationGetRelid(rel), 4);
/* send qualified relation name */
logicalrep_write_namespace(out, RelationGetNamespace(rel));
relname = RelationGetRelationName(rel);
pq_sendstring(out, relname);
/* send replica identity */
pq_sendbyte(out, rel->rd_rel->relreplident);
/* send the attribute info */
logicalrep_write_attrs(out, rel);
}
/*
* Read the relation info from stream and return as LogicalRepRelation.
*/
LogicalRepRelation *
logicalrep_read_rel(StringInfo in)
{
LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
rel->remoteid = pq_getmsgint(in, 4);
/* Read relation name from stream */
rel->nspname = pstrdup(logicalrep_read_namespace(in));
rel->relname = pstrdup(pq_getmsgstring(in));
/* Read the replica identity. */
rel->replident = pq_getmsgbyte(in);
/* Get attribute description */
logicalrep_read_attrs(in, rel);
return rel;
}
/*
* Write type info to the output stream.
*
* This function will always write base type info.
*/
void
logicalrep_write_typ(StringInfo out, Oid typoid)
{
Oid basetypoid = getBaseType(typoid);
HeapTuple tup;
Form_pg_type typtup;
pq_sendbyte(out, 'Y'); /* sending TYPE */
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for type %u", basetypoid);
typtup = (Form_pg_type) GETSTRUCT(tup);
/* use Oid as relation identifier */
pq_sendint(out, typoid, 4);
/* send qualified type name */
logicalrep_write_namespace(out, typtup->typnamespace);
pq_sendstring(out, NameStr(typtup->typname));
ReleaseSysCache(tup);
}
/*
* Read type info from the output stream.
*/
void
logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
{
ltyp->remoteid = pq_getmsgint(in, 4);
/* Read tupe name from stream */
ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
ltyp->typname = pstrdup(pq_getmsgstring(in));
}
/*
* Write a tuple to the outputstream, in the most efficient format possible.
*/
static void
logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
{
TupleDesc desc;
Datum values[MaxTupleAttributeNumber];
bool isnull[MaxTupleAttributeNumber];
int i;
uint16 nliveatts = 0;
desc = RelationGetDescr(rel);
for (i = 0; i < desc->natts; i++)
{
if (desc->attrs[i]->attisdropped)
continue;
nliveatts++;
}
pq_sendint(out, nliveatts, 2);
/* try to allocate enough memory from the get-go */
enlargeStringInfo(out, tuple->t_len +
nliveatts * (1 + 4));
heap_deform_tuple(tuple, desc, values, isnull);
/* Write the values */
for (i = 0; i < desc->natts; i++)
{
HeapTuple typtup;
Form_pg_type typclass;
Form_pg_attribute att = desc->attrs[i];
char *outputstr;
int len;
/* skip dropped columns */
if (att->attisdropped)
continue;
if (isnull[i])
{
pq_sendbyte(out, 'n'); /* null column */
continue;
}
else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
{
pq_sendbyte(out, 'u'); /* unchanged toast column */
continue;
}
typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
if (!HeapTupleIsValid(typtup))
elog(ERROR, "cache lookup failed for type %u", att->atttypid);
typclass = (Form_pg_type) GETSTRUCT(typtup);
pq_sendbyte(out, 't'); /* 'text' data follows */
outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
len = strlen(outputstr) + 1; /* null terminated */
pq_sendint(out, len, 4); /* length */
appendBinaryStringInfo(out, outputstr, len); /* data */
pfree(outputstr);
ReleaseSysCache(typtup);
}
}
/*
* Read tuple in remote format from stream.
*
* The returned tuple points into the input stringinfo.
*/
static void
logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
{
int i;
int natts;
/* Get of attributes. */
natts = pq_getmsgint(in, 2);
memset(tuple->changed, 0, sizeof(tuple->changed));
/* Read the data */
for (i = 0; i < natts; i++)
{
char kind;
int len;
kind = pq_getmsgbyte(in);
switch (kind)
{
case 'n': /* null */
tuple->values[i] = NULL;
tuple->changed[i] = true;
break;
case 'u': /* unchanged column */
tuple->values[i] = (char *) 0xdeadbeef; /* make bad usage more obvious */
break;
case 't': /* text formatted value */
{
tuple->changed[i] = true;
len = pq_getmsgint(in, 4); /* read length */
/* and data */
tuple->values[i] = (char *) pq_getmsgbytes(in, len);
}
break;
default:
elog(ERROR, "unknown data representation type '%c'", kind);
}
}
}
/*
* Write relation attributes to the stream.
*/
static void
logicalrep_write_attrs(StringInfo out, Relation rel)
{
TupleDesc desc;
int i;
uint16 nliveatts = 0;
Bitmapset *idattrs = NULL;
bool replidentfull;
desc = RelationGetDescr(rel);
/* send number of live attributes */
for (i = 0; i < desc->natts; i++)
{
if (desc->attrs[i]->attisdropped)
continue;
nliveatts++;
}
pq_sendint(out, nliveatts, 2);
/* fetch bitmap of REPLICATION IDENTITY attributes */
replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
if (!replidentfull)
idattrs = RelationGetIndexAttrBitmap(rel,
INDEX_ATTR_BITMAP_IDENTITY_KEY);
/* send the attributes */
for (i = 0; i < desc->natts; i++)
{
Form_pg_attribute att = desc->attrs[i];
uint8 flags = 0;
if (att->attisdropped)
continue;
/* REPLICA IDENTITY FULL means all colums are sent as part of key. */
if (replidentfull ||
bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
idattrs))
flags |= LOGICALREP_IS_REPLICA_IDENTITY;
pq_sendbyte(out, flags);
/* attribute name */
pq_sendstring(out, NameStr(att->attname));
/* attribute type id */
pq_sendint(out, (int) att->atttypid, sizeof(att->atttypid));
/* attribute mode */
pq_sendint(out, att->atttypmod, sizeof(att->atttypmod));
}
bms_free(idattrs);
}
/*
* Read relation attribute names from the stream.
*/
static void
logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
{
int i;
int natts;
char **attnames;
Oid *atttyps;
Bitmapset *attkeys = NULL;
natts = pq_getmsgint(in, 2);
attnames = palloc(natts * sizeof(char *));
atttyps = palloc(natts * sizeof(Oid));
/* read the attributes */
for (i = 0; i < natts; i++)
{
uint8 flags;
/* Check for replica identity column */
flags = pq_getmsgbyte(in);
if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
attkeys = bms_add_member(attkeys, i);
/* attribute name */
attnames[i] = pstrdup(pq_getmsgstring(in));
/* attribute type id */
atttyps[i] = (Oid) pq_getmsgint(in, 4);
/* we ignore attribute mode for now */
(void) pq_getmsgint(in, 4);
}
rel->attnames = attnames;
rel->atttyps = atttyps;
rel->attkeys = attkeys;
rel->natts = natts;
}
/*
* Write the namespace name or empty string for pg_catalog (to save space).
*/
static void
logicalrep_write_namespace(StringInfo out, Oid nspid)
{
if (nspid == PG_CATALOG_NAMESPACE)
pq_sendbyte(out, '\0');
else
{
char *nspname = get_namespace_name(nspid);
if (nspname == NULL)
elog(ERROR, "cache lookup failed for namespace %u",
nspid);
pq_sendstring(out, nspname);
}
}
/*
* Read the namespace name while treating empty string as pg_catalog.
*/
static const char *
logicalrep_read_namespace(StringInfo in)
{
const char *nspname = pq_getmsgstring(in);
if (nspname[0] == '\0')
nspname = "pg_catalog";
return nspname;
}

View File

@@ -0,0 +1,489 @@
/*-------------------------------------------------------------------------
* relation.c
* PostgreSQL logical replication
*
* Copyright (c) 2012-2016, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/logical/relation.c
*
* NOTES
* This file contains helper functions for logical replication relation
* mapping cache.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/sysattr.h"
#include "catalog/namespace.h"
#include "nodes/makefuncs.h"
#include "replication/logicalrelation.h"
#include "replication/worker_internal.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
static HTAB *LogicalRepTypMap = NULL;
static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
uint32 hashvalue);
/*
* Relcache invalidation callback for our relation map cache.
*/
static void
logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
{
LogicalRepRelMapEntry *entry;
/* Just to be sure. */
if (LogicalRepRelMap == NULL)
return;
if (reloid != InvalidOid)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, LogicalRepRelMap);
/* TODO, use inverse lookup hashtable? */
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
{
if (entry->localreloid == reloid)
{
entry->localreloid = InvalidOid;
hash_seq_term(&status);
break;
}
}
}
else
{
/* invalidate all cache entries */
HASH_SEQ_STATUS status;
hash_seq_init(&status, LogicalRepRelMap);
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
entry->localreloid = InvalidOid;
}
}
/*
* Initialize the relation map cache.
*/
static void
logicalrep_relmap_init()
{
HASHCTL ctl;
if (!LogicalRepRelMapContext)
LogicalRepRelMapContext =
AllocSetContextCreate(CacheMemoryContext,
"LogicalRepRelMapContext",
ALLOCSET_DEFAULT_SIZES);
/* Initialize the relation hash table. */
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(LogicalRepRelId);
ctl.entrysize = sizeof(LogicalRepRelMapEntry);
ctl.hcxt = LogicalRepRelMapContext;
LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Initialize the type hash table. */
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(Oid);
ctl.entrysize = sizeof(LogicalRepTyp);
ctl.hcxt = LogicalRepRelMapContext;
/* This will usually be small. */
LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
HASH_ELEM | HASH_BLOBS |HASH_CONTEXT);
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
(Datum) 0);
CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
(Datum) 0);
}
/*
* Free the entry of a relation map cache.
*/
static void
logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
{
LogicalRepRelation *remoterel;
remoterel = &entry->remoterel;
pfree(remoterel->nspname);
pfree(remoterel->relname);
if (remoterel->natts > 0)
{
int i;
for (i = 0; i < remoterel->natts; i++)
pfree(remoterel->attnames[i]);
pfree(remoterel->attnames);
pfree(remoterel->atttyps);
}
remoterel->attnames = NULL;
remoterel->atttyps = NULL;
bms_free(remoterel->attkeys);
remoterel->attkeys = NULL;
if (entry->attrmap)
pfree(entry->attrmap);
entry->attrmap = NULL;
remoterel->natts = 0;
entry->localreloid = InvalidOid;
entry->localrel = NULL;
}
/*
* Add new entry or update existing entry in the relation map cache.
*
* Called when new relation mapping is sent by the publisher to update
* our expected view of incoming data from said publisher.
*/
void
logicalrep_relmap_update(LogicalRepRelation *remoterel)
{
MemoryContext oldctx;
LogicalRepRelMapEntry *entry;
bool found;
int i;
if (LogicalRepRelMap == NULL)
logicalrep_relmap_init();
/*
* HASH_ENTER returns the existing entry if present or creates a new one.
*/
entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
HASH_ENTER, &found);
if (found)
logicalrep_relmap_free_entry(entry);
/* Make cached copy of the data */
oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
entry->remoterel.remoteid = remoterel->remoteid;
entry->remoterel.nspname = pstrdup(remoterel->nspname);
entry->remoterel.relname = pstrdup(remoterel->relname);
entry->remoterel.natts = remoterel->natts;
entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
for (i = 0; i < remoterel->natts; i++)
{
entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
entry->remoterel.atttyps[i] = remoterel->atttyps[i];
}
entry->remoterel.replident = remoterel->replident;
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
entry->attrmap = NULL;
entry->localreloid = InvalidOid;
MemoryContextSwitchTo(oldctx);
}
/*
* Find attribute index in TupleDesc struct by attribute name.
*
* Returns -1 if not found.
*/
static int
logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
{
int i;
for (i = 0; i < remoterel->natts; i++)
{
if (strcmp(remoterel->attnames[i], attname) == 0)
return i;
}
return -1;
}
/*
* Open the local relation associated with the remote one.
*
* Optionally rebuilds the Relcache mapping if it was invalidated
* by local DDL.
*/
LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
LogicalRepRelMapEntry *entry;
bool found;
if (LogicalRepRelMap == NULL)
logicalrep_relmap_init();
/* Search for existing entry. */
entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
HASH_FIND, &found);
if (!found)
elog(ERROR, "no relation map entry for remote relation ID %u",
remoteid);
/* Need to update the local cache? */
if (!OidIsValid(entry->localreloid))
{
Oid relid;
int i;
int found;
Bitmapset *idkey;
TupleDesc desc;
LogicalRepRelation *remoterel;
MemoryContext oldctx;
remoterel = &entry->remoterel;
/* Try to find and lock the relation by name. */
relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
remoterel->relname, -1),
lockmode, true);
if (!OidIsValid(relid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" does not exist",
remoterel->nspname, remoterel->relname)));
entry->localrel = heap_open(relid, NoLock);
/*
* We currently only support writing to regular and partitioned
* tables.
*/
if (entry->localrel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("logical replication target relation \"%s.%s\" is not a table",
remoterel->nspname, remoterel->relname)));
/*
* Build the mapping of local attribute numbers to remote attribute
* numbers and validate that we don't miss any replicated columns
* as that would result in potentially unwanted data loss.
*/
desc = RelationGetDescr(entry->localrel);
oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
entry->attrmap = palloc(desc->natts * sizeof(int));
MemoryContextSwitchTo(oldctx);
found = 0;
for (i = 0; i < desc->natts; i++)
{
int attnum = logicalrep_rel_att_by_name(remoterel,
NameStr(desc->attrs[i]->attname));
entry->attrmap[i] = attnum;
if (attnum >= 0)
found++;
}
/* TODO, detail message with names of missing columns */
if (found < remoterel->natts)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" is missing "
"some replicated columns",
remoterel->nspname, remoterel->relname)));
/*
* Check that replica identity matches. We allow for stricter replica
* identity (fewer columns) on subscriber as that will not stop us
* from finding unique tuple. IE, if publisher has identity
* (id,timestamp) and subscriber just (id) this will not be a problem,
* but in the opposite scenario it will.
*
* Don't throw any error here just mark the relation entry as not
* updatable, as replica identity is only for updates and deletes
* but inserts can be replicated even without it.
*/
entry->updatable = true;
idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_IDENTITY_KEY);
/* fallback to PK if no replica identity */
if (idkey == NULL)
{
idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_PRIMARY_KEY);
/*
* If no replica identity index and no PK, the published table
* must have replica identity FULL.
*/
if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
entry->updatable = false;
}
i = -1;
while ((i = bms_next_member(idkey, i)) >= 0)
{
int attnum = i + FirstLowInvalidHeapAttributeNumber;
if (!AttrNumberIsForUserDefinedAttr(attnum))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" uses "
"system columns in REPLICA IDENTITY index",
remoterel->nspname, remoterel->relname)));
attnum = AttrNumberGetAttrOffset(attnum);
if (!bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
{
entry->updatable = false;
break;
}
}
entry->localreloid = relid;
}
else
entry->localrel = heap_open(entry->localreloid, lockmode);
return entry;
}
/*
* Close the previously opened logical relation.
*/
void
logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
{
heap_close(rel->localrel, lockmode);
rel->localrel = NULL;
}
/*
* Type cache invalidation callback for our type map cache.
*/
static void
logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
LogicalRepTyp *entry;
/* Just to be sure. */
if (LogicalRepTypMap == NULL)
return;
/* invalidate all cache entries */
hash_seq_init(&status, LogicalRepTypMap);
while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
entry->typoid = InvalidOid;
}
/*
* Free the type map cache entry data.
*/
static void
logicalrep_typmap_free_entry(LogicalRepTyp *entry)
{
pfree(entry->nspname);
pfree(entry->typname);
entry->typoid = InvalidOid;
}
/*
* Add new entry or update existing entry in the type map cache.
*/
void
logicalrep_typmap_update(LogicalRepTyp *remotetyp)
{
MemoryContext oldctx;
LogicalRepTyp *entry;
bool found;
if (LogicalRepTypMap == NULL)
logicalrep_relmap_init();
/*
* HASH_ENTER returns the existing entry if present or creates a new one.
*/
entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid,
HASH_ENTER, &found);
if (found)
logicalrep_typmap_free_entry(entry);
/* Make cached copy of the data */
entry->remoteid = remotetyp->remoteid;
oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
entry->nspname = pstrdup(remotetyp->nspname);
entry->typname = pstrdup(remotetyp->typname);
MemoryContextSwitchTo(oldctx);
entry->typoid = InvalidOid;
}
/*
* Fetch type info from the cache.
*/
Oid
logicalrep_typmap_getid(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
Oid nspoid;
/* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId)
{
if (!get_typisdefined(remoteid))
ereport(ERROR,
(errmsg("builtin type %u not found", remoteid),
errhint("This can be caused by having publisher with "
"higher major version than subscriber")));
return remoteid;
}
if (LogicalRepTypMap == NULL)
logicalrep_relmap_init();
/* Try finding the mapping. */
entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
HASH_FIND, &found);
if (!found)
elog(ERROR, "no type map entry for remote type %u",
remoteid);
/* Found and mapped, return the oid. */
if (OidIsValid(entry->typoid))
return entry->typoid;
/* Otherwise, try to map to local type. */
nspoid = LookupExplicitNamespace(entry->nspname, true);
if (OidIsValid(nspoid))
entry->typoid = GetSysCacheOid2(TYPENAMENSP,
PointerGetDatum(entry->typname),
ObjectIdGetDatum(nspoid));
else
entry->typoid = InvalidOid;
if (!OidIsValid(entry->typoid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("data type \"%s.%s\" required for logical replication does not exist",
entry->nspname, entry->typname)));
return entry->typoid;
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,32 @@
#-------------------------------------------------------------------------
#
# Makefile--
# Makefile for src/backend/replication/pgoutput
#
# IDENTIFICATION
# src/backend/replication/pgoutput
#
#-------------------------------------------------------------------------
subdir = src/backend/replication/pgoutput
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = pgoutput.o $(WIN32RES)
PGFILEDESC = "pgoutput - standard logical replication output plugin"
NAME = pgoutput
all: all-shared-lib
include $(top_srcdir)/src/Makefile.shlib
install: all installdirs install-lib
installdirs: installdirs-lib
uninstall: uninstall-lib
clean distclean maintainer-clean: clean-lib
rm -f $(OBJS)

View File

@@ -0,0 +1,596 @@
/*-------------------------------------------------------------------------
*
* pgoutput.c
* Logical Replication output plugin
*
* Copyright (c) 2012-2015, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/pgoutput/pgoutput.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/pg_publication.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/int8.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
PG_MODULE_MAGIC;
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static void pgoutput_startup(LogicalDecodingContext * ctx,
OutputPluginOptions *opt, bool is_init);
static void pgoutput_shutdown(LogicalDecodingContext * ctx);
static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static bool publications_valid;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
/* Entry in the map used to remember which relation schemas we sent. */
typedef struct RelationSyncEntry
{
Oid relid; /* relation oid */
bool schema_sent; /* did we send the schema? */
bool replicate_valid;
PublicationActions pubactions;
} RelationSyncEntry;
/* Map used to remember which relation schemas we sent. */
static HTAB *RelationSyncCache = NULL;
static void init_rel_sync_cache(MemoryContext decoding_context);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
/*
* Specify output plugin callbacks
*/
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
cb->startup_cb = pgoutput_startup;
cb->begin_cb = pgoutput_begin_txn;
cb->change_cb = pgoutput_change;
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
}
static void
parse_output_parameters(List *options, uint32 *protocol_version,
List **publication_names)
{
ListCell *lc;
bool protocol_version_given = false;
bool publication_names_given = false;
foreach(lc, options)
{
DefElem *defel = (DefElem *) lfirst(lc);
Assert(defel->arg == NULL || IsA(defel->arg, String));
/* Check each param, whether or not we recognise it */
if (strcmp(defel->defname, "proto_version") == 0)
{
int64 parsed;
if (protocol_version_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
protocol_version_given = true;
if (!scanint8(strVal(defel->arg), true, &parsed))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid proto_version")));
if (parsed > PG_UINT32_MAX || parsed < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("proto_verson \"%s\" out of range",
strVal(defel->arg))));
*protocol_version = (uint32) parsed;
}
else if (strcmp(defel->defname, "publication_names") == 0)
{
if (publication_names_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
publication_names_given = true;
if (!SplitIdentifierString(strVal(defel->arg), ',',
publication_names))
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("invalid publication_names syntax")));
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}
}
/*
* Initialize this plugin
*/
static void
pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
bool is_init)
{
PGOutputData *data = palloc0(sizeof(PGOutputData));
/* Create our memory context for private allocations. */
data->context = AllocSetContextCreate(ctx->context,
"logical replication output context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
ctx->output_plugin_private = data;
/* This plugin uses binary protocol. */
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
/*
* This is replication start and not slot initialization.
*
* Parse and validate options passed by the client.
*/
if (!is_init)
{
/* Parse the params and ERROR if we see any we don't recognise */
parse_output_parameters(ctx->output_plugin_options,
&data->protocol_version,
&data->publication_names);
/* Check if we support requested protol */
if (data->protocol_version != LOGICALREP_PROTO_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("client sent proto_version=%d but we only support protocol %d or lower",
data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("client sent proto_version=%d but we only support protocol %d or higher",
data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
if (list_length(data->publication_names) < 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("publication_names parameter missing")));
/* Init publication state. */
data->publications = NIL;
publications_valid = false;
CacheRegisterSyscacheCallback(PUBLICATIONOID,
publication_invalidation_cb,
(Datum) 0);
/* Initialize relation schema cache. */
init_rel_sync_cache(CacheMemoryContext);
}
}
/*
* BEGIN callback
*/
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin(ctx->out, txn);
if (send_replication_origin)
{
char *origin;
/* Message boundary */
OutputPluginWrite(ctx, false);
OutputPluginPrepareWrite(ctx, true);
/*
* XXX: which behaviour we want here?
*
* Alternatives:
* - don't send origin message if origin name not found
* (that's what we do now)
* - throw error - that will break replication, not good
* - send some special "unknown" origin
*/
if (replorigin_by_oid(txn->origin_id, true, &origin))
logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
}
OutputPluginWrite(ctx, true);
}
/*
* COMMIT callback
*/
static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
}
/*
* Sends the decoded DML over wire.
*/
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
/* First check the table filter */
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
if (!relentry->pubactions.pubinsert)
return;
break;
case REORDER_BUFFER_CHANGE_UPDATE:
if (!relentry->pubactions.pubupdate)
return;
break;
case REORDER_BUFFER_CHANGE_DELETE:
if (!relentry->pubactions.pubdelete)
return;
break;
default:
Assert(false);
}
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
/*
* Write the relation schema if the current schema haven't been sent yet.
*/
if (!relentry->schema_sent)
{
TupleDesc desc;
int i;
desc = RelationGetDescr(relation);
/*
* Write out type info if needed. We do that only for user created
* types.
*/
for (i = 0; i < desc->natts; i++)
{
Form_pg_attribute att = desc->attrs[i];
if (att->attisdropped)
continue;
if (att->atttypid < FirstNormalObjectId)
continue;
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, att->atttypid);
OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, relation);
OutputPluginWrite(ctx, false);
relentry->schema_sent = true;
}
/* Send the data */
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation,
&change->data.tp.newtuple->tuple);
OutputPluginWrite(ctx, true);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
{
HeapTuple oldtuple = change->data.tp.oldtuple ?
&change->data.tp.oldtuple->tuple : NULL;
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, relation, oldtuple,
&change->data.tp.newtuple->tuple);
OutputPluginWrite(ctx, true);
break;
}
case REORDER_BUFFER_CHANGE_DELETE:
if (change->data.tp.oldtuple)
{
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_delete(ctx->out, relation,
&change->data.tp.oldtuple->tuple);
OutputPluginWrite(ctx, true);
}
else
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
break;
default:
Assert(false);
}
/* Cleanup */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
/*
* Currently we always forward.
*/
static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
{
return false;
}
/*
* Shutdown the output plugin.
*
* Note, we don't need to clean the data->context as it's child context
* of the ctx->context so it will be cleaned up by logical decoding machinery.
*/
static void
pgoutput_shutdown(LogicalDecodingContext * ctx)
{
if (RelationSyncCache)
{
hash_destroy(RelationSyncCache);
RelationSyncCache = NULL;
}
}
/*
* Load publications from the list of publication names.
*/
static List *
LoadPublications(List *pubnames)
{
List *result = NIL;
ListCell *lc;
foreach (lc, pubnames)
{
char *pubname = (char *) lfirst(lc);
Publication *pub = GetPublicationByName(pubname, false);
result = lappend(result, pub);
}
return result;
}
/*
* Publication cache invalidation callback.
*/
static void
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
{
publications_valid = false;
/*
* Also invalidate per-relation cache so that next time the filtering
* info is checked it will be updated with the new publication
* settings.
*/
rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
}
/*
* Initialize the relation schema sync cache for a decoding session.
*
* The hash table is destoyed at the end of a decoding session. While
* relcache invalidations still exist and will still be invoked, they
* will just see the null hash table global and take no action.
*/
static void
init_rel_sync_cache(MemoryContext cachectx)
{
HASHCTL ctl;
MemoryContext old_ctxt;
if (RelationSyncCache != NULL)
return;
/* Make a new hash table for the cache */
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(Oid);
ctl.entrysize = sizeof(RelationSyncEntry);
ctl.hcxt = cachectx;
old_ctxt = MemoryContextSwitchTo(cachectx);
RelationSyncCache = hash_create("logical replication output relation cache",
128, &ctl,
HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
(void) MemoryContextSwitchTo(old_ctxt);
Assert(RelationSyncCache != NULL);
CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
rel_sync_cache_publication_cb,
(Datum) 0);
}
/*
* Find or create entry in the relation schema cache.
*/
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Oid relid)
{
RelationSyncEntry *entry;
bool found;
MemoryContext oldctx;
Assert(RelationSyncCache != NULL);
/* Find cached function info, creating if not found */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
(void *) &relid,
HASH_ENTER, &found);
MemoryContextSwitchTo(oldctx);
Assert(entry != NULL);
/* Not found means schema wasn't sent */
if (!found || !entry->replicate_valid)
{
List *pubids = GetRelationPublications(relid);
ListCell *lc;
/* Reload publications if needed before use. */
if (!publications_valid)
{
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
if (data->publications)
list_free_deep(data->publications);
data->publications = LoadPublications(data->publication_names);
MemoryContextSwitchTo(oldctx);
publications_valid = true;
}
/*
* Build publication cache. We can't use one provided by relcache
* as relcache considers all publications given relation is in, but
* here we only need to consider ones that the subscriber requested.
*/
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = false;
foreach(lc, data->publications)
{
Publication *pub = lfirst(lc);
if (pub->alltables || list_member_oid(pubids, pub->oid))
{
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
}
if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
entry->pubactions.pubdelete)
break;
}
list_free(pubids);
entry->replicate_valid = true;
}
if (!found)
entry->schema_sent = false;
return entry;
}
/*
* Relcache invalidation callback
*/
static void
rel_sync_cache_relation_cb(Datum arg, Oid relid)
{
RelationSyncEntry *entry;
/*
* We can get here if the plugin was used in SQL interface as the
* RelSchemaSyncCache is detroyed when the decoding finishes, but there
* is no way to unregister the relcache invalidation callback.
*/
if (RelationSyncCache == NULL)
return;
/*
* Nobody keeps pointers to entries in this hash table around outside
* logical decoding callback calls - but invalidation events can come in
* *during* a callback if we access the relcache in the callback. Because
* of that we must mark the cache entry as invalid but not remove it from
* the hash while it could still be referenced, then prune it at a later
* safe point.
*
* Getting invalidations for relations that aren't in the table is
* entirely normal, since there's no way to unregister for an
* invalidation event. So we don't care if it's found or not.
*/
entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
HASH_FIND, NULL);
/*
* Reset schema sent status as the relation definition may have
* changed.
*/
if (entry != NULL)
entry->schema_sent = false;
}
/*
* Publication relation map syscache invalidation callback
*/
static void
rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
RelationSyncEntry *entry;
/*
* We can get here if the plugin was used in SQL interface as the
* RelSchemaSyncCache is detroyed when the decoding finishes, but there
* is no way to unregister the relcache invalidation callback.
*/
if (RelationSyncCache == NULL)
return;
/*
* There is no way to find which entry in our cache the hash belongs to
* so mark the whole cache as invalid.
*/
hash_seq_init(&status, RelationSyncCache);
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
entry->replicate_valid = false;
}

View File

@@ -196,6 +196,7 @@ WalReceiverMain(void)
WalRcvData *walrcv = WalRcv;
TimestampTz last_recv_timestamp;
bool ping_sent;
char *err;
/*
* WalRcv should be set up already (if we are a backend, we inherit this
@@ -293,7 +294,10 @@ WalReceiverMain(void)
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
wrconn = walrcv_connect(conninfo, false, "walreceiver");
wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
if (!wrconn)
ereport(ERROR,
(errmsg("could not connect to the primary server: %s", err)));
DisableWalRcvImmediateExit();
/*
@@ -316,13 +320,16 @@ WalReceiverMain(void)
{
char *primary_sysid;
char standby_sysid[32];
int server_version;
WalRcvStreamOptions options;
/*
* Check that we're connected to a valid server using the
* IDENTIFY_SYSTEM replication command,
* IDENTIFY_SYSTEM replication command.
*/
EnableWalRcvImmediateExit();
primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
&server_version);
snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
GetSystemIdentifier());
@@ -368,9 +375,12 @@ WalReceiverMain(void)
* history file, bump recovery target timeline, and ask us to restart
* on the new timeline.
*/
options.logical = false;
options.startpoint = startpoint;
options.slotname = slotname[0] != '\0' ? slotname : NULL;
options.proto.physical.startpointTLI = startpointTLI;
ThisTimeLineID = startpointTLI;
if (walrcv_startstreaming(wrconn, startpointTLI, startpoint,
slotname[0] != '\0' ? slotname : NULL))
if (walrcv_startstreaming(wrconn, &options))
{
if (first_stream)
ereport(LOG,