1
0
mirror of https://github.com/postgres/postgres.git synced 2025-12-06 00:02:13 +03:00

Post-PG 10 beta1 pgindent run

perltidy run not included.
This commit is contained in:
Bruce Momjian
2017-05-17 16:31:56 -04:00
parent 8a94332478
commit a6fd7b7a5f
310 changed files with 3338 additions and 3171 deletions

View File

@@ -57,8 +57,8 @@
/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
@@ -68,7 +68,7 @@ typedef struct LogicalRepCtxStruct
pid_t launcher_pid;
/* Background workers. */
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;
LogicalRepCtxStruct *LogicalRepCtx;
@@ -83,9 +83,9 @@ static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t got_SIGTERM = false;
static bool on_commit_launcher_wakeup = false;
static bool on_commit_launcher_wakeup = false;
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
/*
@@ -122,8 +122,8 @@ get_subscription_list(void)
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
{
Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
Subscription *sub;
MemoryContext oldcxt;
Subscription *sub;
MemoryContext oldcxt;
/*
* Allocate our results in the caller's context, not the
@@ -224,15 +224,16 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
LogicalRepWorker *
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
int i;
LogicalRepWorker *res = NULL;
int i;
LogicalRepWorker *res = NULL;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/* Search for attached worker for a given subscription id. */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
if (w->in_use && w->subid == subid && w->relid == relid &&
(!only_running || w->proc))
{
@@ -251,17 +252,17 @@ void
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
Oid relid)
{
BackgroundWorker bgw;
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
int i;
int slot = 0;
LogicalRepWorker *worker = NULL;
int nsyncworkers;
TimestampTz now;
int i;
int slot = 0;
LogicalRepWorker *worker = NULL;
int nsyncworkers;
TimestampTz now;
ereport(LOG,
(errmsg("starting logical replication worker for subscription \"%s\"",
subname)));
(errmsg("starting logical replication worker for subscription \"%s\"",
subname)));
/* Report this after the initial starting message for consistency. */
if (max_replication_slots == 0)
@@ -300,7 +301,7 @@ retry:
*/
if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
{
bool did_cleanup = false;
bool did_cleanup = false;
for (i = 0; i < max_logical_replication_workers; i++)
{
@@ -373,7 +374,7 @@ retry:
/* Register the new dynamic worker. */
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
@@ -394,7 +395,7 @@ retry:
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
errhint("You might need to increase max_worker_processes.")));
errhint("You might need to increase max_worker_processes.")));
return;
}
@@ -410,7 +411,7 @@ void
logicalrep_worker_stop(Oid subid, Oid relid)
{
LogicalRepWorker *worker;
uint16 generation;
uint16 generation;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
@@ -435,7 +436,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
*/
while (worker->in_use && !worker->proc)
{
int rc;
int rc;
LWLockRelease(LogicalRepWorkerLock);
@@ -478,7 +479,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
/* ... and wait for it to die. */
for (;;)
{
int rc;
int rc;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
if (!worker->proc || worker->generation != generation)
@@ -509,7 +510,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
void
logicalrep_worker_wakeup(Oid subid, Oid relid)
{
LogicalRepWorker *worker;
LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(subid, relid, true);
@@ -544,18 +545,18 @@ logicalrep_worker_attach(int slot)
{
LWLockRelease(LogicalRepWorkerLock);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication worker slot %d is empty, cannot attach",
slot)));
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication worker slot %d is empty, cannot attach",
slot)));
}
if (MyLogicalRepWorker->proc)
{
LWLockRelease(LogicalRepWorkerLock);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication worker slot %d is already used by "
"another worker, cannot attach", slot)));
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication worker slot %d is already used by "
"another worker, cannot attach", slot)));
}
MyLogicalRepWorker->proc = MyProc;
@@ -620,7 +621,7 @@ logicalrep_worker_onexit(int code, Datum arg)
void
logicalrep_worker_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
int save_errno = errno;
got_SIGTERM = true;
@@ -634,7 +635,7 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
void
logicalrep_worker_sighup(SIGNAL_ARGS)
{
int save_errno = errno;
int save_errno = errno;
got_SIGHUP = true;
@@ -651,15 +652,16 @@ logicalrep_worker_sighup(SIGNAL_ARGS)
int
logicalrep_sync_worker_count(Oid subid)
{
int i;
int res = 0;
int i;
int res = 0;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/* Search for attached worker for a given subscription id. */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
if (w->subid == subid && OidIsValid(w->relid))
res++;
}
@@ -699,7 +701,7 @@ ApplyLauncherRegister(void)
return;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
@@ -729,7 +731,7 @@ ApplyLauncherShmemInit(void)
if (!found)
{
int slot;
int slot;
memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
@@ -783,7 +785,7 @@ ApplyLauncherWakeup(void)
void
ApplyLauncherMain(Datum main_arg)
{
TimestampTz last_start_time = 0;
TimestampTz last_start_time = 0;
ereport(DEBUG1,
(errmsg("logical replication launcher started")));
@@ -813,10 +815,10 @@ ApplyLauncherMain(Datum main_arg)
int rc;
List *sublist;
ListCell *lc;
MemoryContext subctx;
MemoryContext oldctx;
TimestampTz now;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
MemoryContext subctx;
MemoryContext oldctx;
TimestampTz now;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
now = GetCurrentTimestamp();
@@ -826,7 +828,7 @@ ApplyLauncherMain(Datum main_arg)
{
/* Use temporary context for the database list and worker info. */
subctx = AllocSetContextCreate(TopMemoryContext,
"Logical Replication Launcher sublist",
"Logical Replication Launcher sublist",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
@@ -838,8 +840,8 @@ ApplyLauncherMain(Datum main_arg)
/* Start the missing workers for enabled subscriptions. */
foreach(lc, sublist)
{
Subscription *sub = (Subscription *) lfirst(lc);
LogicalRepWorker *w;
Subscription *sub = (Subscription *) lfirst(lc);
LogicalRepWorker *w;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
w = logicalrep_worker_find(sub->oid, InvalidOid, false);
@@ -864,9 +866,9 @@ ApplyLauncherMain(Datum main_arg)
{
/*
* The wait in previous cycle was interrupted in less than
* wal_retrieve_retry_interval since last worker was started,
* this usually means crash of the worker, so we should retry
* in wal_retrieve_retry_interval again.
* wal_retrieve_retry_interval since last worker was started, this
* usually means crash of the worker, so we should retry in
* wal_retrieve_retry_interval again.
*/
wait_time = wal_retrieve_retry_interval;
}
@@ -948,7 +950,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
Datum values[PG_STAT_GET_SUBSCRIPTION_COLS];
bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
int worker_pid;
LogicalRepWorker worker;
LogicalRepWorker worker;
memcpy(&worker, &LogicalRepCtx->workers[i],
sizeof(LogicalRepWorker));
@@ -992,7 +994,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
/* If only a single subscription was requested, and we found it, break. */
/*
* If only a single subscription was requested, and we found it,
* break.
*/
if (OidIsValid(subid))
break;
}

View File

@@ -118,7 +118,7 @@ StartupDecodingContext(List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
LogicalOutputPluginWriterUpdateProgress update_progress)
{
ReplicationSlot *slot;
MemoryContext context,
@@ -202,8 +202,8 @@ StartupDecodingContext(List *output_plugin_options,
* plugin contains the name of the output plugin
* output_plugin_options contains options passed to the output plugin
* read_page, prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent,
* actual, work.
* callbacks that have to be filled to perform the use-case dependent,
* actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
* as the decoding context because further memory contexts will be created
@@ -219,7 +219,7 @@ CreateInitDecodingContext(char *plugin,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
LogicalOutputPluginWriterUpdateProgress update_progress)
{
TransactionId xmin_horizon = InvalidTransactionId;
ReplicationSlot *slot;

View File

@@ -328,17 +328,19 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
{
LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
/*
* If only the confirmed_flush_lsn has changed the slot won't get
* marked as dirty by the above. Callers on the walsender interface
* are expected to keep track of their own progress and don't need
* it written out. But SQL-interface users cannot specify their own
* start positions and it's harder for them to keep track of their
* progress, so we should make more of an effort to save it for them.
* marked as dirty by the above. Callers on the walsender
* interface are expected to keep track of their own progress and
* don't need it written out. But SQL-interface users cannot
* specify their own start positions and it's harder for them to
* keep track of their progress, so we should make more of an
* effort to save it for them.
*
* Dirty the slot so it's written out at the next checkpoint. We'll
* still lose its position on crash, as documented, but it's better
* than always losing the position even on clean restart.
* Dirty the slot so it's written out at the next checkpoint.
* We'll still lose its position on crash, as documented, but it's
* better than always losing the position even on clean restart.
*/
ReplicationSlotMarkDirty();
}

View File

@@ -28,7 +28,7 @@
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
HeapTuple tuple);
HeapTuple tuple);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -72,7 +72,7 @@ void
logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
uint8 flags = 0;
uint8 flags = 0;
pq_sendbyte(out, 'C'); /* sending COMMIT */
@@ -92,7 +92,7 @@ void
logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
{
/* read flags (unused for now) */
uint8 flags = pq_getmsgbyte(in);
uint8 flags = pq_getmsgbyte(in);
if (flags != 0)
elog(ERROR, "unrecognized flags %u in commit message", flags);
@@ -136,7 +136,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
* Write INSERT to the output stream.
*/
void
logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
{
pq_sendbyte(out, 'I'); /* action INSERT */
@@ -160,7 +160,7 @@ LogicalRepRelId
logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
{
char action;
LogicalRepRelId relid;
LogicalRepRelId relid;
/* read the relation id */
relid = pq_getmsgint(in, 4);
@@ -180,7 +180,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
*/
void
logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
HeapTuple newtuple)
HeapTuple newtuple)
{
pq_sendbyte(out, 'U'); /* action UPDATE */
@@ -194,9 +194,9 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
if (oldtuple != NULL)
{
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
pq_sendbyte(out, 'K'); /* old key follows */
logicalrep_write_tuple(out, rel, oldtuple);
}
@@ -213,7 +213,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
LogicalRepTupleData *newtup)
{
char action;
LogicalRepRelId relid;
LogicalRepRelId relid;
/* read the relation id */
relid = pq_getmsgint(in, 4);
@@ -277,7 +277,7 @@ LogicalRepRelId
logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
{
char action;
LogicalRepRelId relid;
LogicalRepRelId relid;
/* read the relation id */
relid = pq_getmsgint(in, 4);
@@ -323,7 +323,7 @@ logicalrep_write_rel(StringInfo out, Relation rel)
LogicalRepRelation *
logicalrep_read_rel(StringInfo in)
{
LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
rel->remoteid = pq_getmsgint(in, 4);
@@ -424,12 +424,12 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
if (isnull[i])
{
pq_sendbyte(out, 'n'); /* null column */
pq_sendbyte(out, 'n'); /* null column */
continue;
}
else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
{
pq_sendbyte(out, 'u'); /* unchanged toast column */
pq_sendbyte(out, 'u'); /* unchanged toast column */
continue;
}
@@ -473,21 +473,21 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
switch (kind)
{
case 'n': /* null */
case 'n': /* null */
tuple->values[i] = NULL;
tuple->changed[i] = true;
break;
case 'u': /* unchanged column */
case 'u': /* unchanged column */
/* we don't receive the value of an unchanged column */
tuple->values[i] = NULL;
break;
case 't': /* text formatted value */
case 't': /* text formatted value */
{
int len;
tuple->changed[i] = true;
len = pq_getmsgint(in, 4); /* read length */
len = pq_getmsgint(in, 4); /* read length */
/* and data */
tuple->values[i] = palloc(len + 1);
@@ -534,7 +534,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
for (i = 0; i < desc->natts; i++)
{
Form_pg_attribute att = desc->attrs[i];
uint8 flags = 0;
uint8 flags = 0;
if (att->attisdropped)
continue;
@@ -612,7 +612,7 @@ logicalrep_write_namespace(StringInfo out, Oid nspid)
pq_sendbyte(out, '\0');
else
{
char *nspname = get_namespace_name(nspid);
char *nspname = get_namespace_name(nspid);
if (nspname == NULL)
elog(ERROR, "cache lookup failed for namespace %u",

View File

@@ -30,13 +30,13 @@
#include "utils/memutils.h"
#include "utils/syscache.h"
static MemoryContext LogicalRepRelMapContext = NULL;
static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
static HTAB *LogicalRepTypMap = NULL;
static HTAB *LogicalRepRelMap = NULL;
static HTAB *LogicalRepTypMap = NULL;
static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
uint32 hashvalue);
uint32 hashvalue);
/*
* Relcache invalidation callback for our relation map cache.
@@ -44,7 +44,7 @@ static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
static void
logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
{
LogicalRepRelMapEntry *entry;
LogicalRepRelMapEntry *entry;
/* Just to be sure. */
if (LogicalRepRelMap == NULL)
@@ -110,7 +110,7 @@ logicalrep_relmap_init(void)
/* This will usually be small. */
LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
HASH_ELEM | HASH_BLOBS |HASH_CONTEXT);
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
@@ -134,7 +134,7 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
if (remoterel->natts > 0)
{
int i;
int i;
for (i = 0; i < remoterel->natts; i++)
pfree(remoterel->attnames[i]);
@@ -157,10 +157,10 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
void
logicalrep_relmap_update(LogicalRepRelation *remoterel)
{
MemoryContext oldctx;
LogicalRepRelMapEntry *entry;
bool found;
int i;
MemoryContext oldctx;
LogicalRepRelMapEntry *entry;
bool found;
int i;
if (LogicalRepRelMap == NULL)
logicalrep_relmap_init();
@@ -202,7 +202,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
static int
logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
{
int i;
int i;
for (i = 0; i < remoterel->natts; i++)
{
@@ -222,7 +222,7 @@ logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
LogicalRepRelMapEntry *entry;
LogicalRepRelMapEntry *entry;
bool found;
if (LogicalRepRelMap == NULL)
@@ -245,7 +245,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Bitmapset *idkey;
TupleDesc desc;
LogicalRepRelation *remoterel;
MemoryContext oldctx;
MemoryContext oldctx;
remoterel = &entry->remoterel;
/* Try to find and lock the relation by name. */
@@ -265,8 +266,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
/*
* Build the mapping of local attribute numbers to remote attribute
* numbers and validate that we don't miss any replicated columns
* as that would result in potentially unwanted data loss.
* numbers and validate that we don't miss any replicated columns as
* that would result in potentially unwanted data loss.
*/
desc = RelationGetDescr(entry->localrel);
oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
@@ -276,8 +277,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
found = 0;
for (i = 0; i < desc->natts; i++)
{
int attnum = logicalrep_rel_att_by_name(remoterel,
NameStr(desc->attrs[i]->attname));
int attnum = logicalrep_rel_att_by_name(remoterel,
NameStr(desc->attrs[i]->attname));
entry->attrmap[i] = attnum;
if (attnum >= 0)
found++;
@@ -287,9 +289,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
if (found < remoterel->natts)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" is missing "
"some replicated columns",
remoterel->nspname, remoterel->relname)));
errmsg("logical replication target relation \"%s.%s\" is missing "
"some replicated columns",
remoterel->nspname, remoterel->relname)));
/*
* Check that replica identity matches. We allow for stricter replica
@@ -299,8 +301,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
* but in the opposite scenario it will.
*
* Don't throw any error here just mark the relation entry as not
* updatable, as replica identity is only for updates and deletes
* but inserts can be replicated even without it.
* updatable, as replica identity is only for updates and deletes but
* inserts can be replicated even without it.
*/
entry->updatable = true;
idkey = RelationGetIndexAttrBitmap(entry->localrel,
@@ -310,6 +312,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_PRIMARY_KEY);
/*
* If no replica identity index and no PK, the published table
* must have replica identity FULL.
@@ -321,14 +324,14 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
i = -1;
while ((i = bms_next_member(idkey, i)) >= 0)
{
int attnum = i + FirstLowInvalidHeapAttributeNumber;
int attnum = i + FirstLowInvalidHeapAttributeNumber;
if (!AttrNumberIsForUserDefinedAttr(attnum))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" uses "
"system columns in REPLICA IDENTITY index",
remoterel->nspname, remoterel->relname)));
errmsg("logical replication target relation \"%s.%s\" uses "
"system columns in REPLICA IDENTITY index",
remoterel->nspname, remoterel->relname)));
attnum = AttrNumberGetAttrOffset(attnum);
@@ -371,7 +374,7 @@ static void
logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS status;
LogicalRepTyp *entry;
LogicalRepTyp *entry;
/* Just to be sure. */
if (LogicalRepTypMap == NULL)
@@ -402,9 +405,9 @@ logicalrep_typmap_free_entry(LogicalRepTyp *entry)
void
logicalrep_typmap_update(LogicalRepTyp *remotetyp)
{
MemoryContext oldctx;
LogicalRepTyp *entry;
bool found;
MemoryContext oldctx;
LogicalRepTyp *entry;
bool found;
if (LogicalRepTypMap == NULL)
logicalrep_relmap_init();
@@ -433,9 +436,9 @@ logicalrep_typmap_update(LogicalRepTyp *remotetyp)
Oid
logicalrep_typmap_getid(Oid remoteid)
{
LogicalRepTyp *entry;
bool found;
Oid nspoid;
LogicalRepTyp *entry;
bool found;
Oid nspoid;
/* Internal types are mapped directly. */
if (remoteid < FirstNormalObjectId)

View File

@@ -59,7 +59,7 @@
* by the following graph describing the SnapBuild->state transitions:
*
* +-------------------------+
* +----| START |-------------+
* +----| START |-------------+
* | +-------------------------+ |
* | | |
* | | |
@@ -68,22 +68,22 @@
* | | |
* | v |
* | +-------------------------+ v
* | | BUILDING_SNAPSHOT |------------>|
* | | BUILDING_SNAPSHOT |------------>|
* | +-------------------------+ |
* | | |
* | | |
* | running_xacts #2, xacts from #1 finished |
* | running_xacts #2, xacts from #1 finished |
* | | |
* | | |
* | v |
* | +-------------------------+ v
* | | FULL_SNAPSHOT |------------>|
* | | FULL_SNAPSHOT |------------>|
* | +-------------------------+ |
* | | |
* running_xacts | saved snapshot
* with zero xacts | at running_xacts's lsn
* | | |
* | running_xacts with xacts from #2 finished |
* | running_xacts with xacts from #2 finished |
* | | |
* | v |
* | +-------------------------+ |
@@ -209,9 +209,9 @@ struct SnapBuild
TransactionId was_xmin;
TransactionId was_xmax;
size_t was_xcnt; /* number of used xip entries */
size_t was_xcnt_space; /* allocated size of xip */
TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
size_t was_xcnt; /* number of used xip entries */
size_t was_xcnt_space; /* allocated size of xip */
TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
} was_running;
/*
@@ -608,8 +608,8 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
{
if (newxcnt >= GetMaxSnapshotXidCount())
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("initial slot snapshot too large")));
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("initial slot snapshot too large")));
newxip[newxcnt++] = xid;
}
@@ -986,6 +986,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
if (NormalTransactionIdFollows(subxid, xmax))
xmax = subxid;
}
/*
* If we're forcing timetravel we also need visibility information
* about subtransaction, so keep track of subtransaction's state, even
@@ -1031,8 +1032,8 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
/*
* Adjust xmax of the snapshot builder, we only do that for committed,
* catalog modifying, transactions, everything else isn't interesting
* for us since we'll never look at the respective rows.
* catalog modifying, transactions, everything else isn't interesting for
* us since we'll never look at the respective rows.
*/
if (needs_timetravel &&
(!TransactionIdIsValid(builder->xmax) ||
@@ -1130,8 +1131,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
running->oldestRunningXid);
/*
* Increase shared memory limits, so vacuum can work on tuples we prevented
* from being pruned till now.
* Increase shared memory limits, so vacuum can work on tuples we
* prevented from being pruned till now.
*/
LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);
@@ -1202,11 +1203,11 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* modifying transactions.
*
* c) First incrementally build a snapshot for catalog tuples
* (BUILDING_SNAPSHOT), that requires all, already in-progress,
* transactions to finish. Every transaction starting after that
* (FULL_SNAPSHOT state), has enough information to be decoded. But
* for older running transactions no viable snapshot exists yet, so
* CONSISTENT will only be reached once all of those have finished.
* (BUILDING_SNAPSHOT), that requires all, already in-progress,
* transactions to finish. Every transaction starting after that
* (FULL_SNAPSHOT state), has enough information to be decoded. But
* for older running transactions no viable snapshot exists yet, so
* CONSISTENT will only be reached once all of those have finished.
* ---
*/
@@ -1271,6 +1272,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
/* there won't be any state to cleanup */
return false;
}
/*
* c) transition from START to BUILDING_SNAPSHOT.
*
@@ -1308,6 +1310,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
SnapBuildWaitSnapshot(running, running->nextXid);
}
/*
* c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
*
@@ -1324,13 +1327,14 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
SnapBuildStartNextPhaseAt(builder, running->nextXid);
ereport(LOG,
(errmsg("logical decoding found initial consistent point at %X/%X",
(uint32) (lsn >> 32), (uint32) lsn),
errdetail("Waiting for transactions (approximately %d) older than %u to end.",
running->xcnt, running->nextXid)));
(errmsg("logical decoding found initial consistent point at %X/%X",
(uint32) (lsn >> 32), (uint32) lsn),
errdetail("Waiting for transactions (approximately %d) older than %u to end.",
running->xcnt, running->nextXid)));
SnapBuildWaitSnapshot(running, running->nextXid);
}
/*
* c) transition from FULL_SNAPSHOT to CONSISTENT.
*
@@ -1368,9 +1372,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
*
* This isn't required for the correctness of decoding, but to:
* a) allow isolationtester to notice that we're currently waiting for
* something.
* something.
* b) log a new xl_running_xacts record where it'd be helpful, without having
* to write for bgwriter or checkpointer.
* to write for bgwriter or checkpointer.
* ---
*/
static void
@@ -1383,9 +1387,9 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
TransactionId xid = running->xids[off];
/*
* Upper layers should prevent that we ever need to wait on
* ourselves. Check anyway, since failing to do so would either
* result in an endless wait or an Assert() failure.
* Upper layers should prevent that we ever need to wait on ourselves.
* Check anyway, since failing to do so would either result in an
* endless wait or an Assert() failure.
*/
if (TransactionIdIsCurrentTransactionId(xid))
elog(ERROR, "waiting for ourselves");
@@ -1864,8 +1868,9 @@ CheckPointSnapBuild(void)
char path[MAXPGPATH + 21];
/*
* We start off with a minimum of the last redo pointer. No new replication
* slot will start before that, so that's a safe upper bound for removal.
* We start off with a minimum of the last redo pointer. No new
* replication slot will start before that, so that's a safe upper bound
* for removal.
*/
redo = GetRedoRecPtr();

View File

@@ -113,7 +113,8 @@ StringInfo copybuf = NULL;
/*
* Exit routine for synchronization worker.
*/
static void pg_attribute_noreturn()
static void
pg_attribute_noreturn()
finish_sync_worker(void)
{
/*
@@ -148,12 +149,12 @@ finish_sync_worker(void)
static bool
wait_for_sync_status_change(Oid relid, char origstate)
{
int rc;
char state = origstate;
int rc;
char state = origstate;
while (!got_SIGTERM)
{
LogicalRepWorker *worker;
LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
@@ -269,7 +270,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
struct tablesync_start_time_mapping
{
Oid relid;
TimestampTz last_start_time;
TimestampTz last_start_time;
};
static List *table_states = NIL;
static HTAB *last_start_times = NULL;
@@ -281,9 +282,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
/* We need up to date sync state info for subscription tables here. */
if (!table_states_valid)
{
MemoryContext oldctx;
List *rstates;
ListCell *lc;
MemoryContext oldctx;
List *rstates;
ListCell *lc;
SubscriptionRelState *rstate;
/* Clean the old list. */
@@ -294,7 +295,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
started_tx = true;
/* Fetch all non-ready tables. */
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
@@ -324,6 +325,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
last_start_times = hash_create("Logical replication table sync worker start times",
256, &ctl, HASH_ELEM | HASH_BLOBS);
}
/*
* Clean up the hash table when we're done with all tables (just to
* release the bit of memory).
@@ -337,14 +339,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
/* Process all tables that are being synchronized. */
foreach(lc, table_states)
{
SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc);
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
if (rstate->state == SUBREL_STATE_SYNCDONE)
{
/*
* Apply has caught up to the position where the table sync
* has finished. Time to mark the table as ready so that
* apply will just continue to replicate it normally.
* Apply has caught up to the position where the table sync has
* finished. Time to mark the table as ready so that apply will
* just continue to replicate it normally.
*/
if (current_lsn >= rstate->lsn)
{
@@ -362,8 +364,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
else
{
LogicalRepWorker *syncworker;
int nsyncworkers = 0;
LogicalRepWorker *syncworker;
int nsyncworkers = 0;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
@@ -376,6 +378,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
SpinLockRelease(&syncworker->relmutex);
}
else
/*
* If no sync worker for this table yet, count running sync
* workers for this subscription, while we have the lock, for
@@ -394,16 +397,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* There are three possible synchronization situations here.
*
* a) Apply is in front of the table sync: We tell the table
* sync to CATCHUP.
* sync to CATCHUP.
*
* b) Apply is behind the table sync: We tell the table sync
* to mark the table as SYNCDONE and finish.
* c) Apply and table sync are at the same position: We tell
* table sync to mark the table as READY and finish.
* to mark the table as SYNCDONE and finish.
*
* In any case we'll need to wait for table sync to change
* the state in catalog and only then continue ourselves.
* c) Apply and table sync are at the same position: We tell
* table sync to mark the table as READY and finish.
*
* In any case we'll need to wait for table sync to change the
* state in catalog and only then continue ourselves.
*/
if (current_lsn > rstate->lsn)
{
@@ -427,20 +430,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
logicalrep_worker_wakeup_ptr(syncworker);
/*
* Enter busy loop and wait for synchronization status
* change.
* Enter busy loop and wait for synchronization status change.
*/
wait_for_sync_status_change(rstate->relid, rstate->state);
}
/*
* If there is no sync worker registered for the table and
* there is some free sync worker slot, start new sync worker
* for the table.
* If there is no sync worker registered for the table and there
* is some free sync worker slot, start new sync worker for the
* table.
*/
else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
{
TimestampTz now = GetCurrentTimestamp();
TimestampTz now = GetCurrentTimestamp();
struct tablesync_start_time_mapping *hentry;
bool found;
@@ -492,7 +494,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel)
for (i = 0; i < desc->natts; i++)
{
int remoteattnum = rel->attrmap[i];
int remoteattnum = rel->attrmap[i];
/* Skip dropped attributes. */
if (desc->attrs[i]->attisdropped)
@@ -503,7 +505,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel)
continue;
attnamelist = lappend(attnamelist,
makeString(rel->remoterel.attnames[remoteattnum]));
makeString(rel->remoterel.attnames[remoteattnum]));
}
return attnamelist;
@@ -516,8 +518,8 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel)
static int
copy_read_data(void *outbuf, int minread, int maxread)
{
int bytesread = 0;
int avail;
int bytesread = 0;
int avail;
/* If there are some leftover data from previous read, use them. */
avail = copybuf->len - copybuf->cursor;
@@ -601,13 +603,13 @@ static void
fetch_remote_table_info(char *nspname, char *relname,
LogicalRepRelation *lrel)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[2] = {OIDOID, CHAROID};
Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
bool isnull;
int natt;
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[2] = {OIDOID, CHAROID};
Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
bool isnull;
int natt;
lrel->nspname = nspname;
lrel->relname = relname;
@@ -615,14 +617,14 @@ fetch_remote_table_info(char *nspname, char *relname,
/* First fetch Oid and replica identity. */
initStringInfo(&cmd);
appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
" FROM pg_catalog.pg_class c"
" INNER JOIN pg_catalog.pg_namespace n"
" ON (c.relnamespace = n.oid)"
" WHERE n.nspname = %s"
" AND c.relname = %s"
" AND c.relkind = 'r'",
quote_literal_cstr(nspname),
quote_literal_cstr(relname));
" FROM pg_catalog.pg_class c"
" INNER JOIN pg_catalog.pg_namespace n"
" ON (c.relnamespace = n.oid)"
" WHERE n.nspname = %s"
" AND c.relname = %s"
" AND c.relkind = 'r'",
quote_literal_cstr(nspname),
quote_literal_cstr(relname));
res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
if (res->status != WALRCV_OK_TUPLES)
@@ -653,7 +655,7 @@ fetch_remote_table_info(char *nspname, char *relname,
" a.attnum = ANY(i.indkey)"
" FROM pg_catalog.pg_attribute a"
" LEFT JOIN pg_catalog.pg_index i"
" ON (i.indexrelid = pg_get_replica_identity_index(%u))"
" ON (i.indexrelid = pg_get_replica_identity_index(%u))"
" WHERE a.attnum > 0::pg_catalog.int2"
" AND NOT a.attisdropped"
" AND a.attrelid = %u"
@@ -686,7 +688,7 @@ fetch_remote_table_info(char *nspname, char *relname,
/* Should never happen. */
if (++natt >= MaxTupleAttributeNumber)
elog(ERROR, "too many columns in remote table \"%s.%s\"",
nspname, relname);
nspname, relname);
ExecClearTuple(slot);
}
@@ -707,9 +709,9 @@ static void
copy_table(Relation rel)
{
LogicalRepRelMapEntry *relmapentry;
LogicalRepRelation lrel;
WalRcvExecResult *res;
StringInfoData cmd;
LogicalRepRelation lrel;
WalRcvExecResult *res;
StringInfoData cmd;
CopyState cstate;
List *attnamelist;
ParseState *pstate;
@@ -759,8 +761,8 @@ copy_table(Relation rel)
char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
char *slotname;
char *err;
char *slotname;
char *err;
char relstate;
XLogRecPtr relstate_lsn;
@@ -783,7 +785,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* NAMEDATALEN on the remote that matters, but this scheme will also work
* reasonably if that is different.)
*/
StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
slotname = psprintf("%.*s_%u_sync_%u",
NAMEDATALEN - 28,
MySubscription->slotname,
@@ -801,7 +803,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
case SUBREL_STATE_DATASYNC:
{
Relation rel;
WalRcvExecResult *res;
WalRcvExecResult *res;
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -818,24 +820,23 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
pgstat_report_stat(false);
/*
* We want to do the table data sync in single
* transaction.
* We want to do the table data sync in single transaction.
*/
StartTransactionCommand();
/*
* Use standard write lock here. It might be better to
* disallow access to table while it's being synchronized.
* But we don't want to block the main apply process from
* working and it has to open relation in RowExclusiveLock
* when remapping remote relation id to local one.
* disallow access to table while it's being synchronized. But
* we don't want to block the main apply process from working
* and it has to open relation in RowExclusiveLock when
* remapping remote relation id to local one.
*/
rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock);
/*
* Create temporary slot for the sync process.
* We do this inside transaction so that we can use the
* snapshot made by the slot to get existing data.
* Create temporary slot for the sync process. We do this
* inside transaction so that we can use the snapshot made by
* the slot to get existing data.
*/
res = walrcv_exec(wrconn,
"BEGIN READ ONLY ISOLATION LEVEL "
@@ -849,10 +850,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/*
* Create new temporary logical decoding slot.
*
* We'll use slot for data copy so make sure the snapshot
* is used for the transaction, that way the COPY will get
* data that is consistent with the lsn used by the slot
* to start decoding.
* We'll use slot for data copy so make sure the snapshot is
* used for the transaction, that way the COPY will get data
* that is consistent with the lsn used by the slot to start
* decoding.
*/
walrcv_create_slot(wrconn, slotname, true,
CRS_USE_SNAPSHOT, origin_startpos);
@@ -872,8 +873,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
CommandCounterIncrement();
/*
* We are done with the initial data synchronization,
* update the state.
* We are done with the initial data synchronization, update
* the state.
*/
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
@@ -881,8 +882,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
* Wait for main apply worker to either tell us to
* catchup or that we are done.
* Wait for main apply worker to either tell us to catchup or
* that we are done.
*/
wait_for_sync_status_change(MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate);

View File

@@ -88,29 +88,29 @@
typedef struct FlushPosition
{
dlist_node node;
XLogRecPtr local_end;
XLogRecPtr remote_end;
dlist_node node;
XLogRecPtr local_end;
XLogRecPtr remote_end;
} FlushPosition;
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
LogicalRepRelation *rel;
LogicalRepRelation *rel;
int attnum;
} SlotErrCallbackArg;
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
WalReceiverConn *wrconn = NULL;
WalReceiverConn *wrconn = NULL;
Subscription *MySubscription = NULL;
bool MySubscriptionValid = false;
Subscription *MySubscription = NULL;
bool MySubscriptionValid = false;
bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
@@ -215,7 +215,7 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
*/
static void
slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
TupleTableSlot *slot)
TupleTableSlot *slot)
{
TupleDesc desc = RelationGetDescr(rel->localrel);
int num_phys_attrs = desc->natts;
@@ -271,9 +271,9 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
Oid remotetypoid,
localtypoid;
SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
Oid remotetypoid,
localtypoid;
if (errarg->attnum < 0)
return;
@@ -295,12 +295,12 @@ slot_store_error_callback(void *arg)
*/
static void
slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
char **values)
char **values)
{
int natts = slot->tts_tupleDescriptor->natts;
int i;
SlotErrCallbackArg errarg;
ErrorContextCallback errcallback;
int natts = slot->tts_tupleDescriptor->natts;
int i;
SlotErrCallbackArg errarg;
ErrorContextCallback errcallback;
ExecClearTuple(slot);
@@ -315,14 +315,14 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
/* Call the "in" function for each non-dropped attribute */
for (i = 0; i < natts; i++)
{
Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
int remoteattnum = rel->attrmap[i];
Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
int remoteattnum = rel->attrmap[i];
if (!att->attisdropped && remoteattnum >= 0 &&
values[remoteattnum] != NULL)
{
Oid typinput;
Oid typioparam;
Oid typinput;
Oid typioparam;
errarg.attnum = remoteattnum;
@@ -359,12 +359,12 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
*/
static void
slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
char **values, bool *replaces)
char **values, bool *replaces)
{
int natts = slot->tts_tupleDescriptor->natts;
int i;
SlotErrCallbackArg errarg;
ErrorContextCallback errcallback;
int natts = slot->tts_tupleDescriptor->natts;
int i;
SlotErrCallbackArg errarg;
ErrorContextCallback errcallback;
slot_getallattrs(slot);
ExecClearTuple(slot);
@@ -380,16 +380,16 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
/* Call the "in" function for each replaced attribute */
for (i = 0; i < natts; i++)
{
Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
int remoteattnum = rel->attrmap[i];
Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
int remoteattnum = rel->attrmap[i];
if (remoteattnum >= 0 && !replaces[remoteattnum])
continue;
if (remoteattnum >= 0 && values[remoteattnum] != NULL)
{
Oid typinput;
Oid typioparam;
Oid typinput;
Oid typioparam;
errarg.attnum = remoteattnum;
@@ -418,7 +418,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
static void
apply_handle_begin(StringInfo s)
{
LogicalRepBeginData begin_data;
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
@@ -437,7 +437,7 @@ apply_handle_begin(StringInfo s)
static void
apply_handle_commit(StringInfo s)
{
LogicalRepCommitData commit_data;
LogicalRepCommitData commit_data;
logicalrep_read_commit(s, &commit_data);
@@ -476,8 +476,8 @@ static void
apply_handle_origin(StringInfo s)
{
/*
* ORIGIN message can only come inside remote transaction and before
* any actual writes.
* ORIGIN message can only come inside remote transaction and before any
* actual writes.
*/
if (!in_remote_transaction ||
(IsTransactionState() && !am_tablesync_worker()))
@@ -497,7 +497,7 @@ apply_handle_origin(StringInfo s)
static void
apply_handle_relation(StringInfo s)
{
LogicalRepRelation *rel;
LogicalRepRelation *rel;
rel = logicalrep_read_rel(s);
logicalrep_relmap_update(rel);
@@ -512,7 +512,7 @@ apply_handle_relation(StringInfo s)
static void
apply_handle_type(StringInfo s)
{
LogicalRepTyp typ;
LogicalRepTyp typ;
logicalrep_read_typ(s, &typ);
logicalrep_typmap_update(&typ);
@@ -526,7 +526,7 @@ apply_handle_type(StringInfo s)
static Oid
GetRelationIdentityOrPK(Relation rel)
{
Oid idxoid;
Oid idxoid;
idxoid = RelationGetReplicaIndex(rel);
@@ -543,11 +543,11 @@ static void
apply_handle_insert(StringInfo s)
{
LogicalRepRelMapEntry *rel;
LogicalRepTupleData newtup;
LogicalRepRelId relid;
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
LogicalRepTupleData newtup;
LogicalRepRelId relid;
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
ensure_transaction();
@@ -607,15 +607,15 @@ check_relation_updatable(LogicalRepRelMapEntry *rel)
return;
/*
* We are in error mode so it's fine this is somewhat slow.
* It's better to give user correct error.
* We are in error mode so it's fine this is somewhat slow. It's better to
* give user correct error.
*/
if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publisher does not send replica identity column "
"expected by the logical replication target relation \"%s.%s\"",
"expected by the logical replication target relation \"%s.%s\"",
rel->remoterel.nspname, rel->remoterel.relname)));
}
@@ -637,17 +637,17 @@ static void
apply_handle_update(StringInfo s)
{
LogicalRepRelMapEntry *rel;
LogicalRepRelId relid;
Oid idxoid;
EState *estate;
EPQState epqstate;
LogicalRepTupleData oldtup;
LogicalRepTupleData newtup;
bool has_oldtup;
TupleTableSlot *localslot;
TupleTableSlot *remoteslot;
bool found;
MemoryContext oldctx;
LogicalRepRelId relid;
Oid idxoid;
EState *estate;
EPQState epqstate;
LogicalRepTupleData oldtup;
LogicalRepTupleData newtup;
bool has_oldtup;
TupleTableSlot *localslot;
TupleTableSlot *remoteslot;
bool found;
MemoryContext oldctx;
ensure_transaction();
@@ -685,8 +685,8 @@ apply_handle_update(StringInfo s)
MemoryContextSwitchTo(oldctx);
/*
* Try to find tuple using either replica identity index, primary key
* or if needed, sequential scan.
* Try to find tuple using either replica identity index, primary key or
* if needed, sequential scan.
*/
idxoid = GetRelationIdentityOrPK(rel->localrel);
Assert(OidIsValid(idxoid) ||
@@ -758,15 +758,15 @@ static void
apply_handle_delete(StringInfo s)
{
LogicalRepRelMapEntry *rel;
LogicalRepTupleData oldtup;
LogicalRepRelId relid;
Oid idxoid;
EState *estate;
EPQState epqstate;
TupleTableSlot *remoteslot;
TupleTableSlot *localslot;
bool found;
MemoryContext oldctx;
LogicalRepTupleData oldtup;
LogicalRepRelId relid;
Oid idxoid;
EState *estate;
EPQState epqstate;
TupleTableSlot *remoteslot;
TupleTableSlot *localslot;
bool found;
MemoryContext oldctx;
ensure_transaction();
@@ -802,8 +802,8 @@ apply_handle_delete(StringInfo s)
MemoryContextSwitchTo(oldctx);
/*
* Try to find tuple using either replica identity index, primary key
* or if needed, sequential scan.
* Try to find tuple using either replica identity index, primary key or
* if needed, sequential scan.
*/
idxoid = GetRelationIdentityOrPK(rel->localrel);
Assert(OidIsValid(idxoid) ||
@@ -826,7 +826,7 @@ apply_handle_delete(StringInfo s)
}
else
{
/* The tuple to be deleted could not be found.*/
/* The tuple to be deleted could not be found. */
ereport(DEBUG1,
(errmsg("logical replication could not find row for delete "
"in replication target %s",
@@ -856,46 +856,46 @@ apply_handle_delete(StringInfo s)
static void
apply_dispatch(StringInfo s)
{
char action = pq_getmsgbyte(s);
char action = pq_getmsgbyte(s);
switch (action)
{
/* BEGIN */
/* BEGIN */
case 'B':
apply_handle_begin(s);
break;
/* COMMIT */
/* COMMIT */
case 'C':
apply_handle_commit(s);
break;
/* INSERT */
/* INSERT */
case 'I':
apply_handle_insert(s);
break;
/* UPDATE */
/* UPDATE */
case 'U':
apply_handle_update(s);
break;
/* DELETE */
/* DELETE */
case 'D':
apply_handle_delete(s);
break;
/* RELATION */
/* RELATION */
case 'R':
apply_handle_relation(s);
break;
/* TYPE */
/* TYPE */
case 'Y':
apply_handle_type(s);
break;
/* ORIGIN */
/* ORIGIN */
case 'O':
apply_handle_origin(s);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid logical replication message type %c", action)));
errmsg("invalid logical replication message type %c", action)));
}
}
@@ -925,7 +925,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
dlist_foreach_modify(iter, &lsn_mapping)
{
FlushPosition *pos =
dlist_container(FlushPosition, node, iter.cur);
dlist_container(FlushPosition, node, iter.cur);
*write = pos->remote_end;
@@ -995,12 +995,12 @@ static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
/*
* Init the ApplyMessageContext which we clean up after each
* replication protocol message.
* Init the ApplyMessageContext which we clean up after each replication
* protocol message.
*/
ApplyMessageContext = AllocSetContextCreate(ApplyContext,
"ApplyMessageContext",
ALLOCSET_DEFAULT_SIZES);
"ApplyMessageContext",
ALLOCSET_DEFAULT_SIZES);
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1039,7 +1039,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
else
{
int c;
int c;
StringInfoData s;
/* Reset timeout. */
@@ -1108,7 +1108,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
/*
* If we didn't get any transactions for a while there might be
* unconsumed invalidation messages in the queue, consume them now.
* unconsumed invalidation messages in the queue, consume them
* now.
*/
AcceptInvalidationMessages();
if (!MySubscriptionValid)
@@ -1126,6 +1127,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (endofstream)
{
TimeLineID tli;
walrcv_endstreaming(wrconn, &tli);
break;
}
@@ -1152,19 +1154,18 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (rc & WL_TIMEOUT)
{
/*
* We didn't receive anything new. If we haven't heard
* anything from the server for more than
* wal_receiver_timeout / 2, ping the server. Also, if
* it's been longer than wal_receiver_status_interval
* since the last update we sent, send a status update to
* the master anyway, to report any progress in applying
* WAL.
* We didn't receive anything new. If we haven't heard anything
* from the server for more than wal_receiver_timeout / 2, ping
* the server. Also, if it's been longer than
* wal_receiver_status_interval since the last update we sent,
* send a status update to the master anyway, to report any
* progress in applying WAL.
*/
bool requestReply = false;
/*
* Check if time since last receive from standby has
* reached the configured limit.
* Check if time since last receive from standby has reached the
* configured limit.
*/
if (wal_receiver_timeout > 0)
{
@@ -1180,13 +1181,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
(errmsg("terminating logical replication worker due to timeout")));
/*
* We didn't receive anything new, for half of
* receiver replication timeout. Ping the server.
* We didn't receive anything new, for half of receiver
* replication timeout. Ping the server.
*/
if (!ping_sent)
{
timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
(wal_receiver_timeout / 2));
(wal_receiver_timeout / 2));
if (now >= timeout)
{
requestReply = true;
@@ -1211,17 +1212,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
static StringInfo reply_message = NULL;
static TimestampTz send_time = 0;
static StringInfo reply_message = NULL;
static TimestampTz send_time = 0;
static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
static XLogRecPtr last_writepos = InvalidXLogRecPtr;
static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
XLogRecPtr writepos;
XLogRecPtr flushpos;
XLogRecPtr writepos;
XLogRecPtr flushpos;
TimestampTz now;
bool have_pending_txes;
bool have_pending_txes;
/*
* If the user doesn't want status to be reported to the publisher, be
@@ -1237,8 +1238,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
get_flush_position(&writepos, &flushpos, &have_pending_txes);
/*
* No outstanding transactions to flush, we can report the latest
* received position. This is important for synchronous replication.
* No outstanding transactions to flush, we can report the latest received
* position. This is important for synchronous replication.
*/
if (!have_pending_txes)
flushpos = writepos = recvpos;
@@ -1262,7 +1263,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
if (!reply_message)
{
MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
reply_message = makeStringInfo();
MemoryContextSwitchTo(oldctx);
}
@@ -1273,7 +1275,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
pq_sendint64(reply_message, recvpos); /* write */
pq_sendint64(reply_message, flushpos); /* flush */
pq_sendint64(reply_message, writepos); /* apply */
pq_sendint64(reply_message, now); /* sendTime */
pq_sendint64(reply_message, now); /* sendTime */
pq_sendbyte(reply_message, requestReply); /* replyRequested */
elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
@@ -1300,9 +1302,9 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static void
reread_subscription(void)
{
MemoryContext oldctx;
Subscription *newsub;
bool started_tx = false;
MemoryContext oldctx;
Subscription *newsub;
bool started_tx = false;
/* This function might be called inside or outside of transaction. */
if (!IsTransactionState())
@@ -1317,47 +1319,45 @@ reread_subscription(void)
newsub = GetSubscription(MyLogicalRepWorker->subid, true);
/*
* Exit if the subscription was removed.
* This normally should not happen as the worker gets killed
* during DROP SUBSCRIPTION.
* Exit if the subscription was removed. This normally should not happen
* as the worker gets killed during DROP SUBSCRIPTION.
*/
if (!newsub)
{
ereport(LOG,
(errmsg("logical replication worker for subscription \"%s\" will "
"stop because the subscription was removed",
MySubscription->name)));
(errmsg("logical replication worker for subscription \"%s\" will "
"stop because the subscription was removed",
MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
}
/*
* Exit if the subscription was disabled.
* This normally should not happen as the worker gets killed
* during ALTER SUBSCRIPTION ... DISABLE.
* Exit if the subscription was disabled. This normally should not happen
* as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
*/
if (!newsub->enabled)
{
ereport(LOG,
(errmsg("logical replication worker for subscription \"%s\" will "
"stop because the subscription was disabled",
MySubscription->name)));
(errmsg("logical replication worker for subscription \"%s\" will "
"stop because the subscription was disabled",
MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
}
/*
* Exit if connection string was changed. The launcher will start
* new worker.
* Exit if connection string was changed. The launcher will start new
* worker.
*/
if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
{
ereport(LOG,
(errmsg("logical replication worker for subscription \"%s\" will "
"restart because the connection information was changed",
MySubscription->name)));
(errmsg("logical replication worker for subscription \"%s\" will "
"restart because the connection information was changed",
MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
@@ -1370,9 +1370,9 @@ reread_subscription(void)
if (strcmp(newsub->name, MySubscription->name) != 0)
{
ereport(LOG,
(errmsg("logical replication worker for subscription \"%s\" will "
"restart because subscription was renamed",
MySubscription->name)));
(errmsg("logical replication worker for subscription \"%s\" will "
"restart because subscription was renamed",
MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
@@ -1382,30 +1382,30 @@ reread_subscription(void)
Assert(newsub->slotname);
/*
* We need to make new connection to new slot if slot name has changed
* so exit here as well if that's the case.
* We need to make new connection to new slot if slot name has changed so
* exit here as well if that's the case.
*/
if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
{
ereport(LOG,
(errmsg("logical replication worker for subscription \"%s\" will "
"restart because the replication slot name was changed",
MySubscription->name)));
(errmsg("logical replication worker for subscription \"%s\" will "
"restart because the replication slot name was changed",
MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
}
/*
* Exit if publication list was changed. The launcher will start
* new worker.
* Exit if publication list was changed. The launcher will start new
* worker.
*/
if (!equal(newsub->publications, MySubscription->publications))
{
ereport(LOG,
(errmsg("logical replication worker for subscription \"%s\" will "
"restart because subscription's publications were changed",
MySubscription->name)));
(errmsg("logical replication worker for subscription \"%s\" will "
"restart because subscription's publications were changed",
MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
@@ -1448,11 +1448,11 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
void
ApplyWorkerMain(Datum main_arg)
{
int worker_slot = DatumGetInt32(main_arg);
MemoryContext oldctx;
char originname[NAMEDATALEN];
XLogRecPtr origin_startpos;
char *myslotname;
int worker_slot = DatumGetInt32(main_arg);
MemoryContext oldctx;
char originname[NAMEDATALEN];
XLogRecPtr origin_startpos;
char *myslotname;
WalRcvStreamOptions options;
/* Attach to slot */
@@ -1488,8 +1488,8 @@ ApplyWorkerMain(Datum main_arg)
/* Load the subscription into persistent memory context. */
ApplyContext = AllocSetContextCreate(TopMemoryContext,
"ApplyContext",
ALLOCSET_DEFAULT_SIZES);
"ApplyContext",
ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyContext);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
@@ -1503,9 +1503,9 @@ ApplyWorkerMain(Datum main_arg)
if (!MySubscription->enabled)
{
ereport(LOG,
(errmsg("logical replication worker for subscription \"%s\" will not "
"start because the subscription was disabled during startup",
MySubscription->name)));
(errmsg("logical replication worker for subscription \"%s\" will not "
"start because the subscription was disabled during startup",
MySubscription->name)));
proc_exit(0);
}
@@ -1530,7 +1530,7 @@ ApplyWorkerMain(Datum main_arg)
if (am_tablesync_worker())
{
char *syncslotname;
char *syncslotname;
/* This is table synchroniation worker, call initial sync. */
syncslotname = LogicalRepSyncTableStart(&origin_startpos);
@@ -1545,10 +1545,10 @@ ApplyWorkerMain(Datum main_arg)
else
{
/* This is main apply worker */
RepOriginId originid;
TimeLineID startpointTLI;
char *err;
int server_version;
RepOriginId originid;
TimeLineID startpointTLI;
char *err;
int server_version;
myslotname = MySubscription->slotname;
@@ -1570,9 +1570,8 @@ ApplyWorkerMain(Datum main_arg)
(errmsg("could not connect to the publisher: %s", err)));
/*
* We don't really use the output identify_system for anything
* but it does some initializations on the upstream so let's still
* call it.
* We don't really use the output identify_system for anything but it
* does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(wrconn, &startpointTLI,
&server_version);
@@ -1580,8 +1579,8 @@ ApplyWorkerMain(Datum main_arg)
}
/*
* Setup callback for syscache so that we know when something
* changes in the subscription relation state.
* Setup callback for syscache so that we know when something changes in
* the subscription relation state.
*/
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
invalidate_syncing_table_states,