diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c3e16fbae6d..4f022b5a355 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -37,7 +37,8 @@ submake-isolation: submake-test_decoding: $(MAKE) -C $(top_builddir)/contrib/test_decoding -REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared +REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \ + decoding_into_rel binary prepared regresscheck: all | submake-regress submake-test_decoding $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/xact.out b/contrib/test_decoding/expected/xact.out new file mode 100644 index 00000000000..507b701c3ab --- /dev/null +++ b/contrib/test_decoding/expected/xact.out @@ -0,0 +1,42 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +-- bug #13844, xids in non-decoded records need to be inspected +CREATE TABLE xact_test(data text); +INSERT INTO xact_test VALUES ('before-test'); +BEGIN; +-- perform operation in xact that creates and logs xid, but isn't decoded +SELECT * FROM xact_test FOR UPDATE; + data +------------- + before-test +(1 row) + +SAVEPOINT foo; +-- and now actually insert in subxact, xid is expected to be known +INSERT INTO xact_test VALUES ('after-assignment'); +COMMIT; +-- and now show those changes +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------- + BEGIN + table public.xact_test: INSERT: data[text]:'before-test' + COMMIT + BEGIN + table public.xact_test: INSERT: data[text]:'after-assignment' + COMMIT +(6 rows) + +DROP TABLE xact_test; +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/xact.sql b/contrib/test_decoding/sql/xact.sql new file mode 100644 index 00000000000..9ce238f62df --- /dev/null +++ b/contrib/test_decoding/sql/xact.sql @@ -0,0 +1,22 @@ +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +-- bug #13844, xids in non-decoded records need to be inspected +CREATE TABLE xact_test(data text); +INSERT INTO xact_test VALUES ('before-test'); + +BEGIN; +-- perform operation in xact that creates and logs xid, but isn't decoded +SELECT * FROM xact_test FOR UPDATE; +SAVEPOINT foo; +-- and now actually insert in subxact, xid is expected to be known +INSERT INTO xact_test VALUES ('after-assignment'); +COMMIT; +-- and now show those changes +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +DROP TABLE xact_test; + +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 8f8732afdce..9610b6ce773 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -77,6 +77,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); * Take every XLogReadRecord()ed record and perform the actions required to * decode it using the output plugin already setup in the logical decoding * context. + * + * NB: Note that every record's xid needs to be processed by reorderbuffer + * (xids contained in the content of records are not relevant for this rule). + * That means that for records which'd otherwise not go through the + * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to + * call ReorderBufferProcessXid for each record type by default, because + * e.g. empty xacts can be handled more efficiently if there's no previous + * state for them. */ void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record) @@ -132,6 +140,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record) case RM_GIST_ID: case RM_SEQ_ID: case RM_SPGIST_ID: + /* just deal with xid, and done */ + ReorderBufferProcessXid(ctx->reorder, record->xl_xid, + buf.origptr); break; case RM_NEXT_ID: elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) buf.record.xl_rmid); @@ -147,6 +158,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SnapBuild *builder = ctx->snapshot_builder; uint8 info = buf->record.xl_info & ~XLR_INFO_MASK; + ReorderBufferProcessXid(ctx->reorder, buf->record.xl_xid, + buf->origptr); + switch (info) { /* this is also used in END_OF_RECOVERY checkpoints */ @@ -187,7 +201,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogRecord *r = &buf->record; uint8 info = r->xl_info & ~XLR_INFO_MASK; - /* no point in doing anything yet, data could not be decoded anyway */ + /* + * No point in doing anything yet, data could not be decoded anyway. It's + * ok not to call ReorderBufferProcessXid() in that case, except in the + * assignment case there'll not be any later records with the same xid; + * and in the assignment case we'll not decode those xacts. + */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; @@ -302,6 +321,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * transactions in the changestream allowing for a kind of * distributed 2PC. */ + ReorderBufferProcessXid(reorder, r->xl_xid, buf->origptr); break; default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); @@ -318,6 +338,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogRecord *r = &buf->record; uint8 info = r->xl_info & ~XLR_INFO_MASK; + ReorderBufferProcessXid(ctx->reorder, r->xl_xid, buf->origptr); + switch (info) { case XLOG_RUNNING_XACTS: @@ -355,6 +377,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = buf->record.xl_xid; SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); + /* no point in doing anything yet */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; @@ -408,6 +432,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = buf->record.xl_xid; SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); + /* no point in doing anything yet */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 68cacff08b0..5b434ac825f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1669,16 +1669,21 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) /* - * Check whether a transaction is already known in this module.xs + * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at + * least once for every xid in XLogRecord->xl_xid (other places in records + * may, but do not have to be passed through here). + * + * Reorderbuffer keeps some datastructures about transactions in LSN order, + * for efficiency. To do that it has to know about when transactions are seen + * first in the WAL. As many types of records are not actually interesting for + * logical decoding, they do not necessarily pass though here. */ -bool -ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid) +void +ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) { - ReorderBufferTXN *txn; - - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - return txn != NULL; + /* many records won't have an xid assigned, centralize check here */ + if (xid != InvalidTransactionId) + ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); } /* diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 4374d6dd4ad..f16273a8209 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -636,8 +636,6 @@ SnapBuildClearExportedSnapshot() bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) { - bool is_old_tx; - /* * We can't handle data in transactions if we haven't built a snapshot * yet, so don't store them. @@ -658,9 +656,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) * If the reorderbuffer doesn't yet have a snapshot, add one now, it will * be needed to decode the change we're currently processing. */ - is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid); - - if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) + if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) { /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 3ee3b5a0c46..00183fa16d6 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -352,7 +352,7 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn CommandId cmin, CommandId cmax, CommandId combocid); void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); -bool ReorderBufferIsXidKnown(ReorderBuffer *, TransactionId xid); +void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);