1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-18 12:22:09 +03:00

Revert support for ALTER TABLE ... MERGE/SPLIT PARTITION(S) commands

This commit reverts 1adf16b8fb, 87c21bb941, and subsequent fixes and
improvements including df64c81ca9, c99ef1811a, 9dfcac8e15, 885742b9f8,
842c9b2705, fcf80c5d5f, 96c7381c4c, f4fc7cb54b, 60ae37a8bc, 259c96fa8f,
449cdcd486, 3ca43dbbb6, 2a679ae94e, 3a82c689fd, fbd4321fd5, d53a4286d7,
c086896625, 4e5d6c4091, 04158e7fa3.

The reason for reverting is security issues related to repeatable name lookups
(CVE-2014-0062).  Even though 04158e7fa3 solved part of the problem, there
are still remaining issues, which aren't feasible to even carefully analyze
before the RC deadline.

Reported-by: Noah Misch, Robert Haas
Discussion: https://postgr.es/m/20240808171351.a9.nmisch%40google.com
Backpatch-through: 17
This commit is contained in:
Alexander Korotkov
2024-08-24 18:48:48 +03:00
parent 29e1253198
commit 84f594da35
25 changed files with 40 additions and 6822 deletions

View File

@@ -32,7 +32,6 @@
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_am.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h"
@@ -59,8 +58,6 @@
#include "parser/parse_type.h"
#include "parser/parse_utilcmd.h"
#include "parser/parser.h"
#include "partitioning/partdesc.h"
#include "partitioning/partbounds.h"
#include "rewrite/rewriteManip.h"
#include "utils/acl.h"
#include "utils/builtins.h"
@@ -136,7 +133,7 @@ static void transformConstraintAttrs(CreateStmtContext *cxt,
List *constraintList);
static void transformColumnType(CreateStmtContext *cxt, ColumnDef *column);
static void setSchemaName(const char *context_schema, char **stmt_schema_name);
static void transformPartitionCmd(CreateStmtContext *cxt, PartitionBoundSpec *bound);
static void transformPartitionCmd(CreateStmtContext *cxt, PartitionCmd *cmd);
static List *transformPartitionRangeBounds(ParseState *pstate, List *blist,
Relation parent);
static void validateInfiniteBounds(ParseState *pstate, List *blist);
@@ -3232,160 +3229,6 @@ transformRuleStmt(RuleStmt *stmt, const char *queryString,
}
/*
* checkPartition
* Check that partRelOid is an oid of partition of the parent table rel
*/
static void
checkPartition(Relation rel, Oid partRelOid)
{
Relation partRel;
partRel = relation_open(partRelOid, AccessShareLock);
if (partRel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table",
RelationGetRelationName(partRel))));
if (!partRel->rd_rel->relispartition)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a partition",
RelationGetRelationName(partRel))));
if (get_partition_parent(partRelOid, false) != RelationGetRelid(rel))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" is not a partition of relation \"%s\"",
RelationGetRelationName(partRel),
RelationGetRelationName(rel))));
/* Permissions checks */
if (!object_ownercheck(RelationRelationId, RelationGetRelid(partRel), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(partRel->rd_rel->relkind),
RelationGetRelationName(partRel));
relation_close(partRel, AccessShareLock);
}
/*
* transformPartitionCmdForSplit
* Analyze the ALTER TABLE ... SPLIT PARTITION command
*
* For each new partition sps->bound is set to the transformed value of bound.
* Does checks for bounds of new partitions.
*/
static void
transformPartitionCmdForSplit(CreateStmtContext *cxt, PartitionCmd *partcmd)
{
Relation parent = cxt->rel;
Oid splitPartOid;
ListCell *listptr;
if (parent->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("\"%s\" is not a partitioned table", RelationGetRelationName(parent))));
/* Transform partition bounds for all partitions in the list: */
foreach(listptr, partcmd->partlist)
{
SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr);
cxt->partbound = NULL;
transformPartitionCmd(cxt, sps->bound);
/* Assign transformed value of the partition bound. */
sps->bound = cxt->partbound;
}
splitPartOid = RangeVarGetRelid(partcmd->name, NoLock, false);
checkPartition(parent, splitPartOid);
/* Then we should check partitions with transformed bounds. */
check_partitions_for_split(parent, splitPartOid, partcmd->name, partcmd->partlist, cxt->pstate);
}
/*
* transformPartitionCmdForMerge
* Analyze the ALTER TABLE ... MERGE PARTITIONS command
*
* Does simple checks for merged partitions. Calculates bound of resulting
* partition.
*/
static void
transformPartitionCmdForMerge(CreateStmtContext *cxt, PartitionCmd *partcmd)
{
Oid defaultPartOid;
Oid partOid;
Relation parent = cxt->rel;
PartitionKey key;
char strategy;
ListCell *listptr,
*listptr2;
bool isDefaultPart = false;
List *partOids = NIL;
if (parent->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("\"%s\" is not a partitioned table", RelationGetRelationName(parent))));
key = RelationGetPartitionKey(parent);
strategy = get_partition_strategy(key);
if (strategy == PARTITION_STRATEGY_HASH)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("partition of hash-partitioned table cannot be merged")));
/* Is current partition a DEFAULT partition? */
defaultPartOid = get_default_oid_from_partdesc(
RelationGetPartitionDesc(parent, true));
foreach(listptr, partcmd->partlist)
{
RangeVar *name = (RangeVar *) lfirst(listptr);
/* Partitions in the list should have different names. */
for_each_cell(listptr2, partcmd->partlist, lnext(partcmd->partlist, listptr))
{
RangeVar *name2 = (RangeVar *) lfirst(listptr2);
if (equal(name, name2))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("partition with name \"%s\" is already used", name->relname)),
parser_errposition(cxt->pstate, name2->location));
}
/* Search DEFAULT partition in the list. */
partOid = RangeVarGetRelid(name, NoLock, false);
if (partOid == defaultPartOid)
isDefaultPart = true;
checkPartition(parent, partOid);
partOids = lappend_oid(partOids, partOid);
}
/* Allocate bound of resulting partition. */
Assert(partcmd->bound == NULL);
partcmd->bound = makeNode(PartitionBoundSpec);
/* Fill partition bound. */
partcmd->bound->strategy = strategy;
partcmd->bound->location = -1;
partcmd->bound->is_default = isDefaultPart;
if (!isDefaultPart)
calculate_partition_bound_for_merge(parent, partcmd->partlist,
partOids, partcmd->bound,
cxt->pstate);
}
/*
* transformAlterTableStmt -
* parse analysis for ALTER TABLE
@@ -3654,7 +3497,7 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
{
PartitionCmd *partcmd = (PartitionCmd *) cmd->def;
transformPartitionCmd(&cxt, partcmd->bound);
transformPartitionCmd(&cxt, partcmd);
/* assign transformed value of the partition bound */
partcmd->bound = cxt.partbound;
}
@@ -3662,24 +3505,6 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
newcmds = lappend(newcmds, cmd);
break;
case AT_SplitPartition:
case AT_MergePartitions:
{
PartitionCmd *partcmd = (PartitionCmd *) cmd->def;
if (list_length(partcmd->partlist) < 2)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("list of new partitions should contain at least two items")));
if (cmd->subtype == AT_SplitPartition)
transformPartitionCmdForSplit(&cxt, partcmd);
else
transformPartitionCmdForMerge(&cxt, partcmd);
newcmds = lappend(newcmds, cmd);
break;
}
default:
/*
@@ -4070,13 +3895,13 @@ setSchemaName(const char *context_schema, char **stmt_schema_name)
/*
* transformPartitionCmd
* Analyze the ATTACH/DETACH/SPLIT PARTITION command
* Analyze the ATTACH/DETACH PARTITION command
*
* In case of the ATTACH/SPLIT PARTITION command, cxt->partbound is set to the
* transformed value of bound.
* In case of the ATTACH PARTITION command, cxt->partbound is set to the
* transformed value of cmd->bound.
*/
static void
transformPartitionCmd(CreateStmtContext *cxt, PartitionBoundSpec *bound)
transformPartitionCmd(CreateStmtContext *cxt, PartitionCmd *cmd)
{
Relation parentRel = cxt->rel;
@@ -4085,9 +3910,9 @@ transformPartitionCmd(CreateStmtContext *cxt, PartitionBoundSpec *bound)
case RELKIND_PARTITIONED_TABLE:
/* transform the partition bound, if any */
Assert(RelationGetPartitionKey(parentRel) != NULL);
if (bound != NULL)
if (cmd->bound != NULL)
cxt->partbound = transformPartitionBound(cxt->pstate, parentRel,
bound);
cmd->bound);
break;
case RELKIND_PARTITIONED_INDEX:
@@ -4095,7 +3920,7 @@ transformPartitionCmd(CreateStmtContext *cxt, PartitionBoundSpec *bound)
* A partitioned index cannot have a partition bound set. ALTER
* INDEX prevents that with its grammar, but not ALTER TABLE.
*/
if (bound != NULL)
if (cmd->bound != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("\"%s\" is not a partitioned table",