1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-07 19:06:32 +03:00

Transaction chaining

Add command variants COMMIT AND CHAIN and ROLLBACK AND CHAIN, which
start new transactions with the same transaction characteristics as the
just finished one, per SQL standard.

Support for transaction chaining in PL/pgSQL is also added.  This
functionality is especially useful when running COMMIT in a loop in
PL/pgSQL.

Reviewed-by: Fabien COELHO <coelho@cri.ensmp.fr>
Discussion: https://www.postgresql.org/message-id/flat/28536681-324b-10dc-ade8-ab46f7645a5a@2ndquadrant.com
This commit is contained in:
Peter Eisentraut
2019-03-24 10:33:14 +01:00
parent b2db277057
commit 280a408b48
26 changed files with 601 additions and 38 deletions

View File

@@ -190,6 +190,7 @@ typedef struct TransactionStateData
bool startedInRecovery; /* did we start in recovery? */
bool didLogXid; /* has xid been included in WAL record? */
int parallelModeLevel; /* Enter/ExitParallelMode counter */
bool chain; /* start a new block after this one */
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
@@ -2775,6 +2776,36 @@ StartTransactionCommand(void)
MemoryContextSwitchTo(CurTransactionContext);
}
/*
* Simple system for saving and restoring transaction characteristics
* (isolation level, read only, deferrable). We need this for transaction
* chaining, so that we can set the characteristics of the new transaction to
* be the same as the previous one. (We need something like this because the
* GUC system resets the characteristics at transaction end, so for example
* just skipping the reset in StartTransaction() won't work.)
*/
static int save_XactIsoLevel;
static bool save_XactReadOnly;
static bool save_XactDeferrable;
void
SaveTransactionCharacteristics(void)
{
save_XactIsoLevel = XactIsoLevel;
save_XactReadOnly = XactReadOnly;
save_XactDeferrable = XactDeferrable;
}
void
RestoreTransactionCharacteristics(void)
{
XactIsoLevel = save_XactIsoLevel;
XactReadOnly = save_XactReadOnly;
XactDeferrable = save_XactDeferrable;
}
/*
* CommitTransactionCommand
*/
@@ -2783,6 +2814,9 @@ CommitTransactionCommand(void)
{
TransactionState s = CurrentTransactionState;
if (s->chain)
SaveTransactionCharacteristics();
switch (s->blockState)
{
/*
@@ -2834,6 +2868,13 @@ CommitTransactionCommand(void)
case TBLOCK_END:
CommitTransaction();
s->blockState = TBLOCK_DEFAULT;
if (s->chain)
{
StartTransaction();
s->blockState = TBLOCK_INPROGRESS;
s->chain = false;
RestoreTransactionCharacteristics();
}
break;
/*
@@ -2853,6 +2894,13 @@ CommitTransactionCommand(void)
case TBLOCK_ABORT_END:
CleanupTransaction();
s->blockState = TBLOCK_DEFAULT;
if (s->chain)
{
StartTransaction();
s->blockState = TBLOCK_INPROGRESS;
s->chain = false;
RestoreTransactionCharacteristics();
}
break;
/*
@@ -2864,6 +2912,13 @@ CommitTransactionCommand(void)
AbortTransaction();
CleanupTransaction();
s->blockState = TBLOCK_DEFAULT;
if (s->chain)
{
StartTransaction();
s->blockState = TBLOCK_INPROGRESS;
s->chain = false;
RestoreTransactionCharacteristics();
}
break;
/*
@@ -3521,7 +3576,7 @@ PrepareTransactionBlock(const char *gid)
bool result;
/* Set up to commit the current transaction */
result = EndTransactionBlock();
result = EndTransactionBlock(false);
/* If successful, change outer tblock state to PREPARE */
if (result)
@@ -3567,7 +3622,7 @@ PrepareTransactionBlock(const char *gid)
* resource owner, etc while executing inside a Portal.
*/
bool
EndTransactionBlock(void)
EndTransactionBlock(bool chain)
{
TransactionState s = CurrentTransactionState;
bool result = false;
@@ -3693,6 +3748,13 @@ EndTransactionBlock(void)
break;
}
Assert(s->blockState == TBLOCK_STARTED ||
s->blockState == TBLOCK_END ||
s->blockState == TBLOCK_ABORT_END ||
s->blockState == TBLOCK_ABORT_PENDING);
s->chain = chain;
return result;
}
@@ -3703,7 +3765,7 @@ EndTransactionBlock(void)
* As above, we don't actually do anything here except change blockState.
*/
void
UserAbortTransactionBlock(void)
UserAbortTransactionBlock(bool chain)
{
TransactionState s = CurrentTransactionState;
@@ -3801,6 +3863,11 @@ UserAbortTransactionBlock(void)
BlockStateAsString(s->blockState));
break;
}
Assert(s->blockState == TBLOCK_ABORT_END ||
s->blockState == TBLOCK_ABORT_PENDING);
s->chain = chain;
}
/*

View File

@@ -443,7 +443,7 @@ T213 INSTEAD OF triggers YES
T231 Sensitive cursors YES
T241 START TRANSACTION statement YES
T251 SET TRANSACTION statement: LOCAL option NO
T261 Chained transactions NO
T261 Chained transactions YES
T271 Savepoints YES
T272 Enhanced savepoint management NO
T281 SELECT privilege with column granularity YES

View File

@@ -217,8 +217,8 @@ SPI_start_transaction(void)
MemoryContextSwitchTo(oldcontext);
}
void
SPI_commit(void)
static void
_SPI_commit(bool chain)
{
MemoryContext oldcontext = CurrentMemoryContext;
@@ -250,14 +250,36 @@ SPI_commit(void)
while (ActiveSnapshotSet())
PopActiveSnapshot();
if (chain)
SaveTransactionCharacteristics();
CommitTransactionCommand();
if (chain)
{
StartTransactionCommand();
RestoreTransactionCharacteristics();
}
MemoryContextSwitchTo(oldcontext);
_SPI_current->internal_xact = false;
}
void
SPI_rollback(void)
SPI_commit(void)
{
_SPI_commit(false);
}
void
SPI_commit_and_chain(void)
{
_SPI_commit(true);
}
static void
_SPI_rollback(bool chain)
{
MemoryContext oldcontext = CurrentMemoryContext;
@@ -274,12 +296,34 @@ SPI_rollback(void)
_SPI_current->internal_xact = true;
if (chain)
SaveTransactionCharacteristics();
AbortCurrentTransaction();
if (chain)
{
StartTransactionCommand();
RestoreTransactionCharacteristics();
}
MemoryContextSwitchTo(oldcontext);
_SPI_current->internal_xact = false;
}
void
SPI_rollback(void)
{
_SPI_rollback(false);
}
void
SPI_rollback_and_chain(void)
{
_SPI_rollback(true);
}
/*
* Clean up SPI state. Called on transaction end (of non-SPI-internal
* transactions) and when returning to the main loop on error.

View File

@@ -3666,6 +3666,7 @@ _copyTransactionStmt(const TransactionStmt *from)
COPY_NODE_FIELD(options);
COPY_STRING_FIELD(savepoint_name);
COPY_STRING_FIELD(gid);
COPY_SCALAR_FIELD(chain);
return newnode;
}

View File

@@ -1510,6 +1510,7 @@ _equalTransactionStmt(const TransactionStmt *a, const TransactionStmt *b)
COMPARE_NODE_FIELD(options);
COMPARE_STRING_FIELD(savepoint_name);
COMPARE_STRING_FIELD(gid);
COMPARE_SCALAR_FIELD(chain);
return true;
}

View File

@@ -312,6 +312,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <boolean> opt_or_replace
opt_grant_grant_option opt_grant_admin_option
opt_nowait opt_if_exists opt_with_data
opt_transaction_chain
%type <ival> opt_nowait_or_skip
%type <list> OptRoleList AlterOptRoleList
@@ -9792,11 +9793,12 @@ UnlistenStmt:
*****************************************************************************/
TransactionStmt:
ABORT_P opt_transaction
ABORT_P opt_transaction opt_transaction_chain
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_ROLLBACK;
n->options = NIL;
n->chain = $3;
$$ = (Node *)n;
}
| BEGIN_P opt_transaction transaction_mode_list_or_empty
@@ -9813,25 +9815,28 @@ TransactionStmt:
n->options = $3;
$$ = (Node *)n;
}
| COMMIT opt_transaction
| COMMIT opt_transaction opt_transaction_chain
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_COMMIT;
n->options = NIL;
n->chain = $3;
$$ = (Node *)n;
}
| END_P opt_transaction
| END_P opt_transaction opt_transaction_chain
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_COMMIT;
n->options = NIL;
n->chain = $3;
$$ = (Node *)n;
}
| ROLLBACK opt_transaction
| ROLLBACK opt_transaction opt_transaction_chain
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_ROLLBACK;
n->options = NIL;
n->chain = $3;
$$ = (Node *)n;
}
| SAVEPOINT ColId
@@ -9931,6 +9936,12 @@ transaction_mode_list_or_empty:
{ $$ = NIL; }
;
opt_transaction_chain:
AND CHAIN { $$ = true; }
| AND NO CHAIN { $$ = false; }
| /* EMPTY */ { $$ = false; }
;
/*****************************************************************************
*

View File

@@ -440,7 +440,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
case TRANS_STMT_COMMIT:
if (!EndTransactionBlock())
if (!EndTransactionBlock(stmt->chain))
{
/* report unsuccessful commit in completionTag */
if (completionTag)
@@ -471,7 +471,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
case TRANS_STMT_ROLLBACK:
UserAbortTransactionBlock();
UserAbortTransactionBlock(stmt->chain);
break;
case TRANS_STMT_SAVEPOINT: