1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-18 02:02:55 +03:00

Revert support for ALTER TABLE ... MERGE/SPLIT PARTITION(S) commands

This commit reverts 1adf16b8fb, 87c21bb941, and subsequent fixes and
improvements including df64c81ca9, c99ef1811a, 9dfcac8e15, 885742b9f8,
842c9b2705, fcf80c5d5f, 96c7381c4c, f4fc7cb54b, 60ae37a8bc, 259c96fa8f,
449cdcd486, 3ca43dbbb6, 2a679ae94e, 3a82c689fd, fbd4321fd5, d53a4286d7,
c086896625, 4e5d6c4091, 04158e7fa3.

The reason for reverting is security issues related to repeatable name lookups
(CVE-2014-0062).  Even though 04158e7fa3 solved part of the problem, there
are still remaining issues, which aren't feasible to even carefully analyze
before the RC deadline.

Reported-by: Noah Misch, Robert Haas
Discussion: https://postgr.es/m/20240808171351.a9.nmisch%40google.com
Backpatch-through: 17
This commit is contained in:
Alexander Korotkov
2024-08-24 18:48:48 +03:00
parent 6e8a0317b4
commit 3890d90c15
25 changed files with 36 additions and 6828 deletions

View File

@@ -3214,9 +3214,8 @@ check_new_partition_bound(char *relname, Relation parent,
PartitionRangeDatum *datum;
/*
* Point to problematic key in the list of lower
* datums; if we have equality, point to the first
* one.
* Point to problematic key in the lower datums list;
* if we have equality, point to the first one.
*/
datum = cmpval == 0 ? linitial(spec->lowerdatums) :
list_nth(spec->lowerdatums, abs(cmpval) - 1);
@@ -4978,899 +4977,3 @@ satisfies_hash_partition(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(rowHash % modulus == remainder);
}
/*
* check_two_partitions_bounds_range
*
* (function for BY RANGE partitioning)
*
* This is a helper function for check_partitions_for_split() and
* calculate_partition_bound_for_merge().
* This function compares upper bound of first_bound and lower bound of
* second_bound. These bounds should be equal except when
* "defaultPart == true" (this means that one of split partitions is DEFAULT).
* In this case upper bound of first_bound can be less than lower bound of
* second_bound because space between these bounds will be included in
* DEFAULT partition.
*
* parent: partitioned table
* first_name: name of first partition
* first_bound: bound of first partition
* second_name: name of second partition
* second_bound: bound of second partition
* defaultPart: true if one of split partitions is DEFAULT
* pstate: pointer to ParseState struct for determining error position
*/
static void
check_two_partitions_bounds_range(Relation parent,
RangeVar *first_name,
PartitionBoundSpec *first_bound,
RangeVar *second_name,
PartitionBoundSpec *second_bound,
bool defaultPart,
ParseState *pstate)
{
PartitionKey key = RelationGetPartitionKey(parent);
PartitionRangeBound *first_upper;
PartitionRangeBound *second_lower;
int cmpval;
Assert(key->strategy == PARTITION_STRATEGY_RANGE);
first_upper = make_one_partition_rbound(key, -1, first_bound->upperdatums, false);
second_lower = make_one_partition_rbound(key, -1, second_bound->lowerdatums, true);
/*
* lower1=false (the second to last argument) for correct comparison of
* lower and upper bounds.
*/
cmpval = partition_rbound_cmp(key->partnatts,
key->partsupfunc,
key->partcollation,
second_lower->datums, second_lower->kind,
false, first_upper);
if ((!defaultPart && cmpval) || (defaultPart && cmpval < 0))
{
PartitionRangeDatum *datum = linitial(second_bound->lowerdatums);
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("lower bound of partition \"%s\" conflicts with upper bound of previous partition \"%s\"",
second_name->relname, first_name->relname),
parser_errposition(pstate, datum->location)));
}
}
/*
* check_partitions_not_overlap_list
*
* (function for BY LIST partitioning)
*
* This is a helper function for check_partitions_for_split().
* Checks that the values of the new partitions do not overlap.
*
* parent: partitioned table
* parts: array of SinglePartitionSpec structs with info about split partitions
* nparts: size of array "parts"
*/
static void
check_partitions_not_overlap_list(Relation parent,
SinglePartitionSpec **parts,
int nparts,
ParseState *pstate)
{
PartitionKey key PG_USED_FOR_ASSERTS_ONLY = RelationGetPartitionKey(parent);
int overlap_location = -1;
int i,
j;
SinglePartitionSpec *sps1,
*sps2;
List *overlap;
Assert(key->strategy == PARTITION_STRATEGY_LIST);
for (i = 0; i < nparts; i++)
{
sps1 = parts[i];
for (j = i + 1; j < nparts; j++)
{
sps2 = parts[j];
/*
* Calculate intersection between values of two partitions.
*/
overlap = list_intersection(sps1->bound->listdatums,
sps2->bound->listdatums);
if (list_length(overlap) > 0)
{
Const *val = (Const *) lfirst(list_head(overlap));
overlap_location = val->location;
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("new partition \"%s\" would overlap with another new partition \"%s\"",
sps1->name->relname, sps2->name->relname),
parser_errposition(pstate, overlap_location)));
}
}
}
}
/*
* get_partition_bound_spec
*
* Returns description of partition with Oid "partOid" and name "name".
*
* partOid: partition Oid
* name: partition name
*/
static PartitionBoundSpec *
get_partition_bound_spec(Oid partOid, RangeVar *name)
{
HeapTuple tuple;
Datum datum;
bool isnull;
PartitionBoundSpec *boundspec = NULL;
/* Try fetching the tuple from the catcache, for speed. */
tuple = SearchSysCache1(RELOID, partOid);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation \"%s\"",
name->relname);
datum = SysCacheGetAttr(RELOID, tuple,
Anum_pg_class_relpartbound,
&isnull);
if (isnull)
elog(ERROR, "partition bound for relation \"%s\" is null",
name->relname);
boundspec = stringToNode(TextDatumGetCString(datum));
if (!IsA(boundspec, PartitionBoundSpec))
elog(ERROR, "expected PartitionBoundSpec for relation \"%s\"",
name->relname);
ReleaseSysCache(tuple);
return boundspec;
}
/*
* check_partition_bounds_for_split_range
*
* (function for BY RANGE partitioning)
*
* Checks that bounds of new partition "spec" are inside bounds of split
* partition (with Oid splitPartOid). If first=true (this means that "spec" is
* the first of new partitions) then lower bound of "spec" should be equal (or
* greater than or equal in case defaultPart=true) to lower bound of split
* partition. If last=true (this means that "spec" is the last of new
* partitions) then upper bound of "spec" should be equal (or less than or
* equal in case defaultPart=true) to upper bound of split partition.
*
* parent: partitioned table
* relname: name of the new partition
* spec: bounds specification of the new partition
* splitPartOid: split partition Oid
* splitPartName: split partition name
* first: true in case new partition "spec" is first of new partitions
* last: true in case new partition "spec" is last of new partitions
* defaultPart: true in case partitioned table has DEFAULT partition
* pstate: pointer to ParseState struct for determine error position
*/
static void
check_partition_bounds_for_split_range(Relation parent,
char *relname,
PartitionBoundSpec *spec,
Oid splitPartOid,
RangeVar *splitPartName,
bool first,
bool last,
bool defaultPart,
ParseState *pstate)
{
PartitionKey key = RelationGetPartitionKey(parent);
PartitionRangeBound *lower,
*upper;
int cmpval;
Assert(key->strategy == PARTITION_STRATEGY_RANGE);
Assert(spec->strategy == PARTITION_STRATEGY_RANGE);
lower = make_one_partition_rbound(key, -1, spec->lowerdatums, true);
upper = make_one_partition_rbound(key, -1, spec->upperdatums, false);
/*
* First check if the resulting range would be empty with specified lower
* and upper bounds. partition_rbound_cmp cannot return zero here, since
* the lower-bound flags are different.
*/
cmpval = partition_rbound_cmp(key->partnatts,
key->partsupfunc,
key->partcollation,
lower->datums, lower->kind,
true, upper);
Assert(cmpval != 0);
if (cmpval > 0)
{
/* Point to problematic key in the lower datums list. */
PartitionRangeDatum *datum = list_nth(spec->lowerdatums, cmpval - 1);
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("empty range bound specified for partition \"%s\"",
relname),
errdetail("Specified lower bound %s is greater than or equal to upper bound %s.",
get_range_partbound_string(spec->lowerdatums),
get_range_partbound_string(spec->upperdatums)),
parser_errposition(pstate, datum->location)));
}
/* Need to check first and last partitions (from set of new partitions) */
if (first || last)
{
PartitionBoundSpec *split_spec = get_partition_bound_spec(splitPartOid, splitPartName);
PartitionRangeDatum *datum;
if (first)
{
PartitionRangeBound *split_lower;
split_lower = make_one_partition_rbound(key, -1, split_spec->lowerdatums, true);
cmpval = partition_rbound_cmp(key->partnatts,
key->partsupfunc,
key->partcollation,
lower->datums, lower->kind,
true, split_lower);
/*
* Lower bound of "spec" should be equal (or greater than or equal
* in case defaultPart=true) to lower bound of split partition.
*/
if (!defaultPart)
{
if (cmpval != 0)
{
datum = list_nth(spec->lowerdatums, abs(cmpval) - 1);
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("lower bound of partition \"%s\" is not equal to lower bound of split partition",
relname),
parser_errposition(pstate, datum->location)));
}
}
else
{
if (cmpval < 0)
{
datum = list_nth(spec->lowerdatums, abs(cmpval) - 1);
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("lower bound of partition \"%s\" is less than lower bound of split partition",
relname),
parser_errposition(pstate, datum->location)));
}
}
}
else
{
PartitionRangeBound *split_upper;
split_upper = make_one_partition_rbound(key, -1, split_spec->upperdatums, false);
cmpval = partition_rbound_cmp(key->partnatts,
key->partsupfunc,
key->partcollation,
upper->datums, upper->kind,
false, split_upper);
/*
* Upper bound of "spec" should be equal (or less than or equal in
* case defaultPart=true) to upper bound of split partition.
*/
if (!defaultPart)
{
if (cmpval != 0)
{
datum = list_nth(spec->upperdatums, abs(cmpval) - 1);
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("upper bound of partition \"%s\" is not equal to upper bound of split partition",
relname),
parser_errposition(pstate, datum->location)));
}
}
else
{
if (cmpval > 0)
{
datum = list_nth(spec->upperdatums, abs(cmpval) - 1);
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("upper bound of partition \"%s\" is greater than upper bound of split partition",
relname),
parser_errposition(pstate, datum->location)));
}
}
}
}
}
/*
* check_partition_bounds_for_split_list
*
* (function for BY LIST partitioning)
*
* Checks that bounds of new partition are inside bounds of split partition
* (with Oid splitPartOid).
*
* parent: partitioned table
* relname: name of the new partition
* spec: bounds specification of the new partition
* splitPartOid: split partition Oid
* pstate: pointer to ParseState struct for determine error position
*/
static void
check_partition_bounds_for_split_list(Relation parent, char *relname,
PartitionBoundSpec *spec,
Oid splitPartOid,
ParseState *pstate)
{
PartitionKey key = RelationGetPartitionKey(parent);
PartitionDesc partdesc = RelationGetPartitionDesc(parent, false);
PartitionBoundInfo boundinfo = partdesc->boundinfo;
int with = -1;
bool overlap = false;
int overlap_location = -1;
ListCell *cell;
Assert(key->strategy == PARTITION_STRATEGY_LIST);
Assert(spec->strategy == PARTITION_STRATEGY_LIST);
Assert(boundinfo && boundinfo->strategy == PARTITION_STRATEGY_LIST);
/*
* Search each value of new partition "spec" in existing partitions. All
* of them should be in split partition (with Oid splitPartOid).
*/
foreach(cell, spec->listdatums)
{
Const *val = lfirst_node(Const, cell);
overlap_location = val->location;
if (!val->constisnull)
{
int offset;
bool equal;
offset = partition_list_bsearch(&key->partsupfunc[0],
key->partcollation,
boundinfo,
val->constvalue,
&equal);
if (offset >= 0 && equal)
{
with = boundinfo->indexes[offset];
if (partdesc->oids[with] != splitPartOid)
{
overlap = true;
break;
}
}
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("new partition \"%s\" cannot have this value because split partition does not have",
relname),
parser_errposition(pstate, overlap_location)));
}
else if (partition_bound_accepts_nulls(boundinfo))
{
with = boundinfo->null_index;
if (partdesc->oids[with] != splitPartOid)
{
overlap = true;
break;
}
}
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("new partition \"%s\" cannot have NULL value because split partition does not have",
relname),
parser_errposition(pstate, overlap_location)));
}
if (overlap)
{
Assert(with >= 0);
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("new partition \"%s\" would overlap with another (not split) partition \"%s\"",
relname, get_rel_name(partdesc->oids[with])),
parser_errposition(pstate, overlap_location)));
}
}
/*
* find_value_in_new_partitions_list
*
* (function for BY LIST partitioning)
*
* Function returns true in case any of new partitions contains value "value".
*
* partsupfunc: information about comparison function associated with the partition key
* partcollation: partitioning collation
* parts: pointer to array with new partitions descriptions
* nparts: number of new partitions
* value: the value that we are looking for
* isnull: true if the value that we are looking for is NULL
*/
static bool
find_value_in_new_partitions_list(FmgrInfo *partsupfunc,
Oid *partcollation,
SinglePartitionSpec **parts,
int nparts,
Datum value,
bool isnull)
{
ListCell *valptr;
int i;
for (i = 0; i < nparts; i++)
{
SinglePartitionSpec *sps = parts[i];
foreach(valptr, sps->bound->listdatums)
{
Const *val = lfirst_node(Const, valptr);
if (isnull && val->constisnull)
return true;
if (!isnull && !val->constisnull)
{
if (DatumGetInt32(FunctionCall2Coll(&partsupfunc[0],
partcollation[0],
val->constvalue,
value)) == 0)
return true;
}
}
}
return false;
}
/*
* check_parent_values_in_new_partitions
*
* (function for BY LIST partitioning)
*
* Checks that all values of split partition (with Oid partOid) contains in new
* partitions.
*
* parent: partitioned table
* partOid: split partition Oid
* parts: pointer to array with new partitions descriptions
* nparts: number of new partitions
* pstate: pointer to ParseState struct for determine error position
*/
static void
check_parent_values_in_new_partitions(Relation parent,
Oid partOid,
SinglePartitionSpec **parts,
int nparts,
ParseState *pstate)
{
PartitionKey key = RelationGetPartitionKey(parent);
PartitionDesc partdesc = RelationGetPartitionDesc(parent, false);
PartitionBoundInfo boundinfo = partdesc->boundinfo;
int i;
bool found = true;
bool searchNull = false;
Datum datum = PointerGetDatum(NULL);
Assert(key->strategy == PARTITION_STRATEGY_LIST);
/*
* Special processing for NULL value. Search NULL value if the split
* partition (partOid) contains it.
*/
if (partition_bound_accepts_nulls(boundinfo) &&
partdesc->oids[boundinfo->null_index] == partOid)
{
if (!find_value_in_new_partitions_list(&key->partsupfunc[0],
key->partcollation, parts, nparts, datum, true))
{
found = false;
searchNull = true;
}
}
/*
* Search all values of split partition with partOid in PartitionDesc of
* partitioned table.
*/
for (i = 0; i < boundinfo->ndatums; i++)
{
if (partdesc->oids[boundinfo->indexes[i]] == partOid)
{
/* We found value that split partition contains. */
datum = boundinfo->datums[i][0];
if (!find_value_in_new_partitions_list(&key->partsupfunc[0],
key->partcollation, parts, nparts, datum, false))
{
found = false;
break;
}
}
}
if (!found)
{
Const *notFoundVal;
if (!searchNull)
/*
* Make Const for getting string representation of not found
* value.
*/
notFoundVal = makeConst(key->parttypid[0],
key->parttypmod[0],
key->parttypcoll[0],
key->parttyplen[0],
datum,
false, /* isnull */
key->parttypbyval[0]);
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("new partitions do not have value %s but split partition does",
searchNull ? "NULL" : get_list_partvalue_string(notFoundVal))));
}
}
/*
* check_partitions_for_split
*
* Checks new partitions for SPLIT PARTITIONS command:
* 1. DEFAULT partition should be one.
* 2. New partitions should have different names
* (with existing partitions too).
* 3. Bounds of new partitions should not overlap with new and existing
* partitions.
* 4. In case split partition is DEFAULT partition, one of new partitions
* should be DEFAULT.
* 5. In case new partitions or existing partitions contains DEFAULT
* partition, new partitions can have any bounds inside split
* partition bound (can be spaces between partitions bounds).
* 6. In case partitioned table does not have DEFAULT partition, DEFAULT
* partition can be defined as one of new partition.
* 7. In case new partitions not contains DEFAULT partition and
* partitioned table does not have DEFAULT partition the following
* should be true: sum bounds of new partitions should be equal
* to bound of split partition.
*
* parent: partitioned table
* splitPartOid: split partition Oid
* splitPartName: split partition name
* list: list of new partitions
* pstate: pointer to ParseState struct for determine error position
*/
void
check_partitions_for_split(Relation parent,
Oid splitPartOid,
RangeVar *splitPartName,
List *partlist,
ParseState *pstate)
{
PartitionKey key;
char strategy;
Oid defaultPartOid;
bool isSplitPartDefault;
bool existsDefaultPart;
ListCell *listptr;
int default_index = -1;
int i,
j;
SinglePartitionSpec **new_parts;
SinglePartitionSpec *spsPrev = NULL;
int nparts = 0;
key = RelationGetPartitionKey(parent);
strategy = get_partition_strategy(key);
switch (strategy)
{
case PARTITION_STRATEGY_LIST:
case PARTITION_STRATEGY_RANGE:
{
/*
* Make array new_parts with new partitions except DEFAULT
* partition.
*/
new_parts = (SinglePartitionSpec **)
palloc0(list_length(partlist) * sizeof(SinglePartitionSpec *));
i = 0;
foreach(listptr, partlist)
{
SinglePartitionSpec *sps =
(SinglePartitionSpec *) lfirst(listptr);
if (sps->bound->is_default)
{
if (default_index >= 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("DEFAULT partition should be one")),
parser_errposition(pstate, sps->name->location));
default_index = i;
}
else
{
new_parts[nparts++] = sps;
}
i++;
}
}
break;
case PARTITION_STRATEGY_HASH:
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("partition of hash-partitioned table cannot be split")));
break;
default:
elog(ERROR, "unexpected partition strategy: %d",
(int) key->strategy);
break;
}
if (strategy == PARTITION_STRATEGY_RANGE)
{
PartitionRangeBound **lower_bounds;
SinglePartitionSpec **tmp_new_parts;
/*
* For simplify check for ranges of new partitions need to sort all
* partitions in ascending order of them bounds (we compare upper
* bound only).
*/
lower_bounds = (PartitionRangeBound **)
palloc0(nparts * sizeof(PartitionRangeBound *));
/* Create array of lower bounds. */
for (i = 0; i < nparts; i++)
{
lower_bounds[i] = make_one_partition_rbound(key, i,
new_parts[i]->bound->lowerdatums, true);
}
/* Sort array of lower bounds. */
qsort_arg(lower_bounds, nparts, sizeof(PartitionRangeBound *),
qsort_partition_rbound_cmp, (void *) key);
/* Reorder array of partitions. */
tmp_new_parts = new_parts;
new_parts = (SinglePartitionSpec **)
palloc0(nparts * sizeof(SinglePartitionSpec *));
for (i = 0; i < nparts; i++)
new_parts[i] = tmp_new_parts[lower_bounds[i]->index];
pfree(tmp_new_parts);
pfree(lower_bounds);
}
defaultPartOid =
get_default_oid_from_partdesc(RelationGetPartitionDesc(parent, true));
/* isSplitPartDefault flag: is split partition a DEFAULT partition? */
isSplitPartDefault = (defaultPartOid == splitPartOid);
if (isSplitPartDefault && default_index < 0)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("one partition in the list should be DEFAULT because split partition is DEFAULT")),
parser_errposition(pstate, ((SinglePartitionSpec *) linitial(partlist))->name->location));
}
else if (!isSplitPartDefault && (default_index >= 0) && OidIsValid(defaultPartOid))
{
SinglePartitionSpec *spsDef =
(SinglePartitionSpec *) list_nth(partlist, default_index);
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("new partition cannot be DEFAULT because DEFAULT partition already exists")),
parser_errposition(pstate, spsDef->name->location));
}
/* Indicator that partitioned table has (or will have) DEFAULT partition */
existsDefaultPart = OidIsValid(defaultPartOid) || (default_index >= 0);
for (i = 0; i < nparts; i++)
{
SinglePartitionSpec *sps = new_parts[i];
if (isSplitPartDefault)
{
/*
* In case split partition is DEFAULT partition we can use any
* free ranges - as when creating a new partition.
*/
check_new_partition_bound(sps->name->relname, parent, sps->bound,
pstate);
}
else
{
/*
* Checks that bound of current partition is inside bound of split
* partition. For range partitioning: checks that upper bound of
* previous partition is equal to lower bound of current
* partition. For list partitioning: checks that split partition
* contains all values of current partition.
*/
if (strategy == PARTITION_STRATEGY_RANGE)
{
bool first = (i == 0);
bool last = (i == (nparts - 1));
check_partition_bounds_for_split_range(parent, sps->name->relname, sps->bound,
splitPartOid, splitPartName,
first, last,
existsDefaultPart, pstate);
}
else
check_partition_bounds_for_split_list(parent, sps->name->relname,
sps->bound, splitPartOid, pstate);
}
/* Ranges of new partitions should not overlap. */
if (strategy == PARTITION_STRATEGY_RANGE && spsPrev)
check_two_partitions_bounds_range(parent, spsPrev->name, spsPrev->bound,
sps->name, sps->bound, existsDefaultPart, pstate);
spsPrev = sps;
/* Check: new partitions should have different names. */
for (j = i + 1; j < nparts; j++)
{
SinglePartitionSpec *sps2 = new_parts[j];
if (equal(sps->name, sps2->name))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("name \"%s\" is already used", sps2->name->relname)),
parser_errposition(pstate, sps2->name->location));
}
}
if (strategy == PARTITION_STRATEGY_LIST)
{
/* Values of new partitions should not overlap. */
check_partitions_not_overlap_list(parent, new_parts, nparts,
pstate);
/*
* Need to check that all values of split partition contains in new
* partitions. Skip this check if DEFAULT partition exists.
*/
if (!existsDefaultPart)
check_parent_values_in_new_partitions(parent, splitPartOid,
new_parts, nparts, pstate);
}
pfree(new_parts);
}
/*
* calculate_partition_bound_for_merge
*
* Calculates the bound of merged partition "spec" by using the bounds of
* partitions to be merged.
*
* parent: partitioned table
* partNames: names of partitions to be merged
* partOids: Oids of partitions to be merged
* spec (out): bounds specification of the merged partition
* pstate: pointer to ParseState struct for determine error position
*/
void
calculate_partition_bound_for_merge(Relation parent,
List *partNames,
List *partOids,
PartitionBoundSpec *spec,
ParseState *pstate)
{
PartitionKey key = RelationGetPartitionKey(parent);
PartitionBoundSpec *bound;
Assert(!spec->is_default);
switch (key->strategy)
{
case PARTITION_STRATEGY_RANGE:
{
int i;
PartitionRangeBound **lower_bounds;
int nparts = list_length(partOids);
List *bounds = NIL;
lower_bounds = (PartitionRangeBound **)
palloc0(nparts * sizeof(PartitionRangeBound *));
/*
* Create array of lower bounds and list of
* PartitionBoundSpec.
*/
for (i = 0; i < nparts; i++)
{
bound = get_partition_bound_spec(list_nth_oid(partOids, i),
(RangeVar *) list_nth(partNames, i));
lower_bounds[i] = make_one_partition_rbound(key, i, bound->lowerdatums, true);
bounds = lappend(bounds, bound);
}
/* Sort array of lower bounds. */
qsort_arg(lower_bounds, nparts, sizeof(PartitionRangeBound *),
qsort_partition_rbound_cmp, (void *) key);
/* Ranges of partitions should not overlap. */
for (i = 1; i < nparts; i++)
{
int index = lower_bounds[i]->index;
int prev_index = lower_bounds[i - 1]->index;
check_two_partitions_bounds_range(parent,
(RangeVar *) list_nth(partNames, prev_index),
(PartitionBoundSpec *) list_nth(bounds, prev_index),
(RangeVar *) list_nth(partNames, index),
(PartitionBoundSpec *) list_nth(bounds, index),
false, pstate);
}
/*
* Lower bound of first partition is the lower bound of merged
* partition.
*/
spec->lowerdatums =
((PartitionBoundSpec *) list_nth(bounds, lower_bounds[0]->index))->lowerdatums;
/*
* Upper bound of last partition is the upper bound of merged
* partition.
*/
spec->upperdatums =
((PartitionBoundSpec *) list_nth(bounds, lower_bounds[nparts - 1]->index))->upperdatums;
pfree(lower_bounds);
list_free(bounds);
break;
}
case PARTITION_STRATEGY_LIST:
{
ListCell *listptr,
*listptr2;
/* Consolidate bounds for all partitions in the list. */
forboth(listptr, partOids, listptr2, partNames)
{
RangeVar *name = (RangeVar *) lfirst(listptr2);
Oid curOid = lfirst_oid(listptr);
bound = get_partition_bound_spec(curOid, name);
spec->listdatums = list_concat(spec->listdatums, bound->listdatums);
}
break;
}
default:
elog(ERROR, "unexpected partition strategy: %d",
(int) key->strategy);
}
}