1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-02 09:02:37 +03:00

Checks for ALTER TABLE ... SPLIT/MERGE PARTITIONS ... commands

Check that the target partition actually belongs to the parent table.

Reported-by: Alexander Lakhin
Discussion: https://postgr.es/m/cd842601-cf1a-9806-f7b7-d2509b93ba61%40gmail.com
Author: Dmitry Koval
This commit is contained in:
Alexander Korotkov
2024-04-10 01:47:00 +03:00
parent b1b13d2b52
commit c99ef1811a
6 changed files with 111 additions and 21 deletions

View File

@ -21223,12 +21223,6 @@ ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab, Relation rel,
*/
splitRel = table_openrv(cmd->name, AccessExclusiveLock);
if (splitRel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot split non-table partition \"%s\"",
RelationGetRelationName(splitRel))));
splitRelOid = RelationGetRelid(splitRel);
/* Check descriptions of new partitions. */
@ -21463,12 +21457,6 @@ ATExecMergePartitions(List **wqueue, AlteredTableInfo *tab, Relation rel,
*/
mergingPartition = table_openrv(name, AccessExclusiveLock);
if (mergingPartition->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot merge non-table partition \"%s\"",
RelationGetRelationName(mergingPartition))));
/*
* Checking that two partitions have the same name was before, in
* function transformPartitionCmdForMerge().

View File

@ -32,6 +32,7 @@
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_am.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h"
@ -3415,6 +3416,39 @@ transformRuleStmt(RuleStmt *stmt, const char *queryString,
}
/*
* checkPartition
* Check that partRelOid is an oid of partition of the parent table rel
*/
static void
checkPartition(Relation rel, Oid partRelOid)
{
Relation partRel;
partRel = relation_open(partRelOid, AccessShareLock);
if (partRel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table",
RelationGetRelationName(partRel))));
if (!partRel->rd_rel->relispartition)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a partition",
RelationGetRelationName(partRel))));
if (get_partition_parent(partRelOid, false) != RelationGetRelid(rel))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" is not a partition of relation \"%s\"",
RelationGetRelationName(partRel),
RelationGetRelationName(rel))));
relation_close(partRel, AccessShareLock);
}
/*
* transformPartitionCmdForSplit
* Analyze the ALTER TABLLE ... SPLIT PARTITION command
@ -3447,6 +3481,8 @@ transformPartitionCmdForSplit(CreateStmtContext *cxt, PartitionCmd *partcmd)
splitPartOid = RangeVarGetRelid(partcmd->name, NoLock, false);
checkPartition(parent, splitPartOid);
/* Then we should check partitions with transformed bounds. */
check_partitions_for_split(parent, splitPartOid, partcmd->name, partcmd->partlist, cxt->pstate);
}
@ -3509,6 +3545,9 @@ transformPartitionCmdForMerge(CreateStmtContext *cxt, PartitionCmd *partcmd)
partOid = RangeVarGetRelid(name, NoLock, false);
if (partOid == defaultPartOid)
isDefaultPart = true;
checkPartition(parent, partOid);
partOids = lappend_oid(partOids, partOid);
}