diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index da23f89ca32..5d049cdc687 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -545,12 +545,15 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); executed within that transaction. A transaction that is prepared for a two-phase commit using PREPARE TRANSACTION will also be decoded if the output plugin callbacks needed for decoding - them are provided. It is possible that the current transaction which - is being decoded is aborted concurrently via a ROLLBACK PREPARED - command. In that case, the logical decoding of this transaction will - be aborted too. We will skip all the changes of such a transaction once - the abort is detected and abort the transaction when we read WAL for - ROLLBACK PREPARED. + them are provided. It is possible that the current prepared transaction + which is being decoded is aborted concurrently via a + ROLLBACK PREPARED command. In that case, the logical + decoding of this transaction will be aborted too. All the changes of such + a transaction are skipped once the abort is detected and the + prepare_cb callback is invoked. Thus even in case of + a concurrent abort, enough information is provided to the output plugin + for it to properly deal with ROLLBACK PREPARED once + that is decoded. diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 127f2c4b168..52d06285a21 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2664,6 +2664,14 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, txn->commit_time, txn->origin_id, txn->origin_lsn); + + /* + * We send the prepare for the concurrently aborted xacts so that later + * when rollback prepared is decoded and sent, the downstream should be + * able to rollback such a xact. See comments atop DecodePrepare. + */ + if (txn->concurrent_abort) + rb->prepare(rb, txn, txn->final_lsn); } /*