1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-21 00:42:43 +03:00

Add a Gather executor node.

A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream.  It can also run the plan itself, if the workers are
unavailable or haven't started up yet.  It is intended to work with
the Partial Seq Scan node which will be added in future commits.

It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used.  In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results.  So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes.  Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.

There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne.  But we're getting
close.

Amit Kapila.  Some design suggestions were provided by me, and I also
reviewed the patch.  Single-copy mode, documentation, and other minor
changes also by me.
This commit is contained in:
Robert Haas
2015-09-30 19:23:36 -04:00
parent 227d57f358
commit 3bd909b220
26 changed files with 709 additions and 8 deletions

View File

@@ -11,6 +11,8 @@
* cpu_tuple_cost Cost of typical CPU time to process a tuple
* cpu_index_tuple_cost Cost of typical CPU time to process an index tuple
* cpu_operator_cost Cost of CPU time to execute an operator or function
* parallel_tuple_cost Cost of CPU time to pass a tuple from worker to master backend
* parallel_setup_cost Cost of setting up shared memory for parallelism
*
* We expect that the kernel will typically do some amount of read-ahead
* optimization; this in conjunction with seek costs means that seq_page_cost
@@ -102,11 +104,15 @@ double random_page_cost = DEFAULT_RANDOM_PAGE_COST;
/* Planner cost parameters, each initialised to its DEFAULT_* constant. */
double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST;
double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST;
double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST;
/* Cost of CPU time to pass a tuple from worker to master backend */
double parallel_tuple_cost = DEFAULT_PARALLEL_TUPLE_COST;
/* Cost of setting up shared memory for parallelism */
double parallel_setup_cost = DEFAULT_PARALLEL_SETUP_COST;
int effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE;
Cost disable_cost = 1.0e10;
/* NOTE(review): presumably an upper bound on workers per query; 0 by default -- confirm */
int max_parallel_degree = 0;
/* Per-plan-type enable flags, all on by default. */
bool enable_seqscan = true;
bool enable_indexscan = true;
bool enable_indexonlyscan = true;
@@ -289,6 +295,38 @@ cost_samplescan(Path *path, PlannerInfo *root,
path->total_cost = startup_cost + run_cost;
}
/*
 * cost_gather
 *	  Estimates the cost of a gather path and stores it in the path node.
 *
 * 'path' is the GatherPath to cost; its subpath must already have its
 *		startup_cost and total_cost filled in, since they are read here
 * 'root' is currently unused
 * 'rel' is the relation to be operated upon
 * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL
 *
 * Nothing is returned.  On exit path->path.rows, path->path.startup_cost and
 * path->path.total_cost have been set.
 */
void
cost_gather(GatherPath *path, PlannerInfo *root,
			RelOptInfo *rel, ParamPathInfo *param_info)
{
	Cost		startup_cost;
	Cost		run_cost;

	/* Mark the path with the correct row estimate */
	if (param_info)
		path->path.rows = param_info->ppi_rows;
	else
		path->path.rows = rel->rows;

	/* Begin with the subpath's cost, split into startup and run parts */
	startup_cost = path->subpath->startup_cost;
	run_cost = path->subpath->total_cost - path->subpath->startup_cost;

	/*
	 * Parallel setup and communication cost.
	 *
	 * The per-tuple charge is applied to the number of rows this path is
	 * estimated to emit, not to rel->tuples.  Charging for every tuple stored
	 * in the relation would overestimate the transfer cost whenever the row
	 * estimate is smaller than the relation's tuple count.
	 */
	startup_cost += parallel_setup_cost;
	run_cost += parallel_tuple_cost * path->path.rows;

	path->path.startup_cost = startup_cost;
	path->path.total_cost = startup_cost + run_cost;
}
/*
* cost_index
* Determines and returns the cost of scanning a relation using an index.

View File

@@ -60,6 +60,8 @@ static SeqScan *create_seqscan_plan(PlannerInfo *root, Path *best_path,
List *tlist, List *scan_clauses);
static SampleScan *create_samplescan_plan(PlannerInfo *root, Path *best_path,
List *tlist, List *scan_clauses);
static Gather *create_gather_plan(PlannerInfo *root,
GatherPath *best_path);
static Scan *create_indexscan_plan(PlannerInfo *root, IndexPath *best_path,
List *tlist, List *scan_clauses, bool indexonly);
static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root,
@@ -104,6 +106,8 @@ static void copy_plan_costsize(Plan *dest, Plan *src);
static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid);
static SampleScan *make_samplescan(List *qptlist, List *qpqual, Index scanrelid,
TableSampleClause *tsc);
static Gather *make_gather(List *qptlist, List *qpqual,
int nworkers, bool single_copy, Plan *subplan);
static IndexScan *make_indexscan(List *qptlist, List *qpqual, Index scanrelid,
Oid indexid, List *indexqual, List *indexqualorig,
List *indexorderby, List *indexorderbyorig,
@@ -273,6 +277,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path)
plan = create_unique_plan(root,
(UniquePath *) best_path);
break;
case T_Gather:
plan = (Plan *) create_gather_plan(root,
(GatherPath *) best_path);
break;
default:
elog(ERROR, "unrecognized node type: %d",
(int) best_path->pathtype);
@@ -1101,6 +1109,34 @@ create_unique_plan(PlannerInfo *root, UniquePath *best_path)
return plan;
}
/*
 * create_gather_plan
 *
 *	  Build the Gather plan node for 'best_path'.  The plan for the subpath
 *	  is built first (recursively) and becomes the Gather's input.
 */
static Gather *
create_gather_plan(PlannerInfo *root, GatherPath *best_path)
{
	Plan	   *child = create_plan_recurse(root, best_path->subpath);
	Gather	   *result;

	/* The Gather node takes the child's target list and a NIL qual list. */
	result = make_gather(child->targetlist, NIL,
						 best_path->num_workers, best_path->single_copy,
						 child);

	/* make_gather leaves the cost fields alone; fill them in from the path. */
	copy_path_costsize(&result->plan, &best_path->path);

	/* use parallel mode for parallel plans. */
	root->glob->parallelModeNeeded = true;

	return result;
}
/*****************************************************************************
*
@@ -4735,6 +4771,27 @@ make_unique(Plan *lefttree, List *distinctList)
return node;
}
/*
 * make_gather
 *	  Allocate a Gather node and populate it from the arguments.
 *
 * 'subplan' is attached as the left child; the right child is set to NULL.
 * The cost fields are not touched here and must be set by the caller.
 */
static Gather *
make_gather(List *qptlist,
			List *qpqual,
			int nworkers,
			bool single_copy,
			Plan *subplan)
{
	Gather	   *gather = makeNode(Gather);

	/* Gather-specific fields */
	gather->single_copy = single_copy;
	gather->num_workers = nworkers;

	/* generic Plan fields; cost should be inserted by caller */
	gather->plan.righttree = NULL;
	gather->plan.lefttree = subplan;
	gather->plan.qual = qpqual;
	gather->plan.targetlist = qptlist;

	return gather;
}
/*
* distinctList is a list of SortGroupClauses, identifying the targetlist
* items that should be considered by the SetOp filter. The input path must

View File

@@ -607,6 +607,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
case T_Sort:
case T_Unique:
case T_SetOp:
case T_Gather:
/*
* These plan types don't actually bother to evaluate their

View File

@@ -2584,6 +2584,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
case T_Material:
case T_Sort:
case T_Unique:
case T_Gather:
case T_SetOp:
case T_Group:
break;

View File

@@ -1307,6 +1307,32 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
return pathnode;
}
/*
 * create_gather_path
 *
 *	  Creates a path corresponding to a gather scan, returning the
 *	  pathnode.
 *
 * 'rel' is the parent relation of the new path
 * 'subpath' is the path whose output the Gather collects
 * 'required_outer' is passed to get_baserel_parampathinfo to obtain the
 *		path's param_info
 * 'nworkers' is stored as the path's num_workers
 *
 * The new path is costed by cost_gather before being returned.
 */
GatherPath *
create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
				   Relids required_outer, int nworkers)
{
	GatherPath *pathnode = makeNode(GatherPath);

	pathnode->path.pathtype = T_Gather;
	pathnode->path.parent = rel;
	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
														  required_outer);
	pathnode->path.pathkeys = NIL;		/* Gather has unordered result */

	pathnode->subpath = subpath;
	pathnode->num_workers = nworkers;

	/*
	 * create_gather_plan reads single_copy from the path, so give it a
	 * definite value here rather than depending on how the node was
	 * allocated.
	 */
	pathnode->single_copy = false;

	cost_gather(pathnode, root, rel, pathnode->path.param_info);

	return pathnode;
}
/*
* translate_sub_tlist - get subquery column numbers represented by tlist
*