mirror of
https://github.com/postgres/postgres.git
synced 2025-07-07 00:36:50 +03:00
pgindent run for 9.4
This includes removing tabs after periods in C comments, which was applied to back branches, so this change should not effect backpatching.
This commit is contained in:
@ -8,21 +8,21 @@
|
||||
* src/backend/replication/logical/logical.c
|
||||
*
|
||||
* NOTES
|
||||
* This file coordinates interaction between the various modules that
|
||||
* together provide logical decoding, primarily by providing so
|
||||
* called LogicalDecodingContexts. The goal is to encapsulate most of the
|
||||
* internal complexity for consumers of logical decoding, so they can
|
||||
* create and consume a changestream with a low amount of code. Builtin
|
||||
* consumers are the walsender and SQL SRF interface, but it's possible to
|
||||
* add further ones without changing core code, e.g. to consume changes in
|
||||
* a bgworker.
|
||||
* This file coordinates interaction between the various modules that
|
||||
* together provide logical decoding, primarily by providing so
|
||||
* called LogicalDecodingContexts. The goal is to encapsulate most of the
|
||||
* internal complexity for consumers of logical decoding, so they can
|
||||
* create and consume a changestream with a low amount of code. Builtin
|
||||
* consumers are the walsender and SQL SRF interface, but it's possible to
|
||||
* add further ones without changing core code, e.g. to consume changes in
|
||||
* a bgworker.
|
||||
*
|
||||
* The idea is that a consumer provides three callbacks, one to read WAL,
|
||||
* one to prepare a data write, and a final one for actually writing since
|
||||
* their implementation depends on the type of consumer. Check
|
||||
* logicalfuncs.c for an example implementation of a fairly simple consumer
|
||||
* and a implementation of a WAL reading callback that's suitable for
|
||||
* simple consumers.
|
||||
* The idea is that a consumer provides three callbacks, one to read WAL,
|
||||
* one to prepare a data write, and a final one for actually writing since
|
||||
* their implementation depends on the type of consumer. Check
|
||||
* logicalfuncs.c for an example implementation of a fairly simple consumer
|
||||
* and a implementation of a WAL reading callback that's suitable for
|
||||
* simple consumers.
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
@ -56,13 +56,13 @@ typedef struct LogicalErrorCallbackState
|
||||
/* wrappers around output plugin callbacks */
|
||||
static void output_plugin_error_callback(void *arg);
|
||||
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
|
||||
bool is_init);
|
||||
bool is_init);
|
||||
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
|
||||
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
|
||||
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn);
|
||||
XLogRecPtr commit_lsn);
|
||||
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change);
|
||||
Relation relation, ReorderBufferChange *change);
|
||||
|
||||
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
|
||||
|
||||
@ -90,18 +90,18 @@ CheckLogicalDecodingRequirements(void)
|
||||
*
|
||||
* There's basically three things missing to allow this:
|
||||
* 1) We need to be able to correctly and quickly identify the timeline a
|
||||
* LSN belongs to
|
||||
* LSN belongs to
|
||||
* 2) We need to force hot_standby_feedback to be enabled at all times so
|
||||
* the primary cannot remove rows we need.
|
||||
* the primary cannot remove rows we need.
|
||||
* 3) support dropping replication slots referring to a database, in
|
||||
* dbase_redo. There can't be any active ones due to HS recovery
|
||||
* conflicts, so that should be relatively easy.
|
||||
* dbase_redo. There can't be any active ones due to HS recovery
|
||||
* conflicts, so that should be relatively easy.
|
||||
* ----
|
||||
*/
|
||||
if (RecoveryInProgress())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("logical decoding cannot be used while in recovery")));
|
||||
errmsg("logical decoding cannot be used while in recovery")));
|
||||
}
|
||||
|
||||
/*
|
||||
@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
|
||||
LogicalOutputPluginWriterWrite do_write)
|
||||
{
|
||||
ReplicationSlot *slot;
|
||||
MemoryContext context, old_context;
|
||||
MemoryContext context,
|
||||
old_context;
|
||||
LogicalDecodingContext *ctx;
|
||||
|
||||
/* shorter lines... */
|
||||
@ -133,7 +134,10 @@ StartupDecodingContext(List *output_plugin_options,
|
||||
|
||||
ctx->context = context;
|
||||
|
||||
/* (re-)load output plugins, so we detect a bad (removed) output plugin now. */
|
||||
/*
|
||||
* (re-)load output plugins, so we detect a bad (removed) output plugin
|
||||
* now.
|
||||
*/
|
||||
LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
|
||||
|
||||
/*
|
||||
@ -195,10 +199,10 @@ CreateInitDecodingContext(char *plugin,
|
||||
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
||||
LogicalOutputPluginWriterWrite do_write)
|
||||
{
|
||||
TransactionId xmin_horizon = InvalidTransactionId;
|
||||
TransactionId xmin_horizon = InvalidTransactionId;
|
||||
ReplicationSlot *slot;
|
||||
LogicalDecodingContext *ctx;
|
||||
MemoryContext old_context;
|
||||
MemoryContext old_context;
|
||||
|
||||
/* shorter lines... */
|
||||
slot = MyReplicationSlot;
|
||||
@ -219,8 +223,8 @@ CreateInitDecodingContext(char *plugin,
|
||||
if (slot->data.database != MyDatabaseId)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("replication slot \"%s\" was not created in this database",
|
||||
NameStr(slot->data.name))));
|
||||
errmsg("replication slot \"%s\" was not created in this database",
|
||||
NameStr(slot->data.name))));
|
||||
|
||||
if (IsTransactionState() &&
|
||||
GetTopTransactionIdIfAny() != InvalidTransactionId)
|
||||
@ -252,9 +256,9 @@ CreateInitDecodingContext(char *plugin,
|
||||
*/
|
||||
if (!RecoveryInProgress())
|
||||
{
|
||||
XLogRecPtr flushptr;
|
||||
XLogRecPtr flushptr;
|
||||
|
||||
/* start at current insert position*/
|
||||
/* start at current insert position */
|
||||
slot->data.restart_lsn = GetXLogInsertRecPtr();
|
||||
|
||||
/* make sure we have enough information to start */
|
||||
@ -307,8 +311,8 @@ CreateInitDecodingContext(char *plugin,
|
||||
LWLockRelease(ProcArrayLock);
|
||||
|
||||
/*
|
||||
* tell the snapshot builder to only assemble snapshot once reaching
|
||||
* the a running_xact's record with the respective xmin.
|
||||
* tell the snapshot builder to only assemble snapshot once reaching the a
|
||||
* running_xact's record with the respective xmin.
|
||||
*/
|
||||
xmin_horizon = slot->data.catalog_xmin;
|
||||
|
||||
@ -316,7 +320,7 @@ CreateInitDecodingContext(char *plugin,
|
||||
ReplicationSlotSave();
|
||||
|
||||
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
|
||||
read_page, prepare_write, do_write);
|
||||
read_page, prepare_write, do_write);
|
||||
|
||||
/* call output plugin initialization callback */
|
||||
old_context = MemoryContextSwitchTo(ctx->context);
|
||||
@ -352,7 +356,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
|
||||
{
|
||||
LogicalDecodingContext *ctx;
|
||||
ReplicationSlot *slot;
|
||||
MemoryContext old_context;
|
||||
MemoryContext old_context;
|
||||
|
||||
/* shorter lines... */
|
||||
slot = MyReplicationSlot;
|
||||
@ -370,8 +374,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
|
||||
if (slot->data.database != MyDatabaseId)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
(errmsg("replication slot \"%s\" was not created in this database",
|
||||
NameStr(slot->data.name)))));
|
||||
(errmsg("replication slot \"%s\" was not created in this database",
|
||||
NameStr(slot->data.name)))));
|
||||
|
||||
if (start_lsn == InvalidXLogRecPtr)
|
||||
{
|
||||
@ -385,14 +389,14 @@ CreateDecodingContext(XLogRecPtr start_lsn,
|
||||
* pretty common for a client to acknowledge a LSN it doesn't have to
|
||||
* do anything for, and thus didn't store persistently, because the
|
||||
* xlog records didn't result in anything relevant for logical
|
||||
* decoding. Clients have to be able to do that to support
|
||||
* synchronous replication.
|
||||
* decoding. Clients have to be able to do that to support synchronous
|
||||
* replication.
|
||||
*/
|
||||
start_lsn = slot->data.confirmed_flush;
|
||||
elog(DEBUG1, "cannot stream from %X/%X, minimum is %X/%X, forwarding",
|
||||
(uint32)(start_lsn >> 32), (uint32)start_lsn,
|
||||
(uint32)(slot->data.confirmed_flush >> 32),
|
||||
(uint32)slot->data.confirmed_flush);
|
||||
(uint32) (start_lsn >> 32), (uint32) start_lsn,
|
||||
(uint32) (slot->data.confirmed_flush >> 32),
|
||||
(uint32) slot->data.confirmed_flush);
|
||||
}
|
||||
|
||||
ctx = StartupDecodingContext(output_plugin_options,
|
||||
@ -409,10 +413,10 @@ CreateDecodingContext(XLogRecPtr start_lsn,
|
||||
(errmsg("starting logical decoding for slot %s",
|
||||
NameStr(slot->data.name)),
|
||||
errdetail("streaming transactions committing after %X/%X, reading WAL from %X/%X",
|
||||
(uint32)(slot->data.confirmed_flush >> 32),
|
||||
(uint32)slot->data.confirmed_flush,
|
||||
(uint32)(slot->data.restart_lsn >> 32),
|
||||
(uint32)slot->data.restart_lsn)));
|
||||
(uint32) (slot->data.confirmed_flush >> 32),
|
||||
(uint32) slot->data.confirmed_flush,
|
||||
(uint32) (slot->data.restart_lsn >> 32),
|
||||
(uint32) slot->data.restart_lsn)));
|
||||
|
||||
return ctx;
|
||||
}
|
||||
@ -438,8 +442,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
|
||||
startptr = ctx->slot->data.restart_lsn;
|
||||
|
||||
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
|
||||
(uint32)(ctx->slot->data.restart_lsn >> 32),
|
||||
(uint32)ctx->slot->data.restart_lsn);
|
||||
(uint32) (ctx->slot->data.restart_lsn >> 32),
|
||||
(uint32) ctx->slot->data.restart_lsn);
|
||||
|
||||
/* Wait for a consistent starting point */
|
||||
for (;;)
|
||||
@ -543,14 +547,15 @@ static void
|
||||
output_plugin_error_callback(void *arg)
|
||||
{
|
||||
LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
|
||||
|
||||
/* not all callbacks have an associated LSN */
|
||||
if (state->report_location != InvalidXLogRecPtr)
|
||||
errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
|
||||
NameStr(state->ctx->slot->data.name),
|
||||
NameStr(state->ctx->slot->data.plugin),
|
||||
state->callback_name,
|
||||
(uint32)(state->report_location >> 32),
|
||||
(uint32)state->report_location);
|
||||
(uint32) (state->report_location >> 32),
|
||||
(uint32) state->report_location);
|
||||
else
|
||||
errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
|
||||
NameStr(state->ctx->slot->data.name),
|
||||
@ -643,7 +648,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
|
||||
|
||||
static void
|
||||
commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
XLogRecPtr commit_lsn)
|
||||
XLogRecPtr commit_lsn)
|
||||
{
|
||||
LogicalDecodingContext *ctx = cache->private_data;
|
||||
LogicalErrorCallbackState state;
|
||||
@ -652,7 +657,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
/* Push callback + info on the error context stack */
|
||||
state.ctx = ctx;
|
||||
state.callback_name = "commit";
|
||||
state.report_location = txn->final_lsn; /* beginning of commit record */
|
||||
state.report_location = txn->final_lsn; /* beginning of commit record */
|
||||
errcallback.callback = output_plugin_error_callback;
|
||||
errcallback.arg = (void *) &state;
|
||||
errcallback.previous = error_context_stack;
|
||||
@ -672,7 +677,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
|
||||
static void
|
||||
change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change)
|
||||
Relation relation, ReorderBufferChange *change)
|
||||
{
|
||||
LogicalDecodingContext *ctx = cache->private_data;
|
||||
LogicalErrorCallbackState state;
|
||||
@ -690,6 +695,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
/* set output state */
|
||||
ctx->accept_writes = true;
|
||||
ctx->write_xid = txn->xid;
|
||||
|
||||
/*
|
||||
* report this change's lsn so replies from clients can give an up2date
|
||||
* answer. This won't ever be enough (and shouldn't be!) to confirm
|
||||
@ -715,7 +721,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
|
||||
void
|
||||
LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
|
||||
{
|
||||
bool updated_xmin = false;
|
||||
bool updated_xmin = false;
|
||||
ReplicationSlot *slot;
|
||||
|
||||
slot = MyReplicationSlot;
|
||||
@ -725,16 +731,17 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
|
||||
/*
|
||||
* don't overwrite if we already have a newer xmin. This can
|
||||
* happen if we restart decoding in a slot.
|
||||
* don't overwrite if we already have a newer xmin. This can happen if we
|
||||
* restart decoding in a slot.
|
||||
*/
|
||||
if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
|
||||
{
|
||||
}
|
||||
|
||||
/*
|
||||
* If the client has already confirmed up to this lsn, we directly
|
||||
* can mark this as accepted. This can happen if we restart
|
||||
* decoding in a slot.
|
||||
* If the client has already confirmed up to this lsn, we directly can
|
||||
* mark this as accepted. This can happen if we restart decoding in a
|
||||
* slot.
|
||||
*/
|
||||
else if (current_lsn <= slot->data.confirmed_flush)
|
||||
{
|
||||
@ -744,6 +751,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
|
||||
/* our candidate can directly be used */
|
||||
updated_xmin = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Only increase if the previous values have been applied, otherwise we
|
||||
* might never end up updating if the receiver acks too slowly.
|
||||
@ -770,7 +778,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
|
||||
void
|
||||
LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
|
||||
{
|
||||
bool updated_lsn = false;
|
||||
bool updated_lsn = false;
|
||||
ReplicationSlot *slot;
|
||||
|
||||
slot = MyReplicationSlot;
|
||||
@ -781,13 +789,14 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
|
||||
/* don't overwrite if have a newer restart lsn*/
|
||||
/* don't overwrite if have a newer restart lsn */
|
||||
if (restart_lsn <= slot->data.restart_lsn)
|
||||
{
|
||||
}
|
||||
|
||||
/*
|
||||
* We might have already flushed far enough to directly accept this lsn, in
|
||||
* this case there is no need to check for existing candidate LSNs
|
||||
* We might have already flushed far enough to directly accept this lsn,
|
||||
* in this case there is no need to check for existing candidate LSNs
|
||||
*/
|
||||
else if (current_lsn <= slot->data.confirmed_flush)
|
||||
{
|
||||
@ -797,6 +806,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
|
||||
/* our candidate can directly be used */
|
||||
updated_lsn = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Only increase if the previous values have been applied, otherwise we
|
||||
* might never end up updating if the receiver acks too slowly. A missed
|
||||
@ -896,6 +906,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
|
||||
ReplicationSlotSave();
|
||||
elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
|
||||
}
|
||||
|
||||
/*
|
||||
* Now the new xmin is safely on disk, we can let the global value
|
||||
* advance. We do not take ProcArrayLock or similar since we only
|
||||
|
Reference in New Issue
Block a user