1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

Parallel Hash Full Join.

Full and right outer joins were not supported in the initial
implementation of Parallel Hash Join because of deadlock hazards (see
discussion).  Therefore FULL JOIN inhibited parallelism, as the other
join strategies can't do that in parallel either.

Add a new PHJ phase PHJ_BATCH_SCAN that scans for unmatched tuples on
the inner side of one batch's hash table.  For now, sidestep the
deadlock problem by terminating parallelism there.  The last process to
arrive at that phase emits the unmatched tuples, while others detach and
are free to go and work on other batches, if there are any, but
otherwise they finish the join early.

That unfairness is considered acceptable for now, because it's better
than no parallelism at all.  The build and probe phases are run in
parallel, and the new scan-for-unmatched phase, while serial, is usually
applied to the smaller of the two relations and is either limited by
some multiple of work_mem, or it's too big and is partitioned into
batches and then the situation is improved by batch-level parallelism.

Author: Melanie Plageman <melanieplageman@gmail.com>
Author: Thomas Munro <thomas.munro@gmail.com>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
This commit is contained in:
Thomas Munro
2023-03-31 11:01:51 +13:00
parent ca7b3c4c00
commit 11c2d6fdf5
7 changed files with 323 additions and 48 deletions

View File

@@ -2071,6 +2071,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
hjstate->hj_CurTuple = NULL;
}
/*
* Decide if this process is allowed to run the unmatched scan. If so, the
* batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
* Otherwise the batch is detached and false is returned.
*/
bool
ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
int curbatch = hashtable->curbatch;
ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
/*
* It would not be deadlock-free to wait on the batch barrier, because it
* is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
* already emitted tuples. Therefore, we'll hold a wait-free election:
* only one process can continue to the next phase, and all others detach
* from this batch. They can still go any work on other batches, if there
* are any.
*/
if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
{
/* This process considers the batch to be done. */
hashtable->batches[hashtable->curbatch].done = true;
/* Make sure any temporary files are closed. */
sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
/*
* Track largest batch we've seen, which would normally happen in
* ExecHashTableDetachBatch().
*/
hashtable->spacePeak =
Max(hashtable->spacePeak,
batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
hashtable->curbatch = -1;
return false;
}
/* Now we are alone with this batch. */
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
Assert(BarrierParticipants(&batch->batch_barrier) == 1);
/*
* Has another process decided to give up early and command all processes
* to skip the unmatched scan?
*/
if (batch->skip_unmatched)
{
hashtable->batches[hashtable->curbatch].done = true;
ExecHashTableDetachBatch(hashtable);
return false;
}
/* Now prepare the process local state, just as for non-parallel join. */
ExecPrepHashTableForUnmatched(hjstate);
return true;
}
/*
* ExecScanHashTableForUnmatched
* scan the hash table for unmatched inner tuples
@@ -2145,6 +2208,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
return false;
}
/*
* ExecParallelScanHashTableForUnmatched
* scan the hash table for unmatched inner tuples, in parallel join
*
* On success, the inner tuple is stored into hjstate->hj_CurTuple and
* econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
* for the latter.
*/
bool
ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
HashJoinTuple hashTuple = hjstate->hj_CurTuple;
for (;;)
{
/*
* hj_CurTuple is the address of the tuple last returned from the
* current bucket, or NULL if it's time to start scanning a new
* bucket.
*/
if (hashTuple != NULL)
hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
hashTuple = ExecParallelHashFirstTuple(hashtable,
hjstate->hj_CurBucketNo++);
else
break; /* finished all buckets */
while (hashTuple != NULL)
{
if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
{
TupleTableSlot *inntuple;
/* insert hashtable's tuple into exec slot */
inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
hjstate->hj_HashTupleSlot,
false); /* do not pfree */
econtext->ecxt_innertuple = inntuple;
/*
* Reset temp memory each time; although this function doesn't
* do any qual eval, the caller will, so let's keep it
* parallel to ExecScanHashBucket.
*/
ResetExprContext(econtext);
hjstate->hj_CurTuple = hashTuple;
return true;
}
hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
}
/* allow this loop to be cancellable */
CHECK_FOR_INTERRUPTS();
}
/*
* no more unmatched tuples
*/
return false;
}
/*
* ExecHashTableReset
*
@@ -3088,6 +3217,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
accessor->shared = shared;
accessor->preallocated = 0;
accessor->done = false;
accessor->outer_eof = false;
accessor->inner_tuples =
sts_attach(ParallelHashJoinBatchInner(shared),
ParallelWorkerNumber + 1,
@@ -3133,18 +3263,53 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
{
int curbatch = hashtable->curbatch;
ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
bool attached = true;
/* Make sure any temporary files are closed. */
sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
/* Detach from the batch we were last working on. */
if (BarrierArriveAndDetach(&batch->batch_barrier))
/* After attaching we always get at least to PHJ_BATCH_PROBE. */
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
/*
* If we're abandoning the PHJ_BATCH_PROBE phase early without having
* reached the end of it, it means the plan doesn't want any more
* tuples, and it is happy to abandon any tuples buffered in this
* process's subplans. For correctness, we can't allow any process to
* execute the PHJ_BATCH_SCAN phase, because we will never have the
* complete set of match bits. Therefore we skip emitting unmatched
* tuples in all backends (if this is a full/right join), as if those
* tuples were all due to be emitted by this process and it has
* abandoned them too.
*/
if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
!hashtable->batches[curbatch].outer_eof)
{
/*
* Technically we shouldn't access the barrier because we're no
* longer attached, but since there is no way it's moving after
* this point it seems safe to make the following assertion.
* This flag may be written to by multiple backends during
* PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
* phase so requires no extra locking.
*/
batch->skip_unmatched = true;
}
/*
* Even if we aren't doing a full/right outer join, we'll step through
* the PHJ_BATCH_SCAN phase just to maintain the invariant that
* freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
*/
if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
{
/*
* We are not longer attached to the batch barrier, but we're the
* process that was chosen to free resources and it's safe to
* assert the current phase. The ParallelHashJoinBatch can't go
* away underneath us while we are attached to the build barrier,
* making this access safe.
*/
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);