1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-13 16:22:44 +03:00

Allow foreign tables to participate in inheritance.

Foreign tables can now be inheritance children, or parents.  Much of the
system was already ready for this, but we had to fix a few things of
course, mostly in the area of planner and executor handling of row locks.

As side effects of this, allow foreign tables to have NOT VALID CHECK
constraints (and hence to accept ALTER ... VALIDATE CONSTRAINT), and to
accept ALTER SET STORAGE and ALTER SET WITH/WITHOUT OIDS.  Continuing to
disallow these things would've required bizarre and inconsistent special
cases in inheritance behavior.  Since foreign tables don't enforce CHECK
constraints anyway, a NOT VALID one is a complete no-op, but that doesn't
mean we shouldn't allow it.  And it's possible that some FDWs might have
use for SET STORAGE or SET WITH OIDS, though doubtless they will be no-ops
for most.

An additional change in support of this is that when a ModifyTable node
has multiple target tables, they will all now be explicitly identified
in EXPLAIN output, for example:

 Update on pt1  (cost=0.00..321.05 rows=3541 width=46)
   Update on pt1
   Foreign Update on ft1
   Foreign Update on ft2
   Update on child3
   ->  Seq Scan on pt1  (cost=0.00..0.00 rows=1 width=46)
   ->  Foreign Scan on ft1  (cost=100.00..148.03 rows=1170 width=46)
   ->  Foreign Scan on ft2  (cost=100.00..148.03 rows=1170 width=46)
   ->  Seq Scan on child3  (cost=0.00..25.00 rows=1200 width=46)

This was done mainly to provide an unambiguous place to attach "Remote SQL"
fields, but it is useful for inherited updates even when no foreign tables
are involved.

Shigeru Hanada and Etsuro Fujita, reviewed by Ashutosh Bapat and Kyotaro
Horiguchi, some additional hacking by me
This commit is contained in:
Tom Lane
2015-03-22 13:53:11 -04:00
parent 8ac356cde3
commit cb1ca4d800
29 changed files with 1764 additions and 188 deletions

View File

@@ -2253,13 +2253,6 @@ AddRelationNewConstraints(Relation rel,
expr = stringToNode(cdef->cooked_expr);
}
/* Don't allow NOT VALID for foreign tables */
if (cdef->skip_validation &&
rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CHECK constraints on foreign tables cannot be marked NOT VALID")));
/*
* Check name uniqueness, or generate a name if none was given.
*/

View File

@@ -297,9 +297,8 @@ analyze_rel(Oid relid, RangeVar *relation, int options, List *va_cols,
* do_analyze_rel() -- analyze one relation, recursively or not
*
* Note that "acquirefunc" is only relevant for the non-inherited case.
* If we supported foreign tables in inheritance trees,
* acquire_inherited_sample_rows would need to determine the appropriate
* acquirefunc for each child table.
* For the inherited case, acquire_inherited_sample_rows() determines the
* appropriate acquirefunc for each child table.
*/
static void
do_analyze_rel(Relation onerel, int options, List *va_cols,
@@ -1448,7 +1447,8 @@ compare_rows(const void *a, const void *b)
*
* This has the same API as acquire_sample_rows, except that rows are
* collected from all inheritance children as well as the specified table.
* We fail and return zero if there are no inheritance children.
* We fail and return zero if there are no inheritance children, or if all
* children are foreign tables that don't support ANALYZE.
*/
static int
acquire_inherited_sample_rows(Relation onerel, int elevel,
@@ -1457,6 +1457,7 @@ acquire_inherited_sample_rows(Relation onerel, int elevel,
{
List *tableOIDs;
Relation *rels;
AcquireSampleRowsFunc *acquirefuncs;
double *relblocks;
double totalblocks;
int numrows,
@@ -1491,10 +1492,12 @@ acquire_inherited_sample_rows(Relation onerel, int elevel,
}
/*
* Count the blocks in all the relations. The result could overflow
* BlockNumber, so we use double arithmetic.
* Identify acquirefuncs to use, and count blocks in all the relations.
* The result could overflow BlockNumber, so we use double arithmetic.
*/
rels = (Relation *) palloc(list_length(tableOIDs) * sizeof(Relation));
acquirefuncs = (AcquireSampleRowsFunc *)
palloc(list_length(tableOIDs) * sizeof(AcquireSampleRowsFunc));
relblocks = (double *) palloc(list_length(tableOIDs) * sizeof(double));
totalblocks = 0;
nrels = 0;
@@ -1502,6 +1505,8 @@ acquire_inherited_sample_rows(Relation onerel, int elevel,
{
Oid childOID = lfirst_oid(lc);
Relation childrel;
AcquireSampleRowsFunc acquirefunc = NULL;
BlockNumber relpages = 0;
/* We already got the needed lock */
childrel = heap_open(childOID, NoLock);
@@ -1515,12 +1520,66 @@ acquire_inherited_sample_rows(Relation onerel, int elevel,
continue;
}
/* Check table type (MATVIEW can't happen, but might as well allow) */
if (childrel->rd_rel->relkind == RELKIND_RELATION ||
childrel->rd_rel->relkind == RELKIND_MATVIEW)
{
/* Regular table, so use the regular row acquisition function */
acquirefunc = acquire_sample_rows;
relpages = RelationGetNumberOfBlocks(childrel);
}
else if (childrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
{
/*
* For a foreign table, call the FDW's hook function to see
* whether it supports analysis.
*/
FdwRoutine *fdwroutine;
bool ok = false;
fdwroutine = GetFdwRoutineForRelation(childrel, false);
if (fdwroutine->AnalyzeForeignTable != NULL)
ok = fdwroutine->AnalyzeForeignTable(childrel,
&acquirefunc,
&relpages);
if (!ok)
{
/* ignore, but release the lock on it */
Assert(childrel != onerel);
heap_close(childrel, AccessShareLock);
continue;
}
}
else
{
/* ignore, but release the lock on it */
Assert(childrel != onerel);
heap_close(childrel, AccessShareLock);
continue;
}
/* OK, we'll process this child */
rels[nrels] = childrel;
relblocks[nrels] = (double) RelationGetNumberOfBlocks(childrel);
totalblocks += relblocks[nrels];
acquirefuncs[nrels] = acquirefunc;
relblocks[nrels] = (double) relpages;
totalblocks += (double) relpages;
nrels++;
}
/*
* If we don't have at least two tables to consider, fail.
*/
if (nrels < 2)
{
ereport(elevel,
(errmsg("skipping analyze of \"%s.%s\" inheritance tree --- this inheritance tree contains no analyzable child tables",
get_namespace_name(RelationGetNamespace(onerel)),
RelationGetRelationName(onerel))));
return 0;
}
/*
* Now sample rows from each relation, proportionally to its fraction of
* the total block count. (This might be less than desirable if the child
@@ -1533,6 +1592,7 @@ acquire_inherited_sample_rows(Relation onerel, int elevel,
for (i = 0; i < nrels; i++)
{
Relation childrel = rels[i];
AcquireSampleRowsFunc acquirefunc = acquirefuncs[i];
double childblocks = relblocks[i];
if (childblocks > 0)
@@ -1549,12 +1609,9 @@ acquire_inherited_sample_rows(Relation onerel, int elevel,
tdrows;
/* Fetch a random sample of the child's rows */
childrows = acquire_sample_rows(childrel,
elevel,
rows + numrows,
childtargrows,
&trows,
&tdrows);
childrows = (*acquirefunc) (childrel, elevel,
rows + numrows, childtargrows,
&trows, &tdrows);
/* We may need to convert from child's rowtype to parent's */
if (childrows > 0 &&

View File

@@ -2187,6 +2187,10 @@ ExplainScanTarget(Scan *plan, ExplainState *es)
/*
* Show the target of a ModifyTable node
*
* Here we show the nominal target (ie, the relation that was named in the
* original query). If the actual target(s) is/are different, we'll show them
* in show_modifytable_info().
*/
static void
ExplainModifyTarget(ModifyTable *plan, ExplainState *es)
@@ -2303,30 +2307,106 @@ ExplainTargetRel(Plan *plan, Index rti, ExplainState *es)
/*
* Show extra information for a ModifyTable node
*
* We have two objectives here. First, if there's more than one target table
* or it's different from the nominal target, identify the actual target(s).
* Second, give FDWs a chance to display extra info about foreign targets.
*/
static void
show_modifytable_info(ModifyTableState *mtstate, ExplainState *es)
{
FdwRoutine *fdwroutine = mtstate->resultRelInfo->ri_FdwRoutine;
ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
const char *operation;
const char *foperation;
bool labeltargets;
int j;
/*
* If the first target relation is a foreign table, call its FDW to
* display whatever additional fields it wants to. For now, we ignore the
* possibility of other targets being foreign tables, although the API for
* ExplainForeignModify is designed to allow them to be processed.
*/
if (fdwroutine != NULL &&
fdwroutine->ExplainForeignModify != NULL)
switch (node->operation)
{
ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
List *fdw_private = (List *) linitial(node->fdwPrivLists);
fdwroutine->ExplainForeignModify(mtstate,
mtstate->resultRelInfo,
fdw_private,
0,
es);
case CMD_INSERT:
operation = "Insert";
foperation = "Foreign Insert";
break;
case CMD_UPDATE:
operation = "Update";
foperation = "Foreign Update";
break;
case CMD_DELETE:
operation = "Delete";
foperation = "Foreign Delete";
break;
default:
operation = "???";
foperation = "Foreign ???";
break;
}
/* Should we explicitly label target relations? */
labeltargets = (mtstate->mt_nplans > 1 ||
(mtstate->mt_nplans == 1 &&
mtstate->resultRelInfo->ri_RangeTableIndex != node->nominalRelation));
if (labeltargets)
ExplainOpenGroup("Target Tables", "Target Tables", false, es);
for (j = 0; j < mtstate->mt_nplans; j++)
{
ResultRelInfo *resultRelInfo = mtstate->resultRelInfo + j;
FdwRoutine *fdwroutine = resultRelInfo->ri_FdwRoutine;
if (labeltargets)
{
/* Open a group for this target */
ExplainOpenGroup("Target Table", NULL, true, es);
/*
* In text mode, decorate each target with operation type, so that
* ExplainTargetRel's output of " on foo" will read nicely.
*/
if (es->format == EXPLAIN_FORMAT_TEXT)
{
appendStringInfoSpaces(es->str, es->indent * 2);
appendStringInfoString(es->str,
fdwroutine ? foperation : operation);
}
/* Identify target */
ExplainTargetRel((Plan *) node,
resultRelInfo->ri_RangeTableIndex,
es);
if (es->format == EXPLAIN_FORMAT_TEXT)
{
appendStringInfoChar(es->str, '\n');
es->indent++;
}
}
/* Give FDW a chance */
if (fdwroutine && fdwroutine->ExplainForeignModify != NULL)
{
List *fdw_private = (List *) list_nth(node->fdwPrivLists, j);
fdwroutine->ExplainForeignModify(mtstate,
resultRelInfo,
fdw_private,
j,
es);
}
if (labeltargets)
{
/* Undo the indentation we added in text format */
if (es->format == EXPLAIN_FORMAT_TEXT)
es->indent--;
/* Close the group */
ExplainCloseGroup("Target Table", NULL, true, es);
}
}
if (labeltargets)
ExplainCloseGroup("Target Tables", "Target Tables", false, es);
}
/*

View File

@@ -1504,10 +1504,11 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
*/
relation = heap_openrv(parent, ShareUpdateExclusiveLock);
if (relation->rd_rel->relkind != RELKIND_RELATION)
if (relation->rd_rel->relkind != RELKIND_RELATION &&
relation->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("inherited relation \"%s\" is not a table",
errmsg("inherited relation \"%s\" is not a table or foreign table",
parent->relname)));
/* Permanent rels cannot inherit from temporary ones */
if (relpersistence != RELPERSISTENCE_TEMP &&
@@ -3157,7 +3158,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
pass = AT_PASS_MISC;
break;
case AT_SetStorage: /* ALTER COLUMN SET STORAGE */
ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW);
ATSimplePermissions(rel, ATT_TABLE | ATT_MATVIEW | ATT_FOREIGN_TABLE);
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */
pass = AT_PASS_MISC;
@@ -3245,14 +3246,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
pass = AT_PASS_MISC;
break;
case AT_AddOids: /* SET WITH OIDS */
ATSimplePermissions(rel, ATT_TABLE);
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
if (!rel->rd_rel->relhasoids || recursing)
ATPrepAddOids(wqueue, rel, recurse, cmd, lockmode);
/* Recursion occurs during execution phase */
pass = AT_PASS_ADD_COL;
break;
case AT_DropOids: /* SET WITHOUT OIDS */
ATSimplePermissions(rel, ATT_TABLE);
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
/* Performs own recursion */
if (rel->rd_rel->relhasoids)
{
@@ -3280,17 +3281,23 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
pass = AT_PASS_MISC;
break;
case AT_AddInherit: /* INHERIT */
ATSimplePermissions(rel, ATT_TABLE);
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
/* This command never recurses */
ATPrepAddInherit(rel);
pass = AT_PASS_MISC;
break;
case AT_DropInherit: /* NO INHERIT */
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
/* This command never recurses */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
case AT_AlterConstraint: /* ALTER CONSTRAINT */
ATSimplePermissions(rel, ATT_TABLE);
pass = AT_PASS_MISC;
break;
case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
ATSimplePermissions(rel, ATT_TABLE);
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
/* Recursion occurs during execution phase */
/* No command-specific prep needed except saving recurse flag */
if (recurse)
@@ -3318,7 +3325,6 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
case AT_EnableAlwaysRule:
case AT_EnableReplicaRule:
case AT_DisableRule:
case AT_DropInherit: /* NO INHERIT */
case AT_AddOf: /* OF */
case AT_DropOf: /* NOT OF */
case AT_EnableRowSecurity:
@@ -4637,7 +4643,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
/* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing)
ATSimplePermissions(rel, ATT_TABLE);
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
@@ -5533,7 +5539,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
/* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing)
ATSimplePermissions(rel, ATT_TABLE);
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
/*
* get the number of the attribute
@@ -5926,7 +5932,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
/* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing)
ATSimplePermissions(rel, ATT_TABLE);
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
/*
* Call AddRelationNewConstraints to do the work, making sure it works on
@@ -7084,6 +7090,10 @@ validateCheckConstraint(Relation rel, HeapTuple constrtup)
bool isnull;
Snapshot snapshot;
/* VALIDATE CONSTRAINT is a no-op for foreign tables */
if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
return;
constrForm = (Form_pg_constraint) GETSTRUCT(constrtup);
estate = CreateExecutorState();
@@ -7426,7 +7436,7 @@ ATExecDropConstraint(Relation rel, const char *constrName,
/* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing)
ATSimplePermissions(rel, ATT_TABLE);
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
@@ -9681,7 +9691,7 @@ ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode)
* Must be owner of both parent and child -- child was checked by
* ATSimplePermissions call in ATPrepCmd
*/
ATSimplePermissions(parent_rel, ATT_TABLE);
ATSimplePermissions(parent_rel, ATT_TABLE | ATT_FOREIGN_TABLE);
/* Permanent rels cannot inherit from temporary ones */
if (parent_rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP &&

View File

@@ -106,7 +106,7 @@ execCurrentOf(CurrentOfExpr *cexpr,
if (!RowMarkRequiresRowShareLock(thiserm->markType))
continue; /* ignore non-FOR UPDATE/SHARE items */
if (RelationGetRelid(thiserm->relation) == table_oid)
if (thiserm->relid == table_oid)
{
if (erm)
ereport(ERROR,

View File

@@ -814,21 +814,22 @@ InitPlan(QueryDesc *queryDesc, int eflags)
if (rc->isParent)
continue;
/* get relation's OID (will produce InvalidOid if subquery) */
relid = getrelid(rc->rti, rangeTable);
switch (rc->markType)
{
case ROW_MARK_EXCLUSIVE:
case ROW_MARK_NOKEYEXCLUSIVE:
case ROW_MARK_SHARE:
case ROW_MARK_KEYSHARE:
relid = getrelid(rc->rti, rangeTable);
relation = heap_open(relid, RowShareLock);
break;
case ROW_MARK_REFERENCE:
relid = getrelid(rc->rti, rangeTable);
relation = heap_open(relid, AccessShareLock);
break;
case ROW_MARK_COPY:
/* there's no real table here ... */
/* no physical table access is required */
relation = NULL;
break;
default:
@@ -843,6 +844,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
erm = (ExecRowMark *) palloc(sizeof(ExecRowMark));
erm->relation = relation;
erm->relid = relid;
erm->rti = rc->rti;
erm->prti = rc->prti;
erm->rowmarkId = rc->rowmarkId;
@@ -1911,21 +1913,9 @@ ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist)
aerm->rowmark = erm;
/* Look up the resjunk columns associated with this rowmark */
if (erm->relation)
if (erm->markType != ROW_MARK_COPY)
{
Assert(erm->markType != ROW_MARK_COPY);
/* if child rel, need tableoid */
if (erm->rti != erm->prti)
{
snprintf(resname, sizeof(resname), "tableoid%u", erm->rowmarkId);
aerm->toidAttNo = ExecFindJunkAttributeInTlist(targetlist,
resname);
if (!AttributeNumberIsValid(aerm->toidAttNo))
elog(ERROR, "could not find junk %s column", resname);
}
/* always need ctid for real relations */
/* need ctid for all methods other than COPY */
snprintf(resname, sizeof(resname), "ctid%u", erm->rowmarkId);
aerm->ctidAttNo = ExecFindJunkAttributeInTlist(targetlist,
resname);
@@ -1934,8 +1924,7 @@ ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist)
}
else
{
Assert(erm->markType == ROW_MARK_COPY);
/* need wholerow if COPY */
snprintf(resname, sizeof(resname), "wholerow%u", erm->rowmarkId);
aerm->wholeAttNo = ExecFindJunkAttributeInTlist(targetlist,
resname);
@@ -1943,6 +1932,16 @@ ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist)
elog(ERROR, "could not find junk %s column", resname);
}
/* if child rel, need tableoid */
if (erm->rti != erm->prti)
{
snprintf(resname, sizeof(resname), "tableoid%u", erm->rowmarkId);
aerm->toidAttNo = ExecFindJunkAttributeInTlist(targetlist,
resname);
if (!AttributeNumberIsValid(aerm->toidAttNo))
elog(ERROR, "could not find junk %s column", resname);
}
return aerm;
}
@@ -2375,31 +2374,32 @@ EvalPlanQualFetchRowMarks(EPQState *epqstate)
/* clear any leftover test tuple for this rel */
EvalPlanQualSetTuple(epqstate, erm->rti, NULL);
if (erm->relation)
/* if child rel, must check whether it produced this row */
if (erm->rti != erm->prti)
{
Oid tableoid;
datum = ExecGetJunkAttribute(epqstate->origslot,
aerm->toidAttNo,
&isNull);
/* non-locked rels could be on the inside of outer joins */
if (isNull)
continue;
tableoid = DatumGetObjectId(datum);
Assert(OidIsValid(erm->relid));
if (tableoid != erm->relid)
{
/* this child is inactive right now */
continue;
}
}
if (erm->markType == ROW_MARK_REFERENCE)
{
Buffer buffer;
Assert(erm->markType == ROW_MARK_REFERENCE);
/* if child rel, must check whether it produced this row */
if (erm->rti != erm->prti)
{
Oid tableoid;
datum = ExecGetJunkAttribute(epqstate->origslot,
aerm->toidAttNo,
&isNull);
/* non-locked rels could be on the inside of outer joins */
if (isNull)
continue;
tableoid = DatumGetObjectId(datum);
if (tableoid != RelationGetRelid(erm->relation))
{
/* this child is inactive right now */
continue;
}
}
Assert(erm->relation != NULL);
/* fetch the tuple's ctid */
datum = ExecGetJunkAttribute(epqstate->origslot,
@@ -2439,8 +2439,7 @@ EvalPlanQualFetchRowMarks(EPQState *epqstate)
tuple.t_len = HeapTupleHeaderGetDatumLength(td);
ItemPointerSetInvalid(&(tuple.t_self));
/* relation might be a foreign table, if so provide tableoid */
tuple.t_tableOid = getrelid(erm->rti,
epqstate->estate->es_range_table);
tuple.t_tableOid = erm->relid;
tuple.t_data = td;
/* copy and store tuple */

View File

@@ -93,7 +93,8 @@ lnext:
elog(ERROR, "tableoid is NULL");
tableoid = DatumGetObjectId(datum);
if (tableoid != RelationGetRelid(erm->relation))
Assert(OidIsValid(erm->relid));
if (tableoid != erm->relid)
{
/* this child is inactive right now */
ItemPointerSetInvalid(&(erm->curCtid));
@@ -174,8 +175,9 @@ lnext:
}
/* updated, so fetch and lock the updated version */
copyTuple = EvalPlanQualFetch(estate, erm->relation, lockmode,
erm->waitPolicy, &hufd.ctid, hufd.xmax);
copyTuple = EvalPlanQualFetch(estate, erm->relation,
lockmode, erm->waitPolicy,
&hufd.ctid, hufd.xmax);
if (copyTuple == NULL)
{

View File

@@ -2224,35 +2224,7 @@ preprocess_rowmarks(PlannerInfo *root)
newrc = makeNode(PlanRowMark);
newrc->rti = newrc->prti = rc->rti;
newrc->rowmarkId = ++(root->glob->lastRowMarkId);
if (rte->relkind == RELKIND_FOREIGN_TABLE)
{
/* For now, we force all foreign tables to use ROW_MARK_COPY */
newrc->markType = ROW_MARK_COPY;
}
else
{
/* regular table, apply the appropriate lock type */
switch (rc->strength)
{
case LCS_NONE:
/* we intentionally throw an error for LCS_NONE */
elog(ERROR, "unrecognized LockClauseStrength %d",
(int) rc->strength);
break;
case LCS_FORKEYSHARE:
newrc->markType = ROW_MARK_KEYSHARE;
break;
case LCS_FORSHARE:
newrc->markType = ROW_MARK_SHARE;
break;
case LCS_FORNOKEYUPDATE:
newrc->markType = ROW_MARK_NOKEYEXCLUSIVE;
break;
case LCS_FORUPDATE:
newrc->markType = ROW_MARK_EXCLUSIVE;
break;
}
}
newrc->markType = select_rowmark_type(rte, rc->strength);
newrc->allMarkTypes = (1 << newrc->markType);
newrc->strength = rc->strength;
newrc->waitPolicy = rc->waitPolicy;
@@ -2277,12 +2249,7 @@ preprocess_rowmarks(PlannerInfo *root)
newrc = makeNode(PlanRowMark);
newrc->rti = newrc->prti = i;
newrc->rowmarkId = ++(root->glob->lastRowMarkId);
/* real tables support REFERENCE, anything else needs COPY */
if (rte->rtekind == RTE_RELATION &&
rte->relkind != RELKIND_FOREIGN_TABLE)
newrc->markType = ROW_MARK_REFERENCE;
else
newrc->markType = ROW_MARK_COPY;
newrc->markType = select_rowmark_type(rte, LCS_NONE);
newrc->allMarkTypes = (1 << newrc->markType);
newrc->strength = LCS_NONE;
newrc->waitPolicy = LockWaitBlock; /* doesn't matter */
@@ -2294,6 +2261,49 @@ preprocess_rowmarks(PlannerInfo *root)
root->rowMarks = prowmarks;
}
/*
* Select RowMarkType to use for a given table
*/
RowMarkType
select_rowmark_type(RangeTblEntry *rte, LockClauseStrength strength)
{
if (rte->rtekind != RTE_RELATION)
{
/* If it's not a table at all, use ROW_MARK_COPY */
return ROW_MARK_COPY;
}
else if (rte->relkind == RELKIND_FOREIGN_TABLE)
{
/* For now, we force all foreign tables to use ROW_MARK_COPY */
return ROW_MARK_COPY;
}
else
{
/* Regular table, apply the appropriate lock type */
switch (strength)
{
case LCS_NONE:
/* don't need tuple lock, only ability to re-fetch the row */
return ROW_MARK_REFERENCE;
break;
case LCS_FORKEYSHARE:
return ROW_MARK_KEYSHARE;
break;
case LCS_FORSHARE:
return ROW_MARK_SHARE;
break;
case LCS_FORNOKEYUPDATE:
return ROW_MARK_NOKEYEXCLUSIVE;
break;
case LCS_FORUPDATE:
return ROW_MARK_EXCLUSIVE;
break;
}
elog(ERROR, "unrecognized LockClauseStrength %d", (int) strength);
return ROW_MARK_EXCLUSIVE; /* keep compiler quiet */
}
}
/*
* preprocess_limit - do pre-estimation for LIMIT and/or OFFSET clauses
*

View File

@@ -1337,12 +1337,13 @@ expand_inherited_rtentry(PlannerInfo *root, RangeTblEntry *rte, Index rti)
/*
* Build an RTE for the child, and attach to query's rangetable list.
* We copy most fields of the parent's RTE, but replace relation OID,
* and set inh = false. Also, set requiredPerms to zero since all
* required permissions checks are done on the original RTE.
* We copy most fields of the parent's RTE, but replace relation OID
* and relkind, and set inh = false. Also, set requiredPerms to zero
* since all required permissions checks are done on the original RTE.
*/
childrte = copyObject(rte);
childrte->relid = childOID;
childrte->relkind = newrelation->rd_rel->relkind;
childrte->inh = false;
childrte->requiredPerms = 0;
parse->rtable = lappend(parse->rtable, childrte);
@@ -1388,7 +1389,8 @@ expand_inherited_rtentry(PlannerInfo *root, RangeTblEntry *rte, Index rti)
newrc->rti = childRTindex;
newrc->prti = rti;
newrc->rowmarkId = oldrc->rowmarkId;
newrc->markType = oldrc->markType;
/* Reselect rowmark type, because relkind might not match parent */
newrc->markType = select_rowmark_type(childrte, oldrc->strength);
newrc->allMarkTypes = (1 << newrc->markType);
newrc->strength = oldrc->strength;
newrc->waitPolicy = oldrc->waitPolicy;

View File

@@ -2762,6 +2762,7 @@ CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
n->relation = $4;
n->tableElts = $6;
n->inhRelations = $8;
n->ofTypename = NULL;
n->constraints = NIL;
n->options = $9;
n->oncommit = $10;
@@ -2778,6 +2779,7 @@ CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
n->relation = $7;
n->tableElts = $9;
n->inhRelations = $11;
n->ofTypename = NULL;
n->constraints = NIL;
n->options = $12;
n->oncommit = $13;
@@ -2792,6 +2794,7 @@ CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
$4->relpersistence = $2;
n->relation = $4;
n->tableElts = $7;
n->inhRelations = NIL;
n->ofTypename = makeTypeNameFromNameList($6);
n->ofTypename->location = @6;
n->constraints = NIL;
@@ -2808,6 +2811,7 @@ CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')'
$7->relpersistence = $2;
n->relation = $7;
n->tableElts = $10;
n->inhRelations = NIL;
n->ofTypename = makeTypeNameFromNameList($9);
n->ofTypename->location = @9;
n->constraints = NIL;
@@ -4360,32 +4364,42 @@ AlterForeignServerStmt: ALTER SERVER name foreign_server_version alter_generic_o
CreateForeignTableStmt:
CREATE FOREIGN TABLE qualified_name
'(' OptTableElementList ')'
SERVER name create_generic_options
OptInherit SERVER name create_generic_options
{
CreateForeignTableStmt *n = makeNode(CreateForeignTableStmt);
$4->relpersistence = RELPERSISTENCE_PERMANENT;
n->base.relation = $4;
n->base.tableElts = $6;
n->base.inhRelations = NIL;
n->base.inhRelations = $8;
n->base.ofTypename = NULL;
n->base.constraints = NIL;
n->base.options = NIL;
n->base.oncommit = ONCOMMIT_NOOP;
n->base.tablespacename = NULL;
n->base.if_not_exists = false;
/* FDW-specific data */
n->servername = $9;
n->options = $10;
n->servername = $10;
n->options = $11;
$$ = (Node *) n;
}
| CREATE FOREIGN TABLE IF_P NOT EXISTS qualified_name
'(' OptTableElementList ')'
SERVER name create_generic_options
OptInherit SERVER name create_generic_options
{
CreateForeignTableStmt *n = makeNode(CreateForeignTableStmt);
$7->relpersistence = RELPERSISTENCE_PERMANENT;
n->base.relation = $7;
n->base.tableElts = $9;
n->base.inhRelations = NIL;
n->base.inhRelations = $11;
n->base.ofTypename = NULL;
n->base.constraints = NIL;
n->base.options = NIL;
n->base.oncommit = ONCOMMIT_NOOP;
n->base.tablespacename = NULL;
n->base.if_not_exists = true;
/* FDW-specific data */
n->servername = $12;
n->options = $13;
n->servername = $13;
n->options = $14;
$$ = (Node *) n;
}
;