mirror of
https://github.com/postgres/postgres.git
synced 2025-07-28 23:42:10 +03:00
Logical decoding of sequences
This extends the logical decoding to also decode sequence increments. We differentiate between sequences created in the current (in-progress) transaction, and sequences created earlier. This mixed behavior is necessary because while sequences are not transactional (increments are not subject to ROLLBACK), relfilenode changes are. So we do this: * Changes for sequences created in the same top-level transaction are treated as transactional, i.e. just like any other change from that transaction, and discarded in case of a rollback. * Changes for sequences created earlier are applied immediately, as if performed outside any transaction. This applies also after ALTER SEQUENCE, which may create a new relfilenode. Moreover, if we ever get support for DDL replication, the sequence won't exist until the transaction gets applied. Sequences created in the current transaction are tracked in a simple hash table, identified by a relfilenode. That means a sequence may already exist, but if a transaction does ALTER SEQUENCE then the increments for the new relfilenode will be treated as transactional. For each relfilenode we track the XID of (sub)transaction that created it, which is needed for cleanup at transaction end. We don't need to check the XID to decide if an increment is transactional - if we find a match in the hash table, it has to be the same transaction. This requires two minor changes to WAL-logging. Firstly, we need to ensure the sequence record has a valid XID - until now the the increment might have XID 0 if it was the first change in a subxact. But the sequence might have been created in the same top-level transaction. So we ensure the XID is assigned when WAL-logging increments. The other change is addition of "created" flag, marking increments for newly created relfilenodes. This makes it easier to maintain the hash table of sequences that need transactional handling. Note: This is needed because of subxacts. A XID 0 might still have the sequence created in a different subxact of the same top-level xact. This does not include any changes to test_decoding and/or the built-in replication - those will be committed in separate patches. A patch adding decoding of sequences was originally submitted by Cary Huang. This commit reworks various important aspects (e.g. the WAL logging and transactional/non-transactional handling). However, the original patch and reviews were very useful. Author: Tomas Vondra, Cary Huang Reviewed-by: Peter Eisentraut, Hannu Krosing, Andres Freund Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca
This commit is contained in:
@ -77,6 +77,40 @@
|
||||
* a bit more memory to the oldest subtransactions, because it's likely
|
||||
* they are the source for the next sequence of changes.
|
||||
*
|
||||
* When decoding sequences, we differentiate between a sequences created
|
||||
* in a (running) transaction, and sequences created in other (already
|
||||
* committed) transactions. Changes for sequences created in the same
|
||||
* top-level transaction are treated as "transactional" i.e. just like
|
||||
* any other change from that transaction (and discarded in case of a
|
||||
* rollback). Changes for sequences created earlier are treated as not
|
||||
* transactional - are processed immediately, as if performed outside
|
||||
* any transaction (and thus not rolled back).
|
||||
*
|
||||
* This mixed behavior is necessary - sequences are non-transactional
|
||||
* (e.g. ROLLBACK does not undo the sequence increments). But for new
|
||||
* sequences, we need to handle them in a transactional way, because if
|
||||
* we ever get some DDL support, the sequence won't exist until the
|
||||
* transaction gets applied. So we need to ensure the increments don't
|
||||
* happen until the sequence gets created.
|
||||
*
|
||||
* To differentiate which sequences are "old" and which were created
|
||||
* in a still-running transaction, we track sequences created in running
|
||||
* transactions in a hash table. Sequences are identified by relfilenode,
|
||||
* and we track XID of the (sub)transaction that created it. This means
|
||||
* that if a transaction does something that changes the relfilenode
|
||||
* (like an alter / reset of a sequence), the new relfilenode will be
|
||||
* treated as if created in the transaction. The list of sequences gets
|
||||
* discarded when the transaction completes (commit/rollback).
|
||||
*
|
||||
* We don't use the XID to check if it's the same top-level transaction.
|
||||
* It's enough to know it was created in an in-progress transaction,
|
||||
* and we know it must be the current one because otherwise it wouldn't
|
||||
* see the sequence object.
|
||||
*
|
||||
* The XID may be valid even for non-transactional sequences - we simply
|
||||
* keep the XID logged to WAL, it's up to the reorderbuffer to decide if
|
||||
* the increment is transactional.
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
@ -91,6 +125,7 @@
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "catalog/catalog.h"
|
||||
#include "commands/sequence.h"
|
||||
#include "lib/binaryheap.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
@ -116,6 +151,13 @@ typedef struct ReorderBufferTXNByIdEnt
|
||||
ReorderBufferTXN *txn;
|
||||
} ReorderBufferTXNByIdEnt;
|
||||
|
||||
/* entry for hash table we use to track sequences created in running xacts */
|
||||
typedef struct ReorderBufferSequenceEnt
|
||||
{
|
||||
RelFileNode rnode;
|
||||
TransactionId xid;
|
||||
} ReorderBufferSequenceEnt;
|
||||
|
||||
/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
|
||||
typedef struct ReorderBufferTupleCidKey
|
||||
{
|
||||
@ -339,6 +381,14 @@ ReorderBufferAllocate(void)
|
||||
buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
|
||||
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
|
||||
|
||||
/* hash table of sequences, mapping relfilenode to XID of transaction */
|
||||
hash_ctl.keysize = sizeof(RelFileNode);
|
||||
hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
|
||||
hash_ctl.hcxt = buffer->context;
|
||||
|
||||
buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
|
||||
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
|
||||
|
||||
buffer->by_txn_last_xid = InvalidTransactionId;
|
||||
buffer->by_txn_last_txn = NULL;
|
||||
|
||||
@ -525,6 +575,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
|
||||
change->data.truncate.relids = NULL;
|
||||
}
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_SEQUENCE:
|
||||
if (change->data.sequence.tuple)
|
||||
{
|
||||
ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
|
||||
change->data.sequence.tuple = NULL;
|
||||
}
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
|
||||
@ -859,6 +916,230 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Treat the sequence increment as transactional?
|
||||
*
|
||||
* The hash table tracks all sequences created in in-progress transactions,
|
||||
* so we simply do a lookup (the sequence is identified by relfilende). If
|
||||
* we find a match, the increment should be handled as transactional.
|
||||
*/
|
||||
bool
|
||||
ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
|
||||
RelFileNode rnode, bool created)
|
||||
{
|
||||
bool found = false;
|
||||
|
||||
if (created)
|
||||
return true;
|
||||
|
||||
hash_search(rb->sequences,
|
||||
(void *) &rnode,
|
||||
HASH_FIND,
|
||||
&found);
|
||||
|
||||
return found;
|
||||
}
|
||||
|
||||
/*
|
||||
* Cleanup sequences created in in-progress transactions.
|
||||
*
|
||||
* There's no way to search by XID, so we simply do a seqscan of all
|
||||
* the entries in the hash table. Hopefully there are only a couple
|
||||
* entries in most cases - people generally don't create many new
|
||||
* sequences over and over.
|
||||
*/
|
||||
static void
|
||||
ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
|
||||
{
|
||||
HASH_SEQ_STATUS scan_status;
|
||||
ReorderBufferSequenceEnt *ent;
|
||||
|
||||
hash_seq_init(&scan_status, rb->sequences);
|
||||
while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
|
||||
{
|
||||
/* skip sequences not from this transaction */
|
||||
if (ent->xid != xid)
|
||||
continue;
|
||||
|
||||
(void) hash_search(rb->sequences,
|
||||
(void *) &(ent->rnode),
|
||||
HASH_REMOVE, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* A transactional sequence increment is queued to be processed upon commit
|
||||
* and a non-transactional increment gets processed immediately.
|
||||
*
|
||||
* A sequence update may be both transactional and non-transactional. When
|
||||
* created in a running transaction, treat it as transactional and queue
|
||||
* the change in it. Otherwise treat it as non-transactional, so that we
|
||||
* don't forget the increment in case of a rollback.
|
||||
*/
|
||||
void
|
||||
ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
|
||||
Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
|
||||
RelFileNode rnode, bool transactional, bool created,
|
||||
ReorderBufferTupleBuf *tuplebuf)
|
||||
{
|
||||
/*
|
||||
* Change needs to be handled as transactional, because the sequence was
|
||||
* created in a transaction that is still running. In that case all the
|
||||
* changes need to be queued in that transaction, we must not send them
|
||||
* to the downstream until the transaction commits.
|
||||
*
|
||||
* There's a bit of a trouble with subtransactions - we can't queue it
|
||||
* into the subxact, because it might be rolled back and we'd lose the
|
||||
* increment. We need to queue it into the same (sub)xact that created
|
||||
* the sequence, which is why we track the XID in the hash table.
|
||||
*/
|
||||
if (transactional)
|
||||
{
|
||||
MemoryContext oldcontext;
|
||||
ReorderBufferChange *change;
|
||||
|
||||
/* lookup sequence by relfilenode */
|
||||
ReorderBufferSequenceEnt *ent;
|
||||
bool found;
|
||||
|
||||
/* transactional changes require a transaction */
|
||||
Assert(xid != InvalidTransactionId);
|
||||
|
||||
/* search the lookup table (we ignore the return value, found is enough) */
|
||||
ent = hash_search(rb->sequences,
|
||||
(void *) &rnode,
|
||||
created ? HASH_ENTER : HASH_FIND,
|
||||
&found);
|
||||
|
||||
/*
|
||||
* If this is the "create" increment, we must not have found any
|
||||
* pre-existing entry in the hash table (i.e. there must not be
|
||||
* any conflicting sequence).
|
||||
*/
|
||||
Assert(!(created && found));
|
||||
|
||||
/* But we must have either created or found an existing entry. */
|
||||
Assert(created || found);
|
||||
|
||||
/*
|
||||
* When creating the sequence, remember the XID of the transaction
|
||||
* that created id.
|
||||
*/
|
||||
if (created)
|
||||
ent->xid = xid;
|
||||
|
||||
/* XXX Maybe check that we're still in the same top-level xact? */
|
||||
|
||||
/* OK, allocate and queue the change */
|
||||
oldcontext = MemoryContextSwitchTo(rb->context);
|
||||
|
||||
change = ReorderBufferGetChange(rb);
|
||||
|
||||
change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
|
||||
change->origin_id = origin_id;
|
||||
|
||||
memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
|
||||
|
||||
change->data.sequence.tuple = tuplebuf;
|
||||
|
||||
/* add it to the same subxact that created the sequence */
|
||||
ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* This increment is for a sequence that was not created in any
|
||||
* running transaction, so we treat it as non-transactional and
|
||||
* just send it to the output plugin directly.
|
||||
*/
|
||||
ReorderBufferTXN *txn = NULL;
|
||||
volatile Snapshot snapshot_now = snapshot;
|
||||
bool using_subtxn;
|
||||
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
/* All "creates" have to be handled as transactional. */
|
||||
Assert(!created);
|
||||
|
||||
/* Make sure the sequence is not in the hash table. */
|
||||
{
|
||||
bool found;
|
||||
hash_search(rb->sequences,
|
||||
(void *) &rnode,
|
||||
HASH_FIND, &found);
|
||||
Assert(!found);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (xid != InvalidTransactionId)
|
||||
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
|
||||
|
||||
/* setup snapshot to allow catalog access */
|
||||
SetupHistoricSnapshot(snapshot_now, NULL);
|
||||
|
||||
/*
|
||||
* Decoding needs access to syscaches et al., which in turn use
|
||||
* heavyweight locks and such. Thus we need to have enough state around to
|
||||
* keep track of those. The easiest way is to simply use a transaction
|
||||
* internally. That also allows us to easily enforce that nothing writes
|
||||
* to the database by checking for xid assignments.
|
||||
*
|
||||
* When we're called via the SQL SRF there's already a transaction
|
||||
* started, so start an explicit subtransaction there.
|
||||
*/
|
||||
using_subtxn = IsTransactionOrTransactionBlock();
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
Relation relation;
|
||||
HeapTuple tuple;
|
||||
Form_pg_sequence_data seq;
|
||||
Oid reloid;
|
||||
|
||||
if (using_subtxn)
|
||||
BeginInternalSubTransaction("sequence");
|
||||
else
|
||||
StartTransactionCommand();
|
||||
|
||||
reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
|
||||
|
||||
if (reloid == InvalidOid)
|
||||
elog(ERROR, "could not map filenode \"%s\" to relation OID",
|
||||
relpathperm(rnode,
|
||||
MAIN_FORKNUM));
|
||||
|
||||
relation = RelationIdGetRelation(reloid);
|
||||
tuple = &tuplebuf->tuple;
|
||||
seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
|
||||
|
||||
rb->sequence(rb, txn, lsn, relation, transactional,
|
||||
seq->last_value, seq->log_cnt, seq->is_called);
|
||||
|
||||
RelationClose(relation);
|
||||
|
||||
TeardownHistoricSnapshot(false);
|
||||
|
||||
AbortCurrentTransaction();
|
||||
|
||||
if (using_subtxn)
|
||||
RollbackAndReleaseCurrentSubTransaction();
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
TeardownHistoricSnapshot(true);
|
||||
|
||||
AbortCurrentTransaction();
|
||||
|
||||
if (using_subtxn)
|
||||
RollbackAndReleaseCurrentSubTransaction();
|
||||
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* AssertTXNLsnOrder
|
||||
* Verify LSN ordering of transaction lists in the reorderbuffer
|
||||
@ -1535,6 +1816,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
||||
&found);
|
||||
Assert(found);
|
||||
|
||||
/* Remove sequences created in this transaction (if any). */
|
||||
ReorderBufferSequenceCleanup(rb, txn->xid);
|
||||
|
||||
/* remove entries spilled to disk */
|
||||
if (rbtxn_is_serialized(txn))
|
||||
ReorderBufferRestoreCleanup(rb, txn);
|
||||
@ -1950,6 +2234,29 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
change->data.msg.message);
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper function for ReorderBufferProcessTXN for applying sequences.
|
||||
*/
|
||||
static inline void
|
||||
ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change,
|
||||
bool streaming)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
Form_pg_sequence_data seq;
|
||||
|
||||
tuple = &change->data.sequence.tuple->tuple;
|
||||
seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
|
||||
|
||||
/* Only ever called from ReorderBufferApplySequence, so transational. */
|
||||
if (streaming)
|
||||
rb->stream_sequence(rb, txn, change->lsn, relation, true,
|
||||
seq->last_value, seq->log_cnt, seq->is_called);
|
||||
else
|
||||
rb->sequence(rb, txn, change->lsn, relation, true,
|
||||
seq->last_value, seq->log_cnt, seq->is_called);
|
||||
}
|
||||
|
||||
/*
|
||||
* Function to store the command id and snapshot at the end of the current
|
||||
* stream so that we can reuse the same while sending the next stream.
|
||||
@ -2392,6 +2699,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
|
||||
elog(ERROR, "tuplecid value in changequeue");
|
||||
break;
|
||||
|
||||
case REORDER_BUFFER_CHANGE_SEQUENCE:
|
||||
Assert(snapshot_now);
|
||||
|
||||
reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
|
||||
change->data.sequence.relnode.relNode);
|
||||
|
||||
if (reloid == InvalidOid)
|
||||
elog(ERROR, "could not map filenode \"%s\" to relation OID",
|
||||
relpathperm(change->data.sequence.relnode,
|
||||
MAIN_FORKNUM));
|
||||
|
||||
relation = RelationIdGetRelation(reloid);
|
||||
|
||||
if (!RelationIsValid(relation))
|
||||
elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
|
||||
reloid,
|
||||
relpathperm(change->data.sequence.relnode,
|
||||
MAIN_FORKNUM));
|
||||
|
||||
if (RelationIsLogicallyLogged(relation))
|
||||
ReorderBufferApplySequence(rb, txn, relation, change, streaming);
|
||||
|
||||
RelationClose(relation);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -3776,6 +4108,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
memcpy(data, change->data.truncate.relids, size);
|
||||
data += size;
|
||||
|
||||
break;
|
||||
}
|
||||
case REORDER_BUFFER_CHANGE_SEQUENCE:
|
||||
{
|
||||
char *data;
|
||||
ReorderBufferTupleBuf *tup;
|
||||
Size len = 0;
|
||||
|
||||
tup = change->data.sequence.tuple;
|
||||
|
||||
if (tup)
|
||||
{
|
||||
sz += sizeof(HeapTupleData);
|
||||
len = tup->tuple.t_len;
|
||||
sz += len;
|
||||
}
|
||||
|
||||
/* make sure we have enough space */
|
||||
ReorderBufferSerializeReserve(rb, sz);
|
||||
|
||||
data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
|
||||
/* might have been reallocated above */
|
||||
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
|
||||
|
||||
if (len)
|
||||
{
|
||||
memcpy(data, &tup->tuple, sizeof(HeapTupleData));
|
||||
data += sizeof(HeapTupleData);
|
||||
|
||||
memcpy(data, tup->tuple.t_data, len);
|
||||
data += len;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
|
||||
@ -4040,6 +4405,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
|
||||
{
|
||||
sz += sizeof(Oid) * change->data.truncate.nrelids;
|
||||
|
||||
break;
|
||||
}
|
||||
case REORDER_BUFFER_CHANGE_SEQUENCE:
|
||||
{
|
||||
ReorderBufferTupleBuf *tup;
|
||||
Size len = 0;
|
||||
|
||||
tup = change->data.sequence.tuple;
|
||||
|
||||
if (tup)
|
||||
{
|
||||
sz += sizeof(HeapTupleData);
|
||||
len = tup->tuple.t_len;
|
||||
sz += len;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
|
||||
@ -4341,6 +4722,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case REORDER_BUFFER_CHANGE_SEQUENCE:
|
||||
if (change->data.sequence.tuple)
|
||||
{
|
||||
uint32 tuplelen = ((HeapTuple) data)->t_len;
|
||||
|
||||
change->data.sequence.tuple =
|
||||
ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
|
||||
|
||||
/* restore ->tuple */
|
||||
memcpy(&change->data.sequence.tuple->tuple, data,
|
||||
sizeof(HeapTupleData));
|
||||
data += sizeof(HeapTupleData);
|
||||
|
||||
/* reset t_data pointer into the new tuplebuf */
|
||||
change->data.sequence.tuple->tuple.t_data =
|
||||
ReorderBufferTupleBufData(change->data.sequence.tuple);
|
||||
|
||||
/* restore tuple data itself */
|
||||
memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
|
||||
data += tuplelen;
|
||||
}
|
||||
break;
|
||||
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
|
||||
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
|
||||
|
Reference in New Issue
Block a user