diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel index f09a5806345..85e5840feba 100644 --- a/src/backend/access/transam/README.parallel +++ b/src/backend/access/transam/README.parallel @@ -125,6 +125,10 @@ worker. This includes: - State related to pending REINDEX operations, which prevents access to an index that is currently being rebuilt. + - Active relmapper.c mapping state. This is needed to allow consistent + answers when fetching the current relfilenode for relation oids of + mapped relations. + To prevent unprincipled deadlocks when running in parallel mode, this code also arranges for the leader and all workers to participate in group locking. See src/backend/storage/lmgr/README for more details. diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 30ddf94c952..c1681184670 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -37,6 +37,7 @@ #include "utils/guc.h" #include "utils/inval.h" #include "utils/memutils.h" +#include "utils/relmapper.h" #include "utils/snapmgr.h" #include "utils/typcache.h" @@ -69,6 +70,7 @@ #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A) #define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B) +#define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000C) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -205,6 +207,7 @@ InitializeParallelDSM(ParallelContext *pcxt) Size asnaplen = 0; Size tstatelen = 0; Size reindexlen = 0; + Size relmapperlen = 0; Size segsize = 0; int i; FixedParallelState *fps; @@ -256,8 +259,10 @@ InitializeParallelDSM(ParallelContext *pcxt) shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle)); reindexlen = EstimateReindexStateSpace(); shm_toc_estimate_chunk(&pcxt->estimator, reindexlen); + relmapperlen = EstimateRelationMapSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen); /* If you add more chunks here, you probably need to add keys. */ - shm_toc_estimate_keys(&pcxt->estimator, 8); + shm_toc_estimate_keys(&pcxt->estimator, 9); /* Estimate space need for error queues. */ StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == @@ -327,6 +332,7 @@ InitializeParallelDSM(ParallelContext *pcxt) char *asnapspace; char *tstatespace; char *reindexspace; + char *relmapperspace; char *error_queue_space; char *session_dsm_handle_space; char *entrypointstate; @@ -373,6 +379,12 @@ InitializeParallelDSM(ParallelContext *pcxt) SerializeReindexState(reindexlen, reindexspace); shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace); + /* Serialize relmapper state. */ + relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen); + SerializeRelationMap(relmapperlen, relmapperspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE, + relmapperspace); + /* Allocate space for worker information. */ pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers); @@ -1205,6 +1217,7 @@ ParallelWorkerMain(Datum main_arg) char *asnapspace; char *tstatespace; char *reindexspace; + char *relmapperspace; StringInfoData msgbuf; char *session_dsm_handle_space; @@ -1380,6 +1393,10 @@ ParallelWorkerMain(Datum main_arg) reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false); RestoreReindexState(reindexspace); + /* Restore relmapper state. */ + relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false); + RestoreRelationMap(relmapperspace); + /* * We've initialized all of our state now; nothing should change * hereafter. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 9aa63c8792b..cd8270d5fb0 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2018,7 +2018,7 @@ CommitTransaction(void) HOLD_INTERRUPTS(); /* Commit updates to the relation map --- do this as late as possible */ - AtEOXact_RelationMap(true); + AtEOXact_RelationMap(true, is_parallel_worker); /* * set the current transaction state information appropriately during @@ -2539,7 +2539,7 @@ AbortTransaction(void) AtAbort_Portals(); AtEOXact_LargeObject(false); AtAbort_Notify(); - AtEOXact_RelationMap(false); + AtEOXact_RelationMap(false, is_parallel_worker); AtAbort_Twophase(); /* diff --git a/src/backend/utils/cache/relmapper.c b/src/backend/utils/cache/relmapper.c index c7f0e6f6d4a..905867dc767 100644 --- a/src/backend/utils/cache/relmapper.c +++ b/src/backend/utils/cache/relmapper.c @@ -91,6 +91,16 @@ typedef struct RelMapFile int32 pad; /* to make the struct size be 512 exactly */ } RelMapFile; +/* + * State for serializing local and shared relmappings for parallel workers + * (active states only). See notes on active_* and pending_* updates state. + */ +typedef struct SerializedActiveRelMaps +{ + RelMapFile active_shared_updates; + RelMapFile active_local_updates; +} SerializedActiveRelMaps; + /* * The currently known contents of the shared map file and our database's * local map file are stored here. These can be reloaded from disk @@ -111,6 +121,9 @@ static RelMapFile local_map; * they will become active at the next CommandCounterIncrement. This setup * lets map updates act similarly to updates of pg_class rows, ie, they * become visible only at the next CommandCounterIncrement boundary. + * + * Active shared and active local updates are serialized by the parallel + * infrastructure, and deserialized within parallel workers. */ static RelMapFile active_shared_updates; static RelMapFile active_local_updates; @@ -263,13 +276,16 @@ RelationMapUpdateMap(Oid relationId, Oid fileNode, bool shared, else { /* - * We don't currently support map changes within subtransactions. This - * could be done with more bookkeeping infrastructure, but it doesn't - * presently seem worth it. + * We don't currently support map changes within subtransactions, or + * when in parallel mode. This could be done with more bookkeeping + * infrastructure, but it doesn't presently seem worth it. */ if (GetCurrentTransactionNestLevel() > 1) elog(ERROR, "cannot change relation mapping within subtransaction"); + if (IsInParallelMode()) + elog(ERROR, "cannot change relation mapping in parallel mode"); + if (immediate) { /* Make it active, but only locally */ @@ -452,11 +468,14 @@ AtCCI_RelationMap(void) * * During abort, we just have to throw away any pending map changes. * Normal post-abort cleanup will take care of fixing relcache entries. + * Parallel worker commit/abort is handled by resetting active mappings + * that may have been received from the leader process. (There should be + * no pending updates in parallel workers.) */ void -AtEOXact_RelationMap(bool isCommit) +AtEOXact_RelationMap(bool isCommit, bool isParallelWorker) { - if (isCommit) + if (isCommit && !isParallelWorker) { /* * We should not get here with any "pending" updates. (We could @@ -482,7 +501,10 @@ AtEOXact_RelationMap(bool isCommit) } else { - /* Abort --- drop all local and pending updates */ + /* Abort or parallel worker --- drop all local and pending updates */ + Assert(!isParallelWorker || pending_shared_updates.num_mappings == 0); + Assert(!isParallelWorker || pending_local_updates.num_mappings == 0); + active_shared_updates.num_mappings = 0; active_local_updates.num_mappings = 0; pending_shared_updates.num_mappings = 0; @@ -614,6 +636,56 @@ RelationMapInitializePhase3(void) load_relmap_file(false); } +/* + * EstimateRelationMapSpace + * + * Estimate space needed to pass active shared and local relmaps to parallel + * workers. + */ +Size +EstimateRelationMapSpace(void) +{ + return sizeof(SerializedActiveRelMaps); +} + +/* + * SerializeRelationMap + * + * Serialize active shared and local relmap state for parallel workers. + */ +void +SerializeRelationMap(Size maxSize, char *startAddress) +{ + SerializedActiveRelMaps *relmaps; + + Assert(maxSize >= EstimateRelationMapSpace()); + + relmaps = (SerializedActiveRelMaps *) startAddress; + relmaps->active_shared_updates = active_shared_updates; + relmaps->active_local_updates = active_local_updates; +} + +/* + * RestoreRelationMap + * + * Restore active shared and local relmap state within a parallel worker. + */ +void +RestoreRelationMap(char *startAddress) +{ + SerializedActiveRelMaps *relmaps; + + if (active_shared_updates.num_mappings != 0 || + active_local_updates.num_mappings != 0 || + pending_shared_updates.num_mappings != 0 || + pending_local_updates.num_mappings != 0) + elog(ERROR, "parallel worker has existing mappings"); + + relmaps = (SerializedActiveRelMaps *) startAddress; + active_shared_updates = relmaps->active_shared_updates; + active_local_updates = relmaps->active_local_updates; +} + /* * load_relmap_file -- load data from the shared or local map file * diff --git a/src/include/utils/relmapper.h b/src/include/utils/relmapper.h index f69b1006bf2..fb51943498d 100644 --- a/src/include/utils/relmapper.h +++ b/src/include/utils/relmapper.h @@ -48,7 +48,7 @@ extern void RelationMapInvalidate(bool shared); extern void RelationMapInvalidateAll(void); extern void AtCCI_RelationMap(void); -extern void AtEOXact_RelationMap(bool isCommit); +extern void AtEOXact_RelationMap(bool isCommit, bool isParallelWorker); extern void AtPrepare_RelationMap(void); extern void CheckPointRelationMap(void); @@ -59,6 +59,10 @@ extern void RelationMapInitialize(void); extern void RelationMapInitializePhase2(void); extern void RelationMapInitializePhase3(void); +extern Size EstimateRelationMapSpace(void); +extern void SerializeRelationMap(Size maxSize, char *startAddress); +extern void RestoreRelationMap(char *startAddress); + extern void relmap_redo(XLogReaderState *record); extern void relmap_desc(StringInfo buf, XLogReaderState *record); extern const char *relmap_identify(uint8 info);