mirror of
https://github.com/postgres/postgres.git
synced 2025-09-03 15:22:11 +03:00
pgindent run for 9.5
This commit is contained in:
@@ -51,7 +51,7 @@ typedef struct
|
||||
|
||||
|
||||
static int64 sendDir(char *path, int basepathlen, bool sizeonly,
|
||||
List *tablespaces, bool sendtblspclinks);
|
||||
List *tablespaces, bool sendtblspclinks);
|
||||
static bool sendFile(char *readfilename, char *tarfilename,
|
||||
struct stat * statbuf, bool missing_ok);
|
||||
static void sendFileWithContent(const char *filename, const char *content);
|
||||
@@ -130,11 +130,12 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
&labelfile, tblspcdir, &tablespaces,
|
||||
&tblspc_map_file,
|
||||
opt->progress, opt->sendtblspcmapfile);
|
||||
|
||||
/*
|
||||
* Once do_pg_start_backup has been called, ensure that any failure causes
|
||||
* us to abort the backup so we don't "leak" a backup counter. For this reason,
|
||||
* *all* functionality between do_pg_start_backup() and do_pg_stop_backup()
|
||||
* should be inside the error cleanup block!
|
||||
* us to abort the backup so we don't "leak" a backup counter. For this
|
||||
* reason, *all* functionality between do_pg_start_backup() and
|
||||
* do_pg_stop_backup() should be inside the error cleanup block!
|
||||
*/
|
||||
|
||||
PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
|
||||
@@ -145,8 +146,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
SendXlogRecPtrResult(startptr, starttli);
|
||||
|
||||
/*
|
||||
* Calculate the relative path of temporary statistics directory in order
|
||||
* to skip the files which are located in that directory later.
|
||||
* Calculate the relative path of temporary statistics directory in
|
||||
* order to skip the files which are located in that directory later.
|
||||
*/
|
||||
if (is_absolute_path(pgstat_stat_directory) &&
|
||||
strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
|
||||
@@ -900,8 +901,8 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces,
|
||||
/*
|
||||
* If there's a backup_label or tablespace_map file, it belongs to a
|
||||
* backup started by the user with pg_start_backup(). It is *not*
|
||||
* correct for this backup, our backup_label/tablespace_map is injected
|
||||
* into the tar separately.
|
||||
* correct for this backup, our backup_label/tablespace_map is
|
||||
* injected into the tar separately.
|
||||
*/
|
||||
if (strcmp(de->d_name, BACKUP_LABEL_FILE) == 0)
|
||||
continue;
|
||||
@@ -1226,8 +1227,8 @@ _tarWriteHeader(const char *filename, const char *linktarget,
|
||||
enum tarError rc;
|
||||
|
||||
rc = tarCreateHeader(h, filename, linktarget, statbuf->st_size,
|
||||
statbuf->st_mode, statbuf->st_uid, statbuf->st_gid,
|
||||
statbuf->st_mtime);
|
||||
statbuf->st_mode, statbuf->st_uid, statbuf->st_gid,
|
||||
statbuf->st_mtime);
|
||||
|
||||
switch (rc)
|
||||
{
|
||||
|
@@ -89,15 +89,15 @@ _PG_init(void)
|
||||
static void
|
||||
libpqrcv_connect(char *conninfo)
|
||||
{
|
||||
const char *keys[5];
|
||||
const char *vals[5];
|
||||
const char *keys[5];
|
||||
const char *vals[5];
|
||||
|
||||
/*
|
||||
* We use the expand_dbname parameter to process the connection string
|
||||
* (or URI), and pass some extra options. The deliberately undocumented
|
||||
* parameter "replication=true" makes it a replication connection.
|
||||
* The database name is ignored by the server in replication mode, but
|
||||
* specify "replication" for .pgpass lookup.
|
||||
* We use the expand_dbname parameter to process the connection string (or
|
||||
* URI), and pass some extra options. The deliberately undocumented
|
||||
* parameter "replication=true" makes it a replication connection. The
|
||||
* database name is ignored by the server in replication mode, but specify
|
||||
* "replication" for .pgpass lookup.
|
||||
*/
|
||||
keys[0] = "dbname";
|
||||
vals[0] = conninfo;
|
||||
|
@@ -67,9 +67,9 @@ static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
|
||||
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
|
||||
|
||||
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
||||
xl_xact_parsed_commit *parsed, TransactionId xid);
|
||||
xl_xact_parsed_commit *parsed, TransactionId xid);
|
||||
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
||||
xl_xact_parsed_abort *parsed, TransactionId xid);
|
||||
xl_xact_parsed_abort *parsed, TransactionId xid);
|
||||
|
||||
/* common function to decode tuples */
|
||||
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
|
||||
|
@@ -234,7 +234,7 @@ CreateInitDecodingContext(char *plugin,
|
||||
if (slot->data.database == InvalidOid)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("cannot use physical replication slot for logical decoding")));
|
||||
errmsg("cannot use physical replication slot for logical decoding")));
|
||||
|
||||
if (slot->data.database != MyDatabaseId)
|
||||
ereport(ERROR,
|
||||
@@ -726,7 +726,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
|
||||
{
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
bool ret;
|
||||
bool ret;
|
||||
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
|
@@ -400,7 +400,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
|
||||
* what we need.
|
||||
*/
|
||||
if (!binary &&
|
||||
ctx->options.output_type != OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
|
||||
ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("logical decoding output plugin \"%s\" produces binary output, but \"%s\" expects textual data",
|
||||
|
@@ -13,7 +13,7 @@
|
||||
* This file provides the following:
|
||||
* * An infrastructure to name nodes in a replication setup
|
||||
* * A facility to efficiently store and persist replication progress in an
|
||||
* efficient and durable manner.
|
||||
* efficient and durable manner.
|
||||
*
|
||||
* Replication origin consist out of a descriptive, user defined, external
|
||||
* name and a short, thus space efficient, internal 2 byte one. This split
|
||||
@@ -45,22 +45,22 @@
|
||||
* There are several levels of locking at work:
|
||||
*
|
||||
* * To create and drop replication origins an exclusive lock on
|
||||
* pg_replication_slot is required for the duration. That allows us to
|
||||
* safely and conflict free assign new origins using a dirty snapshot.
|
||||
* pg_replication_slot is required for the duration. That allows us to
|
||||
* safely and conflict free assign new origins using a dirty snapshot.
|
||||
*
|
||||
* * When creating an in-memory replication progress slot the ReplicationOirgin
|
||||
* LWLock has to be held exclusively; when iterating over the replication
|
||||
* progress a shared lock has to be held, the same when advancing the
|
||||
* replication progress of an individual backend that has not setup as the
|
||||
* session's replication origin.
|
||||
* LWLock has to be held exclusively; when iterating over the replication
|
||||
* progress a shared lock has to be held, the same when advancing the
|
||||
* replication progress of an individual backend that has not setup as the
|
||||
* session's replication origin.
|
||||
*
|
||||
* * When manipulating or looking at the remote_lsn and local_lsn fields of a
|
||||
* replication progress slot that slot's lwlock has to be held. That's
|
||||
* primarily because we do not assume 8 byte writes (the LSN) is atomic on
|
||||
* all our platforms, but it also simplifies memory ordering concerns
|
||||
* between the remote and local lsn. We use a lwlock instead of a spinlock
|
||||
* so it's less harmful to hold the lock over a WAL write
|
||||
* (c.f. AdvanceReplicationProgress).
|
||||
* replication progress slot that slot's lwlock has to be held. That's
|
||||
* primarily because we do not assume 8 byte writes (the LSN) is atomic on
|
||||
* all our platforms, but it also simplifies memory ordering concerns
|
||||
* between the remote and local lsn. We use a lwlock instead of a spinlock
|
||||
* so it's less harmful to hold the lock over a WAL write
|
||||
* (c.f. AdvanceReplicationProgress).
|
||||
*
|
||||
* ---------------------------------------------------------------------------
|
||||
*/
|
||||
@@ -105,7 +105,7 @@ typedef struct ReplicationState
|
||||
/*
|
||||
* Local identifier for the remote node.
|
||||
*/
|
||||
RepOriginId roident;
|
||||
RepOriginId roident;
|
||||
|
||||
/*
|
||||
* Location of the latest commit from the remote side.
|
||||
@@ -135,22 +135,22 @@ typedef struct ReplicationState
|
||||
*/
|
||||
typedef struct ReplicationStateOnDisk
|
||||
{
|
||||
RepOriginId roident;
|
||||
RepOriginId roident;
|
||||
XLogRecPtr remote_lsn;
|
||||
} ReplicationStateOnDisk;
|
||||
|
||||
|
||||
typedef struct ReplicationStateCtl
|
||||
{
|
||||
int tranche_id;
|
||||
LWLockTranche tranche;
|
||||
ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
|
||||
int tranche_id;
|
||||
LWLockTranche tranche;
|
||||
ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
|
||||
} ReplicationStateCtl;
|
||||
|
||||
/* external variables */
|
||||
RepOriginId replorigin_sesssion_origin = InvalidRepOriginId; /* assumed identity */
|
||||
RepOriginId replorigin_sesssion_origin = InvalidRepOriginId; /* assumed identity */
|
||||
XLogRecPtr replorigin_sesssion_origin_lsn = InvalidXLogRecPtr;
|
||||
TimestampTz replorigin_sesssion_origin_timestamp = 0;
|
||||
TimestampTz replorigin_sesssion_origin_timestamp = 0;
|
||||
|
||||
/*
|
||||
* Base address into a shared memory array of replication states of size
|
||||
@@ -188,7 +188,7 @@ replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
|
||||
if (!recoveryOK && RecoveryInProgress())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
|
||||
errmsg("cannot manipulate replication origins during recovery")));
|
||||
errmsg("cannot manipulate replication origins during recovery")));
|
||||
|
||||
}
|
||||
|
||||
@@ -207,9 +207,9 @@ RepOriginId
|
||||
replorigin_by_name(char *roname, bool missing_ok)
|
||||
{
|
||||
Form_pg_replication_origin ident;
|
||||
Oid roident = InvalidOid;
|
||||
HeapTuple tuple;
|
||||
Datum roname_d;
|
||||
Oid roident = InvalidOid;
|
||||
HeapTuple tuple;
|
||||
Datum roname_d;
|
||||
|
||||
roname_d = CStringGetTextDatum(roname);
|
||||
|
||||
@@ -235,10 +235,10 @@ replorigin_by_name(char *roname, bool missing_ok)
|
||||
RepOriginId
|
||||
replorigin_create(char *roname)
|
||||
{
|
||||
Oid roident;
|
||||
HeapTuple tuple = NULL;
|
||||
Relation rel;
|
||||
Datum roname_d;
|
||||
Oid roident;
|
||||
HeapTuple tuple = NULL;
|
||||
Relation rel;
|
||||
Datum roname_d;
|
||||
SnapshotData SnapshotDirty;
|
||||
SysScanDesc scan;
|
||||
ScanKeyData key;
|
||||
@@ -271,6 +271,7 @@ replorigin_create(char *roname)
|
||||
bool nulls[Natts_pg_replication_origin];
|
||||
Datum values[Natts_pg_replication_origin];
|
||||
bool collides;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
ScanKeyInit(&key,
|
||||
@@ -279,7 +280,7 @@ replorigin_create(char *roname)
|
||||
ObjectIdGetDatum(roident));
|
||||
|
||||
scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
|
||||
true /* indexOK */,
|
||||
true /* indexOK */ ,
|
||||
&SnapshotDirty,
|
||||
1, &key);
|
||||
|
||||
@@ -295,7 +296,7 @@ replorigin_create(char *roname)
|
||||
*/
|
||||
memset(&nulls, 0, sizeof(nulls));
|
||||
|
||||
values[Anum_pg_replication_origin_roident -1] = ObjectIdGetDatum(roident);
|
||||
values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
|
||||
values[Anum_pg_replication_origin_roname - 1] = roname_d;
|
||||
|
||||
tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
|
||||
@@ -306,7 +307,7 @@ replorigin_create(char *roname)
|
||||
}
|
||||
}
|
||||
|
||||
/* now release lock again, */
|
||||
/* now release lock again, */
|
||||
heap_close(rel, ExclusiveLock);
|
||||
|
||||
if (tuple == NULL)
|
||||
@@ -327,8 +328,8 @@ replorigin_create(char *roname)
|
||||
void
|
||||
replorigin_drop(RepOriginId roident)
|
||||
{
|
||||
HeapTuple tuple = NULL;
|
||||
Relation rel;
|
||||
HeapTuple tuple = NULL;
|
||||
Relation rel;
|
||||
int i;
|
||||
|
||||
Assert(IsTransactionState());
|
||||
@@ -379,7 +380,7 @@ replorigin_drop(RepOriginId roident)
|
||||
|
||||
CommandCounterIncrement();
|
||||
|
||||
/* now release lock again, */
|
||||
/* now release lock again, */
|
||||
heap_close(rel, ExclusiveLock);
|
||||
}
|
||||
|
||||
@@ -394,7 +395,7 @@ replorigin_drop(RepOriginId roident)
|
||||
bool
|
||||
replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
HeapTuple tuple;
|
||||
Form_pg_replication_origin ric;
|
||||
|
||||
Assert(OidIsValid((Oid) roident));
|
||||
@@ -446,7 +447,7 @@ ReplicationOriginShmemSize(void)
|
||||
size = add_size(size, offsetof(ReplicationStateCtl, states));
|
||||
|
||||
size = add_size(size,
|
||||
mul_size(max_replication_slots, sizeof(ReplicationState)));
|
||||
mul_size(max_replication_slots, sizeof(ReplicationState)));
|
||||
return size;
|
||||
}
|
||||
|
||||
@@ -462,11 +463,11 @@ ReplicationOriginShmemInit(void)
|
||||
ShmemInitStruct("ReplicationOriginState",
|
||||
ReplicationOriginShmemSize(),
|
||||
&found);
|
||||
replication_states = replication_states_ctl->states;
|
||||
replication_states = replication_states_ctl->states;
|
||||
|
||||
if (!found)
|
||||
{
|
||||
int i;
|
||||
int i;
|
||||
|
||||
replication_states_ctl->tranche_id = LWLockNewTrancheId();
|
||||
replication_states_ctl->tranche.name = "ReplicationOrigins";
|
||||
@@ -556,7 +557,7 @@ CheckPointReplicationOrigin(void)
|
||||
{
|
||||
ReplicationStateOnDisk disk_state;
|
||||
ReplicationState *curstate = &replication_states[i];
|
||||
XLogRecPtr local_lsn;
|
||||
XLogRecPtr local_lsn;
|
||||
|
||||
if (curstate->roident == InvalidRepOriginId)
|
||||
continue;
|
||||
@@ -636,16 +637,17 @@ void
|
||||
StartupReplicationOrigin(void)
|
||||
{
|
||||
const char *path = "pg_logical/replorigin_checkpoint";
|
||||
int fd;
|
||||
int readBytes;
|
||||
uint32 magic = REPLICATION_STATE_MAGIC;
|
||||
int last_state = 0;
|
||||
pg_crc32c file_crc;
|
||||
pg_crc32c crc;
|
||||
int fd;
|
||||
int readBytes;
|
||||
uint32 magic = REPLICATION_STATE_MAGIC;
|
||||
int last_state = 0;
|
||||
pg_crc32c file_crc;
|
||||
pg_crc32c crc;
|
||||
|
||||
/* don't want to overwrite already existing state */
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
static bool already_started = false;
|
||||
|
||||
Assert(!already_started);
|
||||
already_started = true;
|
||||
#endif
|
||||
@@ -660,8 +662,8 @@ StartupReplicationOrigin(void)
|
||||
fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
|
||||
|
||||
/*
|
||||
* might have had max_replication_slots == 0 last run, or we just brought up a
|
||||
* standby.
|
||||
* might have had max_replication_slots == 0 last run, or we just brought
|
||||
* up a standby.
|
||||
*/
|
||||
if (fd < 0 && errno == ENOENT)
|
||||
return;
|
||||
@@ -681,8 +683,8 @@ StartupReplicationOrigin(void)
|
||||
|
||||
if (magic != REPLICATION_STATE_MAGIC)
|
||||
ereport(PANIC,
|
||||
(errmsg("replication checkpoint has wrong magic %u instead of %u",
|
||||
magic, REPLICATION_STATE_MAGIC)));
|
||||
(errmsg("replication checkpoint has wrong magic %u instead of %u",
|
||||
magic, REPLICATION_STATE_MAGIC)));
|
||||
|
||||
/* we can skip locking here, no other access is possible */
|
||||
|
||||
@@ -697,7 +699,7 @@ StartupReplicationOrigin(void)
|
||||
if (readBytes == sizeof(crc))
|
||||
{
|
||||
/* not pretty, but simple ... */
|
||||
file_crc = *(pg_crc32c*) &disk_state;
|
||||
file_crc = *(pg_crc32c *) &disk_state;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -731,8 +733,8 @@ StartupReplicationOrigin(void)
|
||||
|
||||
elog(LOG, "recovered replication state of node %u to %X/%X",
|
||||
disk_state.roident,
|
||||
(uint32)(disk_state.remote_lsn >> 32),
|
||||
(uint32)disk_state.remote_lsn);
|
||||
(uint32) (disk_state.remote_lsn >> 32),
|
||||
(uint32) disk_state.remote_lsn);
|
||||
}
|
||||
|
||||
/* now check checksum */
|
||||
@@ -756,18 +758,18 @@ replorigin_redo(XLogReaderState *record)
|
||||
case XLOG_REPLORIGIN_SET:
|
||||
{
|
||||
xl_replorigin_set *xlrec =
|
||||
(xl_replorigin_set *) XLogRecGetData(record);
|
||||
(xl_replorigin_set *) XLogRecGetData(record);
|
||||
|
||||
replorigin_advance(xlrec->node_id,
|
||||
xlrec->remote_lsn, record->EndRecPtr,
|
||||
xlrec->force /* backward */,
|
||||
false /* WAL log */);
|
||||
xlrec->force /* backward */ ,
|
||||
false /* WAL log */ );
|
||||
break;
|
||||
}
|
||||
case XLOG_REPLORIGIN_DROP:
|
||||
{
|
||||
xl_replorigin_drop *xlrec;
|
||||
int i;
|
||||
int i;
|
||||
|
||||
xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
|
||||
|
||||
@@ -812,7 +814,7 @@ replorigin_advance(RepOriginId node,
|
||||
XLogRecPtr remote_commit, XLogRecPtr local_commit,
|
||||
bool go_backward, bool wal_log)
|
||||
{
|
||||
int i;
|
||||
int i;
|
||||
ReplicationState *replication_state = NULL;
|
||||
ReplicationState *free_state = NULL;
|
||||
|
||||
@@ -899,6 +901,7 @@ replorigin_advance(RepOriginId node,
|
||||
if (wal_log)
|
||||
{
|
||||
xl_replorigin_set xlrec;
|
||||
|
||||
xlrec.remote_lsn = remote_commit;
|
||||
xlrec.node_id = node;
|
||||
xlrec.force = go_backward;
|
||||
@@ -911,8 +914,8 @@ replorigin_advance(RepOriginId node,
|
||||
|
||||
/*
|
||||
* Due to - harmless - race conditions during a checkpoint we could see
|
||||
* values here that are older than the ones we already have in
|
||||
* memory. Don't overwrite those.
|
||||
* values here that are older than the ones we already have in memory.
|
||||
* Don't overwrite those.
|
||||
*/
|
||||
if (go_backward || replication_state->remote_lsn < remote_commit)
|
||||
replication_state->remote_lsn = remote_commit;
|
||||
@@ -973,7 +976,6 @@ replorigin_get_progress(RepOriginId node, bool flush)
|
||||
static void
|
||||
ReplicationOriginExitCleanup(int code, Datum arg)
|
||||
{
|
||||
|
||||
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
|
||||
|
||||
if (session_replication_state != NULL &&
|
||||
@@ -1000,8 +1002,8 @@ void
|
||||
replorigin_session_setup(RepOriginId node)
|
||||
{
|
||||
static bool registered_cleanup;
|
||||
int i;
|
||||
int free_slot = -1;
|
||||
int i;
|
||||
int free_slot = -1;
|
||||
|
||||
if (!registered_cleanup)
|
||||
{
|
||||
@@ -1014,7 +1016,7 @@ replorigin_session_setup(RepOriginId node)
|
||||
if (session_replication_state != NULL)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("cannot setup replication origin when one is already setup")));
|
||||
errmsg("cannot setup replication origin when one is already setup")));
|
||||
|
||||
/* Lock exclusively, as we may have to create a new table entry. */
|
||||
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
|
||||
@@ -1043,8 +1045,8 @@ replorigin_session_setup(RepOriginId node)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_IN_USE),
|
||||
errmsg("replication identiefer %d is already active for pid %d",
|
||||
curstate->roident, curstate->acquired_by)));
|
||||
errmsg("replication identiefer %d is already active for pid %d",
|
||||
curstate->roident, curstate->acquired_by)));
|
||||
}
|
||||
|
||||
/* ok, found slot */
|
||||
@@ -1126,8 +1128,8 @@ replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
|
||||
XLogRecPtr
|
||||
replorigin_session_get_progress(bool flush)
|
||||
{
|
||||
XLogRecPtr remote_lsn;
|
||||
XLogRecPtr local_lsn;
|
||||
XLogRecPtr remote_lsn;
|
||||
XLogRecPtr local_lsn;
|
||||
|
||||
Assert(session_replication_state != NULL);
|
||||
|
||||
@@ -1158,7 +1160,7 @@ replorigin_session_get_progress(bool flush)
|
||||
Datum
|
||||
pg_replication_origin_create(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *name;
|
||||
char *name;
|
||||
RepOriginId roident;
|
||||
|
||||
replorigin_check_prerequisites(false, false);
|
||||
@@ -1177,7 +1179,7 @@ pg_replication_origin_create(PG_FUNCTION_ARGS)
|
||||
Datum
|
||||
pg_replication_origin_drop(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *name;
|
||||
char *name;
|
||||
RepOriginId roident;
|
||||
|
||||
replorigin_check_prerequisites(false, false);
|
||||
@@ -1200,7 +1202,7 @@ pg_replication_origin_drop(PG_FUNCTION_ARGS)
|
||||
Datum
|
||||
pg_replication_origin_oid(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *name;
|
||||
char *name;
|
||||
RepOriginId roident;
|
||||
|
||||
replorigin_check_prerequisites(false, false);
|
||||
@@ -1221,7 +1223,7 @@ pg_replication_origin_oid(PG_FUNCTION_ARGS)
|
||||
Datum
|
||||
pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *name;
|
||||
char *name;
|
||||
RepOriginId origin;
|
||||
|
||||
replorigin_check_prerequisites(true, false);
|
||||
@@ -1329,8 +1331,8 @@ Datum
|
||||
pg_replication_origin_advance(PG_FUNCTION_ARGS)
|
||||
{
|
||||
text *name = PG_GETARG_TEXT_P(0);
|
||||
XLogRecPtr remote_commit = PG_GETARG_LSN(1);
|
||||
RepOriginId node;
|
||||
XLogRecPtr remote_commit = PG_GETARG_LSN(1);
|
||||
RepOriginId node;
|
||||
|
||||
replorigin_check_prerequisites(true, false);
|
||||
|
||||
@@ -1345,7 +1347,7 @@ pg_replication_origin_advance(PG_FUNCTION_ARGS)
|
||||
* set up the initial replication state, but not for replay.
|
||||
*/
|
||||
replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
|
||||
true /* go backward */, true /* wal log */);
|
||||
true /* go backward */ , true /* wal log */ );
|
||||
|
||||
UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
|
||||
|
||||
@@ -1365,7 +1367,7 @@ pg_replication_origin_progress(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *name;
|
||||
bool flush;
|
||||
RepOriginId roident;
|
||||
RepOriginId roident;
|
||||
XLogRecPtr remote_lsn = InvalidXLogRecPtr;
|
||||
|
||||
replorigin_check_prerequisites(true, true);
|
||||
@@ -1456,7 +1458,7 @@ pg_show_replication_origin_status(PG_FUNCTION_ARGS)
|
||||
* silently accept that it might be gone.
|
||||
*/
|
||||
if (replorigin_by_oid(state->roident, true,
|
||||
&roname))
|
||||
&roname))
|
||||
{
|
||||
values[1] = CStringGetTextDatum(roname);
|
||||
nulls[1] = false;
|
||||
@@ -1464,7 +1466,7 @@ pg_show_replication_origin_status(PG_FUNCTION_ARGS)
|
||||
|
||||
LWLockAcquire(&state->lock, LW_SHARED);
|
||||
|
||||
values[ 2] = LSNGetDatum(state->remote_lsn);
|
||||
values[2] = LSNGetDatum(state->remote_lsn);
|
||||
nulls[2] = false;
|
||||
|
||||
values[3] = LSNGetDatum(state->local_lsn);
|
||||
|
@@ -1337,6 +1337,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
switch (change->action)
|
||||
{
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
|
||||
|
||||
/*
|
||||
* Confirmation for speculative insertion arrived. Simply
|
||||
* use as a normal record. It'll be cleaned up at the end
|
||||
@@ -1380,10 +1381,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
goto change_done;
|
||||
|
||||
/*
|
||||
* For now ignore sequence changes entirely. Most of
|
||||
* the time they don't log changes using records we
|
||||
* understand, so it doesn't make sense to handle the
|
||||
* few cases we do.
|
||||
* For now ignore sequence changes entirely. Most of the
|
||||
* time they don't log changes using records we
|
||||
* understand, so it doesn't make sense to handle the few
|
||||
* cases we do.
|
||||
*/
|
||||
if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
|
||||
goto change_done;
|
||||
@@ -1395,9 +1396,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
rb->apply_change(rb, txn, relation, change);
|
||||
|
||||
/*
|
||||
* Only clear reassembled toast chunks if we're
|
||||
* sure they're not required anymore. The creator
|
||||
* of the tuple tells us.
|
||||
* Only clear reassembled toast chunks if we're sure
|
||||
* they're not required anymore. The creator of the
|
||||
* tuple tells us.
|
||||
*/
|
||||
if (change->data.tp.clear_toast_afterwards)
|
||||
ReorderBufferToastReset(rb, txn);
|
||||
@@ -1418,7 +1419,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
change);
|
||||
}
|
||||
|
||||
change_done:
|
||||
change_done:
|
||||
|
||||
/*
|
||||
* Either speculative insertion was confirmed, or it was
|
||||
* unsuccessful and the record isn't needed anymore.
|
||||
@@ -1437,6 +1439,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
break;
|
||||
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
|
||||
|
||||
/*
|
||||
* Speculative insertions are dealt with by delaying the
|
||||
* processing of the insert until the confirmation record
|
||||
@@ -1704,9 +1707,9 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
|
||||
txn->final_lsn = lsn;
|
||||
|
||||
/*
|
||||
* Process cache invalidation messages if there are any. Even if we're
|
||||
* not interested in the transaction's contents, it could have manipulated
|
||||
* the catalog and we need to update the caches according to that.
|
||||
* Process cache invalidation messages if there are any. Even if we're not
|
||||
* interested in the transaction's contents, it could have manipulated the
|
||||
* catalog and we need to update the caches according to that.
|
||||
*/
|
||||
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
|
||||
{
|
||||
@@ -2068,7 +2071,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
|
||||
switch (change->action)
|
||||
{
|
||||
/* fall through these, they're all similar enough */
|
||||
/* fall through these, they're all similar enough */
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||
case REORDER_BUFFER_CHANGE_DELETE:
|
||||
@@ -2322,7 +2325,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
/* restore individual stuff */
|
||||
switch (change->action)
|
||||
{
|
||||
/* fall through these, they're all similar enough */
|
||||
/* fall through these, they're all similar enough */
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||
case REORDER_BUFFER_CHANGE_DELETE:
|
||||
|
@@ -153,9 +153,8 @@ struct SnapBuild
|
||||
TransactionId xmax;
|
||||
|
||||
/*
|
||||
* Don't replay commits from an LSN < this LSN. This can be set
|
||||
* externally but it will also be advanced (never retreat) from within
|
||||
* snapbuild.c.
|
||||
* Don't replay commits from an LSN < this LSN. This can be set externally
|
||||
* but it will also be advanced (never retreat) from within snapbuild.c.
|
||||
*/
|
||||
XLogRecPtr start_decoding_at;
|
||||
|
||||
@@ -244,7 +243,7 @@ struct SnapBuild
|
||||
* removes knowledge about the previously used resowner, so we save it here.
|
||||
*/
|
||||
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
|
||||
static bool ExportInProgress = false;
|
||||
static bool ExportInProgress = false;
|
||||
|
||||
/* transaction state manipulation functions */
|
||||
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
|
||||
@@ -599,7 +598,7 @@ SnapBuildExportSnapshot(SnapBuild *builder)
|
||||
|
||||
ereport(LOG,
|
||||
(errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
|
||||
"exported logical decoding snapshot: \"%s\" with %u transaction IDs",
|
||||
"exported logical decoding snapshot: \"%s\" with %u transaction IDs",
|
||||
snap->xcnt,
|
||||
snapname, snap->xcnt)));
|
||||
return snapname;
|
||||
@@ -904,8 +903,8 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
|
||||
ereport(LOG,
|
||||
(errmsg("logical decoding found consistent point at %X/%X",
|
||||
(uint32) (lsn >> 32), (uint32) lsn),
|
||||
errdetail("Transaction ID %u finished; no more running transactions.",
|
||||
xid)));
|
||||
errdetail("Transaction ID %u finished; no more running transactions.",
|
||||
xid)));
|
||||
builder->state = SNAPBUILD_CONSISTENT;
|
||||
}
|
||||
}
|
||||
@@ -1232,8 +1231,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
|
||||
{
|
||||
ereport(DEBUG1,
|
||||
(errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
|
||||
(uint32) (lsn >> 32), (uint32) lsn),
|
||||
errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
|
||||
(uint32) (lsn >> 32), (uint32) lsn),
|
||||
errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
|
||||
builder->initial_xmin_horizon, running->oldestRunningXid)));
|
||||
return true;
|
||||
}
|
||||
@@ -1252,8 +1251,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
|
||||
builder->start_decoding_at = lsn + 1;
|
||||
|
||||
/* As no transactions were running xmin/xmax can be trivially set. */
|
||||
builder->xmin = running->nextXid; /* < are finished */
|
||||
builder->xmax = running->nextXid; /* >= are running */
|
||||
builder->xmin = running->nextXid; /* < are finished */
|
||||
builder->xmax = running->nextXid; /* >= are running */
|
||||
|
||||
/* so we can safely use the faster comparisons */
|
||||
Assert(TransactionIdIsNormal(builder->xmin));
|
||||
@@ -1302,8 +1301,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
|
||||
* currently running transactions have finished. We'll update both
|
||||
* while waiting for the pending transactions to finish.
|
||||
*/
|
||||
builder->xmin = running->nextXid; /* < are finished */
|
||||
builder->xmax = running->nextXid; /* >= are running */
|
||||
builder->xmin = running->nextXid; /* < are finished */
|
||||
builder->xmax = running->nextXid; /* >= are running */
|
||||
|
||||
/* so we can safely use the faster comparisons */
|
||||
Assert(TransactionIdIsNormal(builder->xmin));
|
||||
@@ -1688,7 +1687,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
|
||||
|
||||
INIT_CRC32C(checksum);
|
||||
COMP_CRC32C(checksum,
|
||||
((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
|
||||
((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
|
||||
SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
|
||||
|
||||
/* read SnapBuild */
|
||||
|
@@ -84,7 +84,7 @@ typedef struct ReplicationSlotOnDisk
|
||||
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
|
||||
|
||||
#define SLOT_MAGIC 0x1051CA1 /* format identifier */
|
||||
#define SLOT_VERSION 2 /* version for new files */
|
||||
#define SLOT_VERSION 2 /* version for new files */
|
||||
|
||||
/* Control array for replication slot management */
|
||||
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
|
||||
@@ -349,8 +349,8 @@ ReplicationSlotAcquire(const char *name)
|
||||
if (active_pid != 0)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_IN_USE),
|
||||
errmsg("replication slot \"%s\" is already active for pid %d",
|
||||
name, active_pid)));
|
||||
errmsg("replication slot \"%s\" is already active for pid %d",
|
||||
name, active_pid)));
|
||||
|
||||
/* We made this slot active, so it's ours now. */
|
||||
MyReplicationSlot = slot;
|
||||
|
@@ -99,9 +99,9 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
|
||||
CheckLogicalDecodingRequirements();
|
||||
|
||||
/*
|
||||
* Acquire a logical decoding slot, this will check for conflicting
|
||||
* names. Initially create it as ephemeral - that allows us to nicely
|
||||
* handle errors during initialization because it'll get dropped if this
|
||||
* Acquire a logical decoding slot, this will check for conflicting names.
|
||||
* Initially create it as ephemeral - that allows us to nicely handle
|
||||
* errors during initialization because it'll get dropped if this
|
||||
* transaction fails. We'll make it persistent at the end.
|
||||
*/
|
||||
ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
|
||||
|
@@ -329,7 +329,7 @@ GetReplicationApplyDelay(void)
|
||||
long secs;
|
||||
int usecs;
|
||||
|
||||
TimestampTz chunckReplayStartTime;
|
||||
TimestampTz chunckReplayStartTime;
|
||||
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
receivePtr = walrcv->receivedUpto;
|
||||
|
@@ -781,6 +781,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||
else
|
||||
{
|
||||
CheckLogicalDecodingRequirements();
|
||||
|
||||
/*
|
||||
* Initially create the slot as ephemeral - that allows us to nicely
|
||||
* handle errors during initialization because it'll get dropped if
|
||||
@@ -1266,9 +1267,9 @@ exec_replication_command(const char *cmd_string)
|
||||
MemoryContext old_context;
|
||||
|
||||
/*
|
||||
* Log replication command if log_replication_commands is enabled.
|
||||
* Even when it's disabled, log the command with DEBUG1 level for
|
||||
* backward compatibility.
|
||||
* Log replication command if log_replication_commands is enabled. Even
|
||||
* when it's disabled, log the command with DEBUG1 level for backward
|
||||
* compatibility.
|
||||
*/
|
||||
ereport(log_replication_commands ? LOG : DEBUG1,
|
||||
(errmsg("received replication command: %s", cmd_string)));
|
||||
@@ -2663,8 +2664,8 @@ WalSndWakeup(void)
|
||||
|
||||
for (i = 0; i < max_wal_senders; i++)
|
||||
{
|
||||
Latch *latch;
|
||||
WalSnd *walsnd = &WalSndCtl->walsnds[i];
|
||||
Latch *latch;
|
||||
WalSnd *walsnd = &WalSndCtl->walsnds[i];
|
||||
|
||||
/*
|
||||
* Get latch pointer with spinlock held, for the unlikely case that
|
||||
|
Reference in New Issue
Block a user