1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

Revert "Logical decoding of sequences"

This reverts a sequence of commits, implementing features related to
logical decoding and replication of sequences:

 - 0da92dc530
 - 80901b3291
 - b779d7d8fd
 - d5ed9da41d
 - a180c2b34d
 - 75b1521dae
 - 2d2232933b
 - 002c9dd97a
 - 05843b1aa4

The implementation has issues, mostly due to combining transactional and
non-transactional behavior of sequences. It's not clear how this could
be fixed, but it'll require reworking significant part of the patch.

Discussion: https://postgr.es/m/95345a19-d508-63d1-860a-f5c2f41e8d40@enterprisedb.com
This commit is contained in:
Tomas Vondra
2022-04-07 18:13:13 +02:00
parent d7ab2a9a3c
commit 2c7ea57e56
73 changed files with 605 additions and 4762 deletions

View File

@@ -42,7 +42,6 @@
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
#include "commands/sequence.h"
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -64,7 +63,6 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
/* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -1252,132 +1250,3 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
ctx->fast_forward || FilterByOrigin(ctx, origin_id));
}
/*
* DecodeSeqTuple
* decode tuple describing the sequence increment
*
* Sequences are represented as a table with a single row, which gets updated
* by nextval(). The tuple is stored in WAL right after the xl_seq_rec, so we
* simply copy it into the tuplebuf (similar to seq_redo).
*/
static void
DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
{
int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
Assert(datalen >= 0);
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
ItemPointerSetInvalid(&tuple->tuple.t_self);
tuple->tuple.t_tableOid = InvalidOid;
memcpy(((char *) tuple->tuple.t_data),
data + sizeof(xl_seq_rec),
SizeofHeapTupleHeader);
memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
datalen);
}
/*
* Handle sequence decode
*
* Decoding sequences is a bit tricky, because while most sequence actions
* are non-transactional (not subject to rollback), some need to be handled
* as transactional.
*
* By default, a sequence increment is non-transactional - we must not queue
* it in a transaction as other changes, because the transaction might get
* rolled back and we'd discard the increment. The downstream would not be
* notified about the increment, which is wrong.
*
* On the other hand, the sequence may be created in a transaction. In this
* case we *should* queue the change as other changes in the transaction,
* because we don't want to send the increments for unknown sequence to the
* plugin - it might get confused about which sequence it's related to etc.
*/
void
sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
ReorderBufferTupleBuf *tuplebuf;
RelFileNode target_node;
XLogReaderState *r = buf->record;
char *tupledata = NULL;
Size tuplelen;
Size datalen = 0;
TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
xl_seq_rec *xlrec;
Snapshot snapshot;
RepOriginId origin_id = XLogRecGetOrigin(r);
bool transactional;
/* only decode changes flagged with XLOG_SEQ_LOG */
if (info != XLOG_SEQ_LOG)
elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
/*
* If we don't have snapshot or we are just fast-forwarding, there is no
* point in decoding messages.
*/
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
ctx->fast_forward)
return;
/* only interested in our database */
XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
if (target_node.dbNode != ctx->slot->data.database)
return;
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
tupledata = XLogRecGetData(r);
datalen = XLogRecGetDataLen(r);
tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
/* extract the WAL record, with "created" flag */
xlrec = (xl_seq_rec *) XLogRecGetData(r);
/* XXX how could we have sequence change without data? */
if(!datalen || !tupledata)
return;
tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeSeqTuple(tupledata, datalen, tuplebuf);
/*
* Should we handle the sequence increment as transactional or not?
*
* If the sequence was created in a still-running transaction, treat
* it as transactional and queue the increments. Otherwise it needs
* to be treated as non-transactional, in which case we send it to
* the plugin right away.
*/
transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
target_node,
xlrec->created);
/* Skip the change if already processed (per the snapshot). */
if (transactional &&
!SnapBuildProcessChange(builder, xid, buf->origptr))
return;
else if (!transactional &&
(SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
SnapBuildXactNeedsSkip(builder, buf->origptr)))
return;
/* Queue the increment (or send immediately if not transactional). */
snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
origin_id, target_node, transactional,
xlrec->created, tuplebuf);
}

View File

@@ -73,10 +73,6 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr sequence_lsn, Relation rel,
bool transactional,
int64 last_value, int64 log_cnt, bool is_called);
/* streaming callbacks */
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -94,10 +90,6 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn
static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr sequence_lsn, Relation rel,
bool transactional,
int64 last_value, int64 log_cnt, bool is_called);
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
@@ -226,7 +218,6 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
ctx->reorder->sequence = sequence_cb_wrapper;
/*
* To support streaming, we require start/stop/abort/commit/change
@@ -243,7 +234,6 @@ StartupDecodingContext(List *output_plugin_options,
(ctx->callbacks.stream_commit_cb != NULL) ||
(ctx->callbacks.stream_change_cb != NULL) ||
(ctx->callbacks.stream_message_cb != NULL) ||
(ctx->callbacks.stream_sequence_cb != NULL) ||
(ctx->callbacks.stream_truncate_cb != NULL);
/*
@@ -261,7 +251,6 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->stream_commit = stream_commit_cb_wrapper;
ctx->reorder->stream_change = stream_change_cb_wrapper;
ctx->reorder->stream_message = stream_message_cb_wrapper;
ctx->reorder->stream_sequence = stream_sequence_cb_wrapper;
ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
@@ -1216,42 +1205,6 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
static void
sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr sequence_lsn, Relation rel, bool transactional,
int64 last_value, int64 log_cnt, bool is_called)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
Assert(!ctx->fast_forward);
if (ctx->callbacks.sequence_cb == NULL)
return;
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "sequence";
state.report_location = sequence_lsn;
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = sequence_lsn;
/* do the actual work: call callback */
ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
last_value, log_cnt, is_called);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
}
static void
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr first_lsn)
@@ -1557,47 +1510,6 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
static void
stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr sequence_lsn, Relation rel,
bool transactional,
int64 last_value, int64 log_cnt, bool is_called)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
Assert(!ctx->fast_forward);
/* We're only supposed to call this when streaming is supported. */
Assert(ctx->streaming);
/* this callback is optional */
if (ctx->callbacks.stream_sequence_cb == NULL)
return;
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "stream_sequence";
state.report_location = sequence_lsn;
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = sequence_lsn;
/* do the actual work: call callback */
ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
last_value, log_cnt, is_called);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
}
static void
stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[],

View File

@@ -662,56 +662,6 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
pq_sendbytes(out, message, sz);
}
/*
* Write SEQUENCE to stream
*/
void
logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
XLogRecPtr lsn, bool transactional,
int64 last_value, int64 log_cnt, bool is_called)
{
uint8 flags = 0;
char *relname;
pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
pq_sendint8(out, flags);
pq_sendint64(out, lsn);
logicalrep_write_namespace(out, RelationGetNamespace(rel));
relname = RelationGetRelationName(rel);
pq_sendstring(out, relname);
pq_sendint8(out, transactional);
pq_sendint64(out, last_value);
pq_sendint64(out, log_cnt);
pq_sendint8(out, is_called);
}
/*
* Read SEQUENCE from the stream.
*/
void
logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
{
/* XXX skipping flags and lsn */
pq_getmsgint(in, 1);
pq_getmsgint64(in);
/* Read relation name from stream */
seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
seqdata->seqname = pstrdup(pq_getmsgstring(in));
seqdata->transactional = pq_getmsgint(in, 1);
seqdata->last_value = pq_getmsgint64(in);
seqdata->log_cnt = pq_getmsgint64(in);
seqdata->is_called = pq_getmsgint(in, 1);
}
/*
* Write relation description to the output stream.
*/
@@ -1286,8 +1236,6 @@ logicalrep_message_type(LogicalRepMsgType action)
return "STREAM ABORT";
case LOGICAL_REP_MSG_STREAM_PREPARE:
return "STREAM PREPARE";
case LOGICAL_REP_MSG_SEQUENCE:
return "SEQUENCE";
}
elog(ERROR, "invalid logical replication message type \"%c\"", action);

View File

@@ -77,40 +77,6 @@
* a bit more memory to the oldest subtransactions, because it's likely
* they are the source for the next sequence of changes.
*
* When decoding sequences, we differentiate between a sequences created
* in a (running) transaction, and sequences created in other (already
* committed) transactions. Changes for sequences created in the same
* top-level transaction are treated as "transactional" i.e. just like
* any other change from that transaction (and discarded in case of a
* rollback). Changes for sequences created earlier are treated as not
* transactional - are processed immediately, as if performed outside
* any transaction (and thus not rolled back).
*
* This mixed behavior is necessary - sequences are non-transactional
* (e.g. ROLLBACK does not undo the sequence increments). But for new
* sequences, we need to handle them in a transactional way, because if
* we ever get some DDL support, the sequence won't exist until the
* transaction gets applied. So we need to ensure the increments don't
* happen until the sequence gets created.
*
* To differentiate which sequences are "old" and which were created
* in a still-running transaction, we track sequences created in running
* transactions in a hash table. Sequences are identified by relfilenode,
* and we track XID of the (sub)transaction that created it. This means
* that if a transaction does something that changes the relfilenode
* (like an alter / reset of a sequence), the new relfilenode will be
* treated as if created in the transaction. The list of sequences gets
* discarded when the transaction completes (commit/rollback).
*
* We don't use the XID to check if it's the same top-level transaction.
* It's enough to know it was created in an in-progress transaction,
* and we know it must be the current one because otherwise it wouldn't
* see the sequence object.
*
* The XID may be valid even for non-transactional sequences - we simply
* keep the XID logged to WAL, it's up to the reorderbuffer to decide if
* the increment is transactional.
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
@@ -125,7 +91,6 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "commands/sequence.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -151,13 +116,6 @@ typedef struct ReorderBufferTXNByIdEnt
ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;
/* entry for hash table we use to track sequences created in running xacts */
typedef struct ReorderBufferSequenceEnt
{
RelFileNode rnode;
TransactionId xid;
} ReorderBufferSequenceEnt;
/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
@@ -388,14 +346,6 @@ ReorderBufferAllocate(void)
buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* hash table of sequences, mapping relfilenode to XID of transaction */
hash_ctl.keysize = sizeof(RelFileNode);
hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
hash_ctl.hcxt = buffer->context;
buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
@@ -582,13 +532,6 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
change->data.truncate.relids = NULL;
}
break;
case REORDER_BUFFER_CHANGE_SEQUENCE:
if (change->data.sequence.tuple)
{
ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
change->data.sequence.tuple = NULL;
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -923,230 +866,6 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
}
}
/*
* Treat the sequence increment as transactional?
*
* The hash table tracks all sequences created in in-progress transactions,
* so we simply do a lookup (the sequence is identified by relfilende). If
* we find a match, the increment should be handled as transactional.
*/
bool
ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
RelFileNode rnode, bool created)
{
bool found = false;
if (created)
return true;
hash_search(rb->sequences,
(void *) &rnode,
HASH_FIND,
&found);
return found;
}
/*
* Cleanup sequences created in in-progress transactions.
*
* There's no way to search by XID, so we simply do a seqscan of all
* the entries in the hash table. Hopefully there are only a couple
* entries in most cases - people generally don't create many new
* sequences over and over.
*/
static void
ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
{
HASH_SEQ_STATUS scan_status;
ReorderBufferSequenceEnt *ent;
hash_seq_init(&scan_status, rb->sequences);
while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
{
/* skip sequences not from this transaction */
if (ent->xid != xid)
continue;
(void) hash_search(rb->sequences,
(void *) &(ent->rnode),
HASH_REMOVE, NULL);
}
}
/*
* A transactional sequence increment is queued to be processed upon commit
* and a non-transactional increment gets processed immediately.
*
* A sequence update may be both transactional and non-transactional. When
* created in a running transaction, treat it as transactional and queue
* the change in it. Otherwise treat it as non-transactional, so that we
* don't forget the increment in case of a rollback.
*/
void
ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
RelFileNode rnode, bool transactional, bool created,
ReorderBufferTupleBuf *tuplebuf)
{
/*
* Change needs to be handled as transactional, because the sequence was
* created in a transaction that is still running. In that case all the
* changes need to be queued in that transaction, we must not send them
* to the downstream until the transaction commits.
*
* There's a bit of a trouble with subtransactions - we can't queue it
* into the subxact, because it might be rolled back and we'd lose the
* increment. We need to queue it into the same (sub)xact that created
* the sequence, which is why we track the XID in the hash table.
*/
if (transactional)
{
MemoryContext oldcontext;
ReorderBufferChange *change;
/* lookup sequence by relfilenode */
ReorderBufferSequenceEnt *ent;
bool found;
/* transactional changes require a transaction */
Assert(xid != InvalidTransactionId);
/* search the lookup table (we ignore the return value, found is enough) */
ent = hash_search(rb->sequences,
(void *) &rnode,
created ? HASH_ENTER : HASH_FIND,
&found);
/*
* If this is the "create" increment, we must not have found any
* pre-existing entry in the hash table (i.e. there must not be
* any conflicting sequence).
*/
Assert(!(created && found));
/* But we must have either created or found an existing entry. */
Assert(created || found);
/*
* When creating the sequence, remember the XID of the transaction
* that created id.
*/
if (created)
ent->xid = xid;
/* XXX Maybe check that we're still in the same top-level xact? */
/* OK, allocate and queue the change */
oldcontext = MemoryContextSwitchTo(rb->context);
change = ReorderBufferGetChange(rb);
change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
change->origin_id = origin_id;
memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
change->data.sequence.tuple = tuplebuf;
/* add it to the same subxact that created the sequence */
ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
MemoryContextSwitchTo(oldcontext);
}
else
{
/*
* This increment is for a sequence that was not created in any
* running transaction, so we treat it as non-transactional and
* just send it to the output plugin directly.
*/
ReorderBufferTXN *txn = NULL;
volatile Snapshot snapshot_now = snapshot;
bool using_subtxn;
#ifdef USE_ASSERT_CHECKING
/* All "creates" have to be handled as transactional. */
Assert(!created);
/* Make sure the sequence is not in the hash table. */
{
bool found;
hash_search(rb->sequences,
(void *) &rnode,
HASH_FIND, &found);
Assert(!found);
}
#endif
if (xid != InvalidTransactionId)
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
/* setup snapshot to allow catalog access */
SetupHistoricSnapshot(snapshot_now, NULL);
/*
* Decoding needs access to syscaches et al., which in turn use
* heavyweight locks and such. Thus we need to have enough state around to
* keep track of those. The easiest way is to simply use a transaction
* internally. That also allows us to easily enforce that nothing writes
* to the database by checking for xid assignments.
*
* When we're called via the SQL SRF there's already a transaction
* started, so start an explicit subtransaction there.
*/
using_subtxn = IsTransactionOrTransactionBlock();
PG_TRY();
{
Relation relation;
HeapTuple tuple;
Form_pg_sequence_data seq;
Oid reloid;
if (using_subtxn)
BeginInternalSubTransaction("sequence");
else
StartTransactionCommand();
reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
if (reloid == InvalidOid)
elog(ERROR, "could not map filenode \"%s\" to relation OID",
relpathperm(rnode,
MAIN_FORKNUM));
relation = RelationIdGetRelation(reloid);
tuple = &tuplebuf->tuple;
seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
rb->sequence(rb, txn, lsn, relation, transactional,
seq->last_value, seq->log_cnt, seq->is_called);
RelationClose(relation);
TeardownHistoricSnapshot(false);
AbortCurrentTransaction();
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
}
PG_CATCH();
{
TeardownHistoricSnapshot(true);
AbortCurrentTransaction();
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
PG_RE_THROW();
}
PG_END_TRY();
}
}
/*
* AssertTXNLsnOrder
* Verify LSN ordering of transaction lists in the reorderbuffer
@@ -1823,9 +1542,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
&found);
Assert(found);
/* Remove sequences created in this transaction (if any). */
ReorderBufferSequenceCleanup(rb, txn->xid);
/* remove entries spilled to disk */
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
@@ -2241,29 +1957,6 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
change->data.msg.message);
}
/*
* Helper function for ReorderBufferProcessTXN for applying sequences.
*/
static inline void
ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change,
bool streaming)
{
HeapTuple tuple;
Form_pg_sequence_data seq;
tuple = &change->data.sequence.tuple->tuple;
seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
/* Only ever called from ReorderBufferApplySequence, so transational. */
if (streaming)
rb->stream_sequence(rb, txn, change->lsn, relation, true,
seq->last_value, seq->log_cnt, seq->is_called);
else
rb->sequence(rb, txn, change->lsn, relation, true,
seq->last_value, seq->log_cnt, seq->is_called);
}
/*
* Function to store the command id and snapshot at the end of the current
* stream so that we can reuse the same while sending the next stream.
@@ -2706,31 +2399,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
elog(ERROR, "tuplecid value in changequeue");
break;
case REORDER_BUFFER_CHANGE_SEQUENCE:
Assert(snapshot_now);
reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
change->data.sequence.relnode.relNode);
if (reloid == InvalidOid)
elog(ERROR, "could not map filenode \"%s\" to relation OID",
relpathperm(change->data.sequence.relnode,
MAIN_FORKNUM));
relation = RelationIdGetRelation(reloid);
if (!RelationIsValid(relation))
elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
reloid,
relpathperm(change->data.sequence.relnode,
MAIN_FORKNUM));
if (RelationIsLogicallyLogged(relation))
ReorderBufferApplySequence(rb, txn, relation, change, streaming);
RelationClose(relation);
break;
}
}
@@ -4115,39 +3783,6 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
memcpy(data, change->data.truncate.relids, size);
data += size;
break;
}
case REORDER_BUFFER_CHANGE_SEQUENCE:
{
char *data;
ReorderBufferTupleBuf *tup;
Size len = 0;
tup = change->data.sequence.tuple;
if (tup)
{
sz += sizeof(HeapTupleData);
len = tup->tuple.t_len;
sz += len;
}
/* make sure we have enough space */
ReorderBufferSerializeReserve(rb, sz);
data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
/* might have been reallocated above */
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
if (len)
{
memcpy(data, &tup->tuple, sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
memcpy(data, tup->tuple.t_data, len);
data += len;
}
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4412,22 +4047,6 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
{
sz += sizeof(Oid) * change->data.truncate.nrelids;
break;
}
case REORDER_BUFFER_CHANGE_SEQUENCE:
{
ReorderBufferTupleBuf *tup;
Size len = 0;
tup = change->data.sequence.tuple;
if (tup)
{
sz += sizeof(HeapTupleData);
len = tup->tuple.t_len;
sz += len;
}
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -4729,30 +4348,6 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
case REORDER_BUFFER_CHANGE_SEQUENCE:
if (change->data.sequence.tuple)
{
uint32 tuplelen = ((HeapTuple) data)->t_len;
change->data.sequence.tuple =
ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
/* restore ->tuple */
memcpy(&change->data.sequence.tuple->tuple, data,
sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
/* reset t_data pointer into the new tuplebuf */
change->data.sequence.tuple->tuple.t_data =
ReorderBufferTupleBufData(change->data.sequence.tuple);
/* restore tuple data itself */
memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
data += tuplelen;
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:

View File

@@ -100,7 +100,6 @@
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/sequence.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
@@ -1137,95 +1136,6 @@ copy_table(Relation rel)
logicalrep_rel_close(relmapentry, NoLock);
}
/*
* Fetch sequence data (current state) from the remote node.
*/
static void
fetch_sequence_data(char *nspname, char *relname,
int64 *last_value, int64 *log_cnt, bool *is_called)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID};
initStringInfo(&cmd);
appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
" FROM %s", quote_qualified_identifier(nspname, relname));
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
pfree(cmd.data);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
(errmsg("could not receive list of replicated tables from the publisher: %s",
res->err)));
/* Process the sequence. */
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
bool isnull;
*last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
*log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
Assert(!isnull);
*is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
Assert(!isnull);
ExecClearTuple(slot);
}
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
}
/*
* Copy existing data of a sequence from publisher.
*
* Caller is responsible for locking the local relation.
*/
static void
copy_sequence(Relation rel)
{
LogicalRepRelMapEntry *relmapentry;
LogicalRepRelation lrel;
List *qual = NIL;
StringInfoData cmd;
int64 last_value = 0,
log_cnt = 0;
bool is_called = 0;
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
RelationGetRelationName(rel), &lrel, &qual);
/* sequences don't have row filters */
Assert(!qual);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
/* Map the publisher relation to local one. */
relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
Assert(rel == relmapentry->localrel);
/* Start copy on the publisher. */
initStringInfo(&cmd);
Assert(lrel.relkind == RELKIND_SEQUENCE);
fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
/* tablesync sets the sequences in non-transactional way */
SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called);
logicalrep_rel_close(relmapentry, NoLock);
}
/*
* Determine the tablesync slot name.
*
@@ -1487,21 +1397,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
originname)));
}
/* Do the right action depending on the relation kind. */
if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
{
/* Now do the initial sequence copy */
PushActiveSnapshot(GetTransactionSnapshot());
copy_sequence(rel);
PopActiveSnapshot();
}
else
{
/* Now do the initial data copy */
PushActiveSnapshot(GetTransactionSnapshot());
copy_table(rel);
PopActiveSnapshot();
}
/* Now do the initial data copy */
PushActiveSnapshot(GetTransactionSnapshot());
copy_table(rel);
PopActiveSnapshot();
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)

View File

@@ -143,7 +143,6 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
@@ -1144,57 +1143,6 @@ apply_handle_origin(StringInfo s)
errmsg_internal("ORIGIN message sent out of order")));
}
/*
* Handle SEQUENCE message.
*/
static void
apply_handle_sequence(StringInfo s)
{
LogicalRepSequence seq;
Oid relid;
if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
return;
logicalrep_read_sequence(s, &seq);
/*
* Non-transactional sequence updates should not be part of a remote
* transaction. There should not be any running transaction.
*/
Assert((!seq.transactional) || in_remote_transaction);
Assert(!(!seq.transactional && in_remote_transaction));
Assert(!(!seq.transactional && IsTransactionState()));
/*
* Make sure we're in a transaction (needed by SetSequence). For
* non-transactional updates we're guaranteed to start a new one,
* and we'll commit it at the end.
*/
if (!IsTransactionState())
{
StartTransactionCommand();
maybe_reread_subscription();
}
relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
seq.seqname, -1),
RowExclusiveLock, false);
/* lock the sequence in AccessExclusiveLock, as expected by SetSequence */
LockRelationOid(relid, AccessExclusiveLock);
/* apply the sequence change */
SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
/*
* Commit the per-stream transaction (we only do this when not in
* remote transaction, i.e. for non-transactional sequence updates.
*/
if (!in_remote_transaction)
CommitTransactionCommand();
}
/*
* Handle STREAM START message.
*/
@@ -2563,10 +2511,6 @@ apply_dispatch(StringInfo s)
*/
break;
case LOGICAL_REP_MSG_SEQUENCE:
apply_handle_sequence(s);
return;
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
break;

View File

@@ -15,7 +15,6 @@
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
#include "commands/defrem.h"
#include "executor/executor.h"
@@ -55,10 +54,6 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
static void pgoutput_sequence(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
Relation relation, bool transactional,
int64 last_value, int64 log_cnt, bool is_called);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -260,7 +255,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
cb->sequence_cb = pgoutput_sequence;
cb->commit_cb = pgoutput_commit_txn;
cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -277,7 +271,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
cb->stream_message_cb = pgoutput_message;
cb->stream_sequence_cb = pgoutput_sequence;
cb->stream_truncate_cb = pgoutput_truncate;
/* transaction streaming - two-phase commit */
cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
@@ -291,7 +284,6 @@ parse_output_parameters(List *options, PGOutputData *data)
bool publication_names_given = false;
bool binary_option_given = false;
bool messages_option_given = false;
bool sequences_option_given = false;
bool streaming_given = false;
bool two_phase_option_given = false;
@@ -299,7 +291,6 @@ parse_output_parameters(List *options, PGOutputData *data)
data->streaming = false;
data->messages = false;
data->two_phase = false;
data->sequences = true;
foreach(lc, options)
{
@@ -368,16 +359,6 @@ parse_output_parameters(List *options, PGOutputData *data)
data->messages = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "sequences") == 0)
{
if (sequences_option_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
sequences_option_given = true;
data->sequences = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
@@ -1709,64 +1690,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
static void
pgoutput_sequence(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
Relation relation, bool transactional,
int64 last_value, int64 log_cnt, bool is_called)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
TransactionId xid = InvalidTransactionId;
RelationSyncEntry *relentry;
if (!data->sequences)
return;
if (!is_publishable_relation(relation))
return;
/*
* Remember the xid for the message in streaming mode. See
* pgoutput_change.
*/
if (in_streaming)
xid = txn->xid;
relentry = get_rel_sync_entry(data, relation);
/*
* First check the sequence filter.
*
* We handle just REORDER_BUFFER_CHANGE_SEQUENCE here.
*/
if (!relentry->pubactions.pubsequence)
return;
/*
* Output BEGIN if we haven't yet. Avoid for non-transactional
* sequence changes.
*/
if (transactional)
{
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
/* Send BEGIN if we haven't yet */
if (txndata && !txndata->sent_begin_txn)
pgoutput_send_begin(ctx, txn);
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_sequence(ctx->out,
relation,
xid,
sequence_lsn,
transactional,
last_value,
log_cnt,
is_called);
OutputPluginWrite(ctx, true);
}
/*
* Currently we always forward.
*/
@@ -2052,8 +1975,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->schema_sent = false;
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
entry->pubactions.pubsequence = false;
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
entry->new_slot = NULL;
entry->old_slot = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
@@ -2068,18 +1990,18 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
{
Oid schemaId = get_rel_namespace(relid);
List *pubids = GetRelationPublications(relid);
char relkind = get_rel_relkind(relid);
char objectType = pub_get_object_type_for_relkind(relkind);
/*
* We don't acquire a lock on the namespace system table as we build
* the cache entry using a historic snapshot and all the later changes
* are absorbed while decoding WAL.
*/
List *schemaPubids = GetSchemaPublications(schemaId, objectType);
List *schemaPubids = GetSchemaPublications(schemaId);
ListCell *lc;
Oid publish_as_relid = relid;
int publish_ancestor_level = 0;
bool am_partition = get_rel_relispartition(relid);
char relkind = get_rel_relkind(relid);
List *rel_publications = NIL;
/* Reload publications if needed before use. */
@@ -2111,7 +2033,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
entry->pubactions.pubsequence = false;
/*
* Tuple slots cleanups. (Will be rebuilt later if needed).
@@ -2159,11 +2080,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
/*
* If this is a FOR ALL TABLES publication, pick the partition root
* and set the ancestor level accordingly. If this is a FOR ALL
* SEQUENCES publication, we publish it too but we don't need to
* pick the partition root etc.
* and set the ancestor level accordingly.
*/
if (pub->alltables || pub->allsequences)
if (pub->alltables)
{
publish = true;
if (pub->pubviaroot && am_partition)
@@ -2227,7 +2146,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
entry->pubactions.pubsequence |= pub->pubactions.pubsequence;
/*
* We want to publish the changes as the top-most ancestor