diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index a7fa6d89a70..89130fa21df 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -461,14 +461,24 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, proc->pgprocno = gxact->pgprocno; SHMQueueElemInit(&(proc->links)); proc->waitStatus = STATUS_OK; - /* We set up the gxact's VXID as InvalidBackendId/XID */ - proc->lxid = (LocalTransactionId) xid; + if (LocalTransactionIdIsValid(MyProc->lxid)) + { + /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */ + proc->lxid = MyProc->lxid; + proc->backendId = MyBackendId; + } + else + { + Assert(AmStartupProcess() || !IsPostmasterEnvironment); + /* GetLockConflicts() uses this to specify a wait on the XID */ + proc->lxid = xid; + proc->backendId = InvalidBackendId; + } pgxact->xid = xid; pgxact->xmin = InvalidTransactionId; pgxact->delayChkpt = false; pgxact->vacuumFlags = 0; proc->pid = 0; - proc->backendId = InvalidBackendId; proc->databaseId = databaseid; proc->roleId = owner; proc->tempNamespaceId = InvalidOid; @@ -843,6 +853,53 @@ TwoPhaseGetGXact(TransactionId xid) return result; } +/* + * TwoPhaseGetXidByVirtualXID + * Lookup VXID among xacts prepared since last startup. + * + * (This won't find recovered xacts.) If more than one matches, return any + * and set "have_more" to true. To witness multiple matches, a single + * BackendId must consume 2^32 LXIDs, with no intervening database restart. 
+ */ +TransactionId +TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, + bool *have_more) +{ + int i; + TransactionId result = InvalidTransactionId; + + Assert(VirtualTransactionIdIsValid(vxid)); + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + PGPROC *proc; + VirtualTransactionId proc_vxid; + + if (!gxact->valid) + continue; + proc = &ProcGlobal->allProcs[gxact->pgprocno]; + GET_VXID_FROM_PGPROC(proc_vxid, *proc); + if (VirtualTransactionIdEquals(vxid, proc_vxid)) + { + /* Startup process sets proc->backendId to InvalidBackendId. */ + Assert(!gxact->inredo); + + if (result != InvalidTransactionId) + { + /* + * Second match: stop scanning and keep the first match in result. + * This function only ever stores true through have_more and never + * clears it, so the caller must initialize the flag to false. + */ + *have_more = true; + break; + } + result = gxact->xid; + } + } + + LWLockRelease(TwoPhaseStateLock); + + /* Still InvalidTransactionId if no valid entry matched vxid. */ + return result; +} + /* * TwoPhaseGetDummyProc * Get the dummy backend ID for prepared transaction specified by XID diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 5e9dbdad584..53cd904b9a3 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2393,6 +2393,13 @@ PrepareTransaction(void) /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd = 0; + /* + * Transfer our locks to a dummy PGPROC. This has to be done before + * ProcArrayClearTransaction(). Otherwise, a GetLockConflicts() would + * conclude "xact already committed or aborted" for our locks. + */ + PostPrepare_Locks(xid); + /* * Let others know about no transaction in progress by me. 
This has to be * done *after* the prepared transaction has been marked valid, else @@ -2432,7 +2439,6 @@ PrepareTransaction(void) PostPrepare_MultiXact(xid); - PostPrepare_Locks(xid); PostPrepare_PredicateLocks(xid); ResourceOwnerRelease(TopTransactionResourceOwner, diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index a0a0a9fea90..19a9bf83896 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -822,9 +822,10 @@ XactLockTableWaitErrorCb(void *arg) * To do this, obtain the current list of lockers, and wait on their VXIDs * until they are finished. * - * Note we don't try to acquire the locks on the given locktags, only the VXIDs - * of its lock holders; if somebody grabs a conflicting lock on the objects - * after we obtained our initial list of lockers, we will not wait for them. + * Note we don't try to acquire the locks on the given locktags, only the + * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock + * on the objects after we obtained our initial list of lockers, we will not + * wait for them. */ void WaitForLockersMultiple(List *locktags, LOCKMODE lockmode) diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 55e4225fa2b..0af2cab3741 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -2784,8 +2784,12 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock) * * The result array is palloc'd and is terminated with an invalid VXID. * - * Of course, the result could be out of date by the time it's returned, - * so use of this function has to be thought about carefully. + * Of course, the result could be out of date by the time it's returned, so + * use of this function has to be thought about carefully. Similarly, a + * PGPROC with no "lxid" will be considered non-conflicting regardless of any + * lock it holds. Existing callers don't care about a locker after that + * locker's pg_xact updates complete. 
CommitTransaction() clears "lxid" after + * pg_xact updates and before releasing locks. * * Note we never include the current xact's vxid in the result array, * since an xact never blocks itself. @@ -4402,37 +4406,80 @@ VirtualXactLockTableCleanup(void) } } +/* + * XactLockForVirtualXact + * + * If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid, + * NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid). Unlike those + * functions, it assumes "xid" is never a subtransaction and that "xid" is + * prepared, committed, or aborted. + * + * If !TransactionIdIsValid(xid), this locks every prepared XID having been + * known as "vxid" before its PREPARE TRANSACTION. + * + * Returns true immediately, without examining xid, when max_prepared_xacts + * is zero. Returns false as soon as LockAcquire() reports + * LOCKACQUIRE_NOT_AVAIL for an XID; because !wait is passed as the final + * LockAcquire() argument, that presumably can happen only when wait is + * false (TODO confirm against LockAcquire()). Otherwise returns true once + * each XID found has been locked and released again. + */ +static bool +XactLockForVirtualXact(VirtualTransactionId vxid, + TransactionId xid, bool wait) +{ + bool more = false; + + /* There is no point to wait for 2PCs if you have no 2PCs. */ + if (max_prepared_xacts == 0) + return true; + + do + { + LockAcquireResult lar; + LOCKTAG tag; + + /* + * Clear state from previous iterations: the last lookup reported + * another match, so forget the XID just handled and search again. + */ + if (more) + { + xid = InvalidTransactionId; + more = false; + } + + /* If we have no xid, try to find one. */ + if (!TransactionIdIsValid(xid)) + xid = TwoPhaseGetXidByVirtualXID(vxid, &more); + if (!TransactionIdIsValid(xid)) + { + Assert(!more); + return true; + } + + /* Check or wait for XID completion. */ + SET_LOCKTAG_TRANSACTION(tag, xid); + lar = LockAcquire(&tag, ShareLock, false, !wait); + if (lar == LOCKACQUIRE_NOT_AVAIL) + return false; + LockRelease(&tag, ShareLock, false); + } while (more); + + return true; +} + /* * VirtualXactLock * - * If wait = true, wait until the given VXID has been released, and then - * return true. + * If wait = true, wait as long as the given VXID or any XID acquired by the + * same transaction is still running. Then, return true. * - * If wait = false, just check whether the VXID is still running, and return - * true or false. 
+ * If wait = false, just check whether that VXID or one of those XIDs is still + * running, and return true or false. */ bool VirtualXactLock(VirtualTransactionId vxid, bool wait) { LOCKTAG tag; PGPROC *proc; + TransactionId xid = InvalidTransactionId; Assert(VirtualTransactionIdIsValid(vxid)); - if (VirtualTransactionIdIsPreparedXact(vxid)) - { - LockAcquireResult lar; - - /* - * Prepared transactions don't hold vxid locks. The - * LocalTransactionId is always a normal, locked XID. - */ - SET_LOCKTAG_TRANSACTION(tag, vxid.localTransactionId); - lar = LockAcquire(&tag, ShareLock, false, !wait); - if (lar != LOCKACQUIRE_NOT_AVAIL) - LockRelease(&tag, ShareLock, false); - return lar != LOCKACQUIRE_NOT_AVAIL; - } + if (VirtualTransactionIdIsRecoveredPreparedXact(vxid)) + /* no vxid lock; localTransactionId is a normal, locked XID */ + return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait); SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid); @@ -4446,7 +4493,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) */ proc = BackendIdGetProc(vxid.backendId); if (proc == NULL) - return true; + return XactLockForVirtualXact(vxid, InvalidTransactionId, wait); /* * We must acquire this lock before checking the backendId and lxid @@ -4455,12 +4502,12 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) */ LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE); - /* If the transaction has ended, our work here is done. */ if (proc->backendId != vxid.backendId || proc->fpLocalTransactionId != vxid.localTransactionId) { + /* VXID ended */ LWLockRelease(&proc->backendLock); - return true; + return XactLockForVirtualXact(vxid, InvalidTransactionId, wait); } /* @@ -4507,6 +4554,16 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) proc->fpVXIDLock = false; } + /* + * If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID() + * search. 
The proc might have assigned this XID but not yet locked it, + * in which case the proc will lock this XID before releasing the VXID. + * The backendLock critical section excludes VirtualXactLockTableCleanup(), + * so we won't save an XID of a different VXID. It doesn't matter whether + * we save this before or after setting up the primary lock table entry. + */ + xid = ProcGlobal->allPgXact[proc->pgprocno].xid; + /* Done with proc->fpLockBits */ LWLockRelease(&proc->backendLock); @@ -4514,7 +4571,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) (void) LockAcquire(&tag, ShareLock, false, false); LockRelease(&tag, ShareLock, false); - return true; + return XactLockForVirtualXact(vxid, xid, wait); } /* diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 1fdc904087c..13410f0290c 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -65,6 +65,20 @@ * (XXX is it worth testing likewise for duplicate catcache flush entries? * Probably not.) * + * Many subsystems own higher-level caches that depend on relcache and/or + * catcache, and they register callbacks here to invalidate their caches. + * While building a higher-level cache entry, a backend may receive a + * callback for the being-built entry or one of its dependencies. This + * implies the new higher-level entry would be born stale, and it might + * remain stale for the life of the backend. Many caches do not prevent + * that. They rely on DDL for can't-miss catalog changes taking + * AccessExclusiveLock on suitable objects. (For a change made with less + * locking, backends might never read the change.) The relation cache, + * however, needs to reflect changes from CREATE INDEX CONCURRENTLY no later + * than the beginning of the next transaction. Hence, when a relevant + * invalidation callback arrives during a build, relcache.c reattempts that + * build. Caches with similar needs could do likewise. 
+ * * If a relcache flush is issued for a system relation that we preload * from the relcache init file, we must also delete the init file so that * it will be rebuilt during the next backend restart. The actual work of diff --git a/src/bin/pgbench/t/023_cic_2pc.pl b/src/bin/pgbench/t/023_cic_2pc.pl new file mode 100644 index 00000000000..e29c2564b77 --- /dev/null +++ b/src/bin/pgbench/t/023_cic_2pc.pl @@ -0,0 +1,205 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test CREATE INDEX CONCURRENTLY with concurrent prepared-xact modifications +use strict; +use warnings; + +use Config; +use PostgresNode; +use TestLib; + +use Test::More tests => 6; + +my ($node, $result); + +# +# Test set-up +# +$node = get_new_node('CIC_2PC_test'); +$node->init; +$node->append_conf('postgresql.conf', 'max_prepared_transactions = 10'); +$node->append_conf('postgresql.conf', 'lock_timeout = 180000'); +$node->start; +$node->safe_psql('postgres', q(CREATE TABLE tbl(i int))); +$node->safe_psql( + 'postgres', q( + CREATE FUNCTION heapallindexed() RETURNS void AS $$ + DECLARE + count_seqscan int; + count_idxscan int; + BEGIN + count_seqscan := (SELECT count(*) FROM tbl); + SET enable_seqscan = off; + count_idxscan := (SELECT count(*) FROM tbl); + RESET enable_seqscan; + IF count_seqscan <> count_idxscan THEN + RAISE 'seqscan found % rows, but idxscan found % rows', + count_seqscan, count_idxscan; + END IF; + END + $$ LANGUAGE plpgsql; +)); + + +# +# Run 3 overlapping 2PC transactions with CIC +# +# We have two concurrent background psql processes: $main_h for INSERTs and +# $cic_h for CIC. Also, we use non-background psql for some COMMIT PREPARED +# statements. 
+# + +my $main_in = ''; +my $main_out = ''; +my $main_timer = IPC::Run::timeout(180); + +my $main_h = + $node->background_psql('postgres', \$main_in, \$main_out, + $main_timer, on_error_stop => 1); +$main_in .= q( +BEGIN; +INSERT INTO tbl VALUES(0); +\echo syncpoint1 +); +pump $main_h until $main_out =~ /syncpoint1/ || $main_timer->is_expired; + +my $cic_in = ''; +my $cic_out = ''; +my $cic_timer = IPC::Run::timeout(180); +my $cic_h = + $node->background_psql('postgres', \$cic_in, \$cic_out, + $cic_timer, on_error_stop => 1); +$cic_in .= q( +\echo start +CREATE INDEX CONCURRENTLY idx ON tbl(i); +); +pump $cic_h until $cic_out =~ /start/ || $cic_timer->is_expired; + +$main_in .= q( +PREPARE TRANSACTION 'a'; +); + +$main_in .= q( +BEGIN; +INSERT INTO tbl VALUES(0); +\echo syncpoint2 +); +pump $main_h until $main_out =~ /syncpoint2/ || $main_timer->is_expired; + +$node->safe_psql('postgres', q(COMMIT PREPARED 'a';)); + +$main_in .= q( +PREPARE TRANSACTION 'b'; +BEGIN; +INSERT INTO tbl VALUES(0); +\echo syncpoint3 +); +pump $main_h until $main_out =~ /syncpoint3/ || $main_timer->is_expired; + +$node->safe_psql('postgres', q(COMMIT PREPARED 'b';)); + +$main_in .= q( +PREPARE TRANSACTION 'c'; +COMMIT PREPARED 'c'; +); +$main_h->pump_nb; + +$main_h->finish; +$cic_h->finish; + +$result = $node->psql('postgres', + q(BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT heapallindexed())); +is($result, '0', 'all indexed after overlapping 2PC'); + + +# +# Server restart shall not change whether prepared xact blocks CIC +# + +$node->safe_psql( + 'postgres', q( +BEGIN; +INSERT INTO tbl VALUES(0); +PREPARE TRANSACTION 'spans_restart'; +BEGIN; +CREATE TABLE unused (); +PREPARE TRANSACTION 'persists_forever'; +)); +$node->restart; + +my $reindex_in = ''; +my $reindex_out = ''; +my $reindex_timer = IPC::Run::timeout(180); +my $reindex_h = + $node->background_psql('postgres', \$reindex_in, \$reindex_out, + $reindex_timer, on_error_stop => 1); +$reindex_in .= q( +\echo start +DROP INDEX 
CONCURRENTLY idx; +CREATE INDEX CONCURRENTLY idx ON tbl(i); +); +pump $reindex_h until $reindex_out =~ /start/ || $reindex_timer->is_expired; + +$node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'"); +$reindex_h->finish; +$result = $node->psql('postgres', + q(BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT heapallindexed())); +is($result, '0', 'all indexed after 2PC and restart'); + + +# +# Stress CIC+2PC with pgbench +# + +# Fix broken index first +$node->safe_psql('postgres', q(REINDEX TABLE tbl;)); + +# Run background pgbench with CIC. We cannot mix-in this script into single +# pgbench: CIC will deadlock with itself occasionally. +my $pgbench_out = ''; +my $pgbench_timer = IPC::Run::timeout(180); +my $pgbench_h = $node->background_pgbench( + '--no-vacuum --client=1 --transactions=100', + { + '002_pgbench_concurrent_cic' => q( + DROP INDEX CONCURRENTLY idx; + CREATE INDEX CONCURRENTLY idx ON tbl(i); + BEGIN ISOLATION LEVEL REPEATABLE READ; + SELECT heapallindexed(); + ROLLBACK; + ) + }, + \$pgbench_out, + $pgbench_timer); + +# Run pgbench. 
+$node->pgbench( + '--no-vacuum --client=5 --transactions=100', + 0, + [qr{actually processed}], + [qr{^$}], + 'concurrent INSERTs w/ 2PC', + { + '002_pgbench_concurrent_2pc' => q( + BEGIN; + INSERT INTO tbl VALUES(0); + PREPARE TRANSACTION 'c:client_id'; + COMMIT PREPARED 'c:client_id'; + ), + '002_pgbench_concurrent_2pc_savepoint' => q( + BEGIN; + SAVEPOINT s1; + INSERT INTO tbl VALUES(0); + PREPARE TRANSACTION 'c:client_id'; + COMMIT PREPARED 'c:client_id'; + ) + }); + +$pgbench_h->pump_nb; +$pgbench_h->finish(); +unlike($pgbench_out, qr/aborted in command/, "pgbench with CIC works"); + +# done +$node->stop; +done_testing(); diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 0e932daa484..d23e9b23511 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -34,6 +34,8 @@ extern void TwoPhaseShmemInit(void); extern void AtAbort_Twophase(void); extern void PostPrepare_Twophase(void); +extern TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, + bool *have_more); extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid); extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid); diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 57b3fb8b51a..23042fafc3a 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -47,10 +47,11 @@ extern bool Debug_deadlocks; /* * Top-level transactions are identified by VirtualTransactionIDs comprising - * PGPROC fields backendId and lxid. For prepared transactions, the - * LocalTransactionId is an ordinary XID. These are guaranteed unique over - * the short term, but will be reused after a database restart or XID - * wraparound; hence they should never be stored on disk. + * PGPROC fields backendId and lxid. For recovered prepared transactions, the + * LocalTransactionId is an ordinary XID; LOCKTAG_VIRTUALTRANSACTION never + * refers to that kind. 
These are guaranteed unique over the short term, but + * will be reused after a database restart or XID wraparound; hence they + * should never be stored on disk. * * Note that struct VirtualTransactionId can not be assumed to be atomically * assignable as a whole. However, type LocalTransactionId is assumed to @@ -70,7 +71,7 @@ typedef struct #define LocalTransactionIdIsValid(lxid) ((lxid) != InvalidLocalTransactionId) #define VirtualTransactionIdIsValid(vxid) \ (LocalTransactionIdIsValid((vxid).localTransactionId)) -#define VirtualTransactionIdIsPreparedXact(vxid) \ +#define VirtualTransactionIdIsRecoveredPreparedXact(vxid) \ ((vxid).backendId == InvalidBackendId) #define VirtualTransactionIdEquals(vxid1, vxid2) \ ((vxid1).backendId == (vxid2).backendId && \ diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index e0fa7917603..76ef3826d15 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -1481,6 +1481,96 @@ sub psql } } +=pod + +=item $node->background_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness + +Invoke B<psql> on B<$dbname> and return an IPC::Run harness object, which the +caller may use to send input to B<psql>. The process's stdin is sourced from +the $stdin scalar reference, and its stdout goes to the $stdout +scalar reference (stderr is not redirected). This allows the caller to act on other parts of the system +while idling this backend. + +The specified timer object is attached to the harness, as well. It is the caller's +responsibility to select the timeout length, and to restart the timer after +each command if the timeout is per-command. + +psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc> +disabled. That may be overridden by passing extra psql parameters. + +Dies on failure to invoke psql, or if psql fails to connect. Errors occurring +later are the caller's problem. psql runs with on_error_stop by default so +that it will stop running sql and return 3 if passed SQL results in an error. 
+ +Be sure to "finish" the harness when done with it. + +=over + +=item on_error_stop => 1 + +By default, the B<background_psql> method invokes the B<psql> program with ON_ERROR_STOP=1 +set, so SQL execution is stopped at the first error and exit code 3 is +returned. Set B<on_error_stop> to 0 to ignore errors instead. + +=item replication => B<value> + +If set, add B<replication=value> to the conninfo string. +Passing the literal value C<database> results in a logical replication +connection. + +=item extra_params => ['--single-transaction'] + +If given, it must be an array reference containing additional parameters to B<psql>. + +=back + +=cut + +sub background_psql +{ + my ($self, $dbname, $stdin, $stdout, $timer, %params) = @_; + + local $ENV{PGHOST} = $self->host; + local $ENV{PGPORT} = $self->port; + + my $replication = $params{replication}; + + my @psql_params = ( + 'psql', + '-XAtq', + '-d', + $self->connstr($dbname) + . (defined $replication ? " replication=$replication" : ""), + '-f', + '-'); + + $params{on_error_stop} = 1 unless defined $params{on_error_stop}; + + push @psql_params, '-v', 'ON_ERROR_STOP=1' if $params{on_error_stop}; + push @psql_params, @{ $params{extra_params} } + if defined $params{extra_params}; + + # Ensure there is no data waiting to be sent: + $$stdin = "" if ref($stdin); + # IPC::Run would otherwise append to existing contents: + $$stdout = "" if ref($stdout); + + my $harness = IPC::Run::start \@psql_params, + '<', $stdin, '>', $stdout, $timer; + + # Request some output, and pump until we see it. This means that psql + # connection failures are caught here, relieving callers of the need to + # handle those. (Right now, we have no particularly good handling for + # errors anyway, but that might be added later.) + my $banner = "background_psql: ready"; + $$stdin = "\\echo $banner\n"; + pump $harness until $$stdout =~ /$banner/ || $timer->is_expired; + + die "psql startup timed out" if $timer->is_expired; + + return $harness; +} + # Common sub of pgbench-invoking interfaces. 
Makes any requested script files # and returns pgbench command-line options causing use of those files. sub _pgbench_make_files