diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 748c9b00243..a45bd3a3156 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2071,6 +2071,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 	hjstate->hj_CurTuple = NULL;
 }
 
+/*
+ * Decide if this process is allowed to run the unmatched scan.  If so, the
+ * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
+ * Otherwise the batch is detached and false is returned.
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+
+	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
+
+	/*
+	 * It would not be deadlock-free to wait on the batch barrier, because it
+	 * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
+	 * already emitted tuples.  Therefore, we'll hold a wait-free election:
+	 * only one process can continue to the next phase, and all others detach
+	 * from this batch.  They can still do any work on other batches, if there
+	 * are any.
+	 */
+	if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
+	{
+		/* This process considers the batch to be done. */
+		hashtable->batches[hashtable->curbatch].done = true;
+
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+		/*
+		 * Track largest batch we've seen, which would normally happen in
+		 * ExecHashTableDetachBatch().
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak,
+				batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+		return false;
+	}
+
+	/* Now we are alone with this batch. */
+	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+	Assert(BarrierParticipants(&batch->batch_barrier) == 1);
+
+	/*
+	 * Has another process decided to give up early and command all processes
+	 * to skip the unmatched scan?
+	 */
+	if (batch->skip_unmatched)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		ExecHashTableDetachBatch(hashtable);
+		return false;
+	}
+
+	/* Now prepare the process local state, just as for non-parallel join. */
+	ExecPrepHashTableForUnmatched(hjstate);
+
+	return true;
+}
+
 /*
  * ExecScanHashTableForUnmatched
  *		scan the hash table for unmatched inner tuples
@@ -2145,6 +2208,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 	return false;
 }
 
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel join
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+
+	for (;;)
+	{
+		/*
+		 * hj_CurTuple is the address of the tuple last returned from the
+		 * current bucket, or NULL if it's time to start scanning a new
+		 * bucket.
+		 */
+		if (hashTuple != NULL)
+			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+			hashTuple = ExecParallelHashFirstTuple(hashtable,
+												   hjstate->hj_CurBucketNo++);
+		else
+			break;				/* finished all buckets */
+
+		while (hashTuple != NULL)
+		{
+			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+			{
+				TupleTableSlot *inntuple;
+
+				/* insert hashtable's tuple into exec slot */
+				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+												 hjstate->hj_HashTupleSlot,
+												 false);	/* do not pfree */
+				econtext->ecxt_innertuple = inntuple;
+
+				/*
+				 * Reset temp memory each time; although this function doesn't
+				 * do any qual eval, the caller will, so let's keep it
+				 * parallel to ExecScanHashBucket.
+				 */
+				ResetExprContext(econtext);
+
+				hjstate->hj_CurTuple = hashTuple;
+				return true;
+			}
+
+			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+		}
+
+		/* allow this loop to be cancellable */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
 /*
  * ExecHashTableReset
  *
@@ -3088,6 +3217,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 		accessor->shared = shared;
 		accessor->preallocated = 0;
 		accessor->done = false;
+		accessor->outer_eof = false;
 		accessor->inner_tuples =
 			sts_attach(ParallelHashJoinBatchInner(shared),
 					   ParallelWorkerNumber + 1,
@@ -3133,18 +3263,53 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 	{
 		int			curbatch = hashtable->curbatch;
 		ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+		bool		attached = true;
 
 		/* Make sure any temporary files are closed. */
 		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
 		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
 
-		/* Detach from the batch we were last working on. */
-		if (BarrierArriveAndDetach(&batch->batch_barrier))
+		/* After attaching we always get at least to PHJ_BATCH_PROBE. */
+		Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
+			   BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+
+		/*
+		 * If we're abandoning the PHJ_BATCH_PROBE phase early without having
+		 * reached the end of it, it means the plan doesn't want any more
+		 * tuples, and it is happy to abandon any tuples buffered in this
+		 * process's subplans.  For correctness, we can't allow any process to
+		 * execute the PHJ_BATCH_SCAN phase, because we will never have the
+		 * complete set of match bits.  Therefore we skip emitting unmatched
+		 * tuples in all backends (if this is a full/right join), as if those
+		 * tuples were all due to be emitted by this process and it has
+		 * abandoned them too.
+		 */
+		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+			!hashtable->batches[curbatch].outer_eof)
 		{
 			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
+			 * This flag may be written to by multiple backends during
+			 * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
+			 * phase so requires no extra locking.
+			 */
+			batch->skip_unmatched = true;
+		}
+
+		/*
+		 * Even if we aren't doing a full/right outer join, we'll step through
+		 * the PHJ_BATCH_SCAN phase just to maintain the invariant that
+		 * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
+		 */
+		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
+			attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+		if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
+		{
+			/*
+			 * We are no longer attached to the batch barrier, but we're the
+			 * process that was chosen to free resources and it's safe to
+			 * assert the current phase.  The ParallelHashJoinBatch can't go
+			 * away underneath us while we are attached to the build barrier,
+			 * making this access safe.
 			 */
 			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
 
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 2fc80808e30..52ed05c6f5a 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -86,6 +86,7 @@
  *   PHJ_BATCH_ALLOCATE*      -- one allocates buckets
  *   PHJ_BATCH_LOAD           -- all load the hash table from disk
  *   PHJ_BATCH_PROBE          -- all probe
+ *   PHJ_BATCH_SCAN*          -- one does full/right unmatched scan
  *   PHJ_BATCH_FREE*          -- one frees memory
  *
  * Batch 0 is a special case, because it starts out in phase
@@ -103,9 +104,10 @@
  * to a barrier, unless the barrier has reached a phase that means that no
  * process will wait on it again.  We emit tuples while attached to the build
  * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE
- * respectively without waiting, using BarrierArriveAndDetach().  The last to
- * detach receives a different return value so that it knows that it's safe to
+ * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
+ * respectively without waiting, using BarrierArriveAndDetach() and
+ * BarrierArriveAndDetachExceptLast() respectively.  The last to detach
+ * receives a different return value so that it knows that it's safe to
  * clean up.  Any straggler process that attaches after that phase is reached
  * will see that it's too late to participate or access the relevant shared
  * memory objects.
@@ -393,8 +395,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                     if (HJ_FILL_INNER(node))
                     {
                         /* set up to scan for unmatched inner tuples */
-                        ExecPrepHashTableForUnmatched(node);
-                        node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+                        if (parallel)
+                        {
+                            /*
+                             * Only one process is currently allowed to handle
+                             * each batch's unmatched tuples, in a parallel
+                             * join.
+                             */
+                            if (ExecParallelPrepHashTableForUnmatched(node))
+                                node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+                            else
+                                node->hj_JoinState = HJ_NEED_NEW_BATCH;
+                        }
+                        else
+                        {
+                            ExecPrepHashTableForUnmatched(node);
+                            node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+                        }
                     }
                     else
                         node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -487,25 +504,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                 {
                     node->hj_MatchedOuter = true;
-                    if (parallel)
-                    {
-                        /*
-                         * Full/right outer joins are currently not supported
-                         * for parallel joins, so we don't need to set the
-                         * match bit.  Experiments show that it's worth
-                         * avoiding the shared memory traffic on large
-                         * systems.
-                         */
-                        Assert(!HJ_FILL_INNER(node));
-                    }
-                    else
-                    {
-                        /*
-                         * This is really only needed if HJ_FILL_INNER(node),
-                         * but we'll avoid the branch and just set it always.
-                         */
+
+                    /*
+                     * This is really only needed if HJ_FILL_INNER(node), but
+                     * we'll avoid the branch and just set it always.
+                     */
+                    if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
                         HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-                    }
 
                     /* In an antijoin, we never return a matched tuple */
                     if (node->js.jointype == JOIN_ANTI)
@@ -563,7 +568,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                 * so any unmatched inner tuples in the hashtable have to be
                 * emitted before we continue to the next batch.
                 */
-                if (!ExecScanHashTableForUnmatched(node, econtext))
+                if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+                      : ExecScanHashTableForUnmatched(node, econtext)))
                 {
                     /* no more unmatched tuples */
                     node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -966,6 +972,8 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
     }
 
     /* End of this batch */
+    hashtable->batches[curbatch].outer_eof = true;
+
     return NULL;
 }
 
@@ -1197,13 +1205,32 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
                     * hash table stays alive until everyone's finished
                     * probing it, but no participant is allowed to wait at
                     * this barrier again (or else a deadlock could occur).
-                    * All attached participants must eventually call
-                    * BarrierArriveAndDetach() so that the final phase
-                    * PHJ_BATCH_FREE can be reached.
+                    * All attached participants must eventually detach from
+                    * the barrier and one worker must advance the phase so
+                    * that the final phase is reached.
                     */
                     ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                     sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+
                     return true;
+                case PHJ_BATCH_SCAN:
+
+                    /*
+                     * In principle, we could help scan for unmatched tuples,
+                     * since that phase is already underway (the thing we
+                     * can't do under current deadlock-avoidance rules is wait
+                     * for others to arrive at PHJ_BATCH_SCAN, because
+                     * PHJ_BATCH_PROBE emits tuples, but in this case we just
+                     * got here without waiting).  That is not yet done.  For
+                     * now, we just detach and go around again.  We have to
+                     * use ExecHashTableDetachBatch() because there's a small
+                     * chance we'll be the last to detach, and then we're
+                     * responsible for freeing memory.
+                     */
+                    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+                    hashtable->batches[batchno].done = true;
+                    ExecHashTableDetachBatch(hashtable);
+                    break;
 
                 case PHJ_BATCH_FREE:
 
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index e6ef0deb234..bd51e4f9724 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -2193,15 +2193,9 @@ hash_inner_and_outer(PlannerInfo *root,
     * able to properly guarantee uniqueness.  Similarly, we can't handle
     * JOIN_FULL and JOIN_RIGHT, because they can produce false null
     * extended rows.  Also, the resulting path must not be parameterized.
-    * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
-    * Hash, since in that case we're back to a single hash table with a
-    * single set of match bits for each batch, but that will require
-    * figuring out a deadlock-free way to wait for the probe to finish.
     */
    if (joinrel->consider_parallel &&
        save_jointype != JOIN_UNIQUE_OUTER &&
-       save_jointype != JOIN_FULL &&
-       save_jointype != JOIN_RIGHT &&
        outerrel->partial_pathlist != NIL &&
        bms_is_empty(joinrel->lateral_relids))
    {
@@ -2235,9 +2229,13 @@ hash_inner_and_outer(PlannerInfo *root,
         * total inner path will also be parallel-safe, but if not, we'll
         * have to search for the cheapest safe, unparameterized inner
         * path.  If doing JOIN_UNIQUE_INNER, we can't use any alternative
-        * inner path.
+        * inner path.  If full or right join, we can't use parallelism
+        * (building the hash table in each backend) because no one
+        * process has all the match bits.
         */
-       if (cheapest_total_inner->parallel_safe)
+       if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+           cheapest_safe_inner = NULL;
+       else if (cheapest_total_inner->parallel_safe)
            cheapest_safe_inner = cheapest_total_inner;
        else if (save_jointype != JOIN_UNIQUE_INNER)
            cheapest_safe_inner =
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index acb7592ca09..8ee59d2c710 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -159,6 +159,7 @@ typedef struct ParallelHashJoinBatch
    size_t      ntuples;        /* number of tuples loaded */
    size_t      old_ntuples;    /* number of tuples before repartitioning */
    bool        space_exhausted;
+   bool        skip_unmatched; /* whether to abandon unmatched scan */
 
    /*
     * Variable-sized SharedTuplestore objects follow this struct in memory.
@@ -203,7 +204,7 @@ typedef struct ParallelHashJoinBatchAccessor
    size_t      estimated_size; /* size of partition on disk */
    size_t      old_ntuples;    /* how many tuples before repartitioning? */
    bool        at_least_one_chunk; /* has this backend allocated a chunk? */
-
+   bool        outer_eof;      /* has this process hit end of batch? */
    bool        done;           /* flag to remember that a batch is done */
    SharedTuplestoreAccessor *inner_tuples;
    SharedTuplestoreAccessor *outer_tuples;
@@ -266,7 +267,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BATCH_ALLOCATE              1
 #define PHJ_BATCH_LOAD                  2
 #define PHJ_BATCH_PROBE                 3
-#define PHJ_BATCH_FREE                  4
+#define PHJ_BATCH_SCAN                  4
+#define PHJ_BATCH_FREE                  5
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
 #define PHJ_GROW_BATCHES_ELECT          0
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index d7634af05c0..56d5350c615 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
 extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
                                           ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+                                                  ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
 extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3ec07bc1af4..09376514bbd 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -304,6 +304,13 @@ $$);
  t | f
 (1 row)
 
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
 rollback to settings;
 -- The "bad" case: during execution we need to increase number of
 -- batches; in this case we plan for 1 batch, and increase at least a
@@ -784,8 +791,9 @@ select count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
   select count(*) from simple r full outer join simple s using (id);
@@ -806,7 +814,32 @@ select count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- An full outer join where every record is not matched.
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+  select count(*) from simple r full outer join simple s using (id);
+                          QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r full outer join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is not matched.
 -- non-parallel
 savepoint settings;
 set local max_parallel_workers_per_gather = 0;
@@ -829,8 +862,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
   select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -850,6 +884,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
  40000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+  select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                          QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count
+-------
+ 40000
+(1 row)
+
 rollback to settings;
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 77dbc182d53..179e94941ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -187,6 +187,8 @@ select original > 1 as initially_multibatch, final > original as increased_batch
 $$
   select count(*) from simple r join simple s using (id);
 $$);
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
 -- The "bad" case: during execution we need to increase number of
@@ -435,7 +437,16 @@ explain (costs off)
   select count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+  select count(*) from simple r full outer join simple s using (id);
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
@@ -443,7 +454,7 @@ explain (costs off)
   select count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- An full outer join where every record is not matched.
+-- A full outer join where every record is not matched.
 
 -- non-parallel
 savepoint settings;
@@ -453,7 +464,16 @@ explain (costs off)
   select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+  select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
@@ -461,6 +481,7 @@ explain (costs off)
   select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
+
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
 -- the hash table)
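
For anyone trying the patch interactively, the new plan shape can be reproduced outside the regression suite with a query along the following lines. This is an illustrative sketch only: it assumes the "simple" table created earlier in join_hash.sql, and relies on enable_parallel_hash being on (the default).

begin;
set local max_parallel_workers_per_gather = 2;
-- Expect a "Parallel Hash Full Join" node under Gather when the planner
-- picks the parallel-aware path.
explain (costs off)
  select count(*) from simple r full outer join simple s using (id);
rollback;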