mirror of
https://github.com/postgres/postgres.git
synced 2025-06-11 20:28:21 +03:00
Directly modify foreign tables.
postgres_fdw can now sent an UPDATE or DELETE statement directly to the foreign server in simple cases, rather than sending a SELECT FOR UPDATE statement and then updating or deleting rows one-by-one. Etsuro Fujita, reviewed by Rushabh Lathia, Shigeru Hanada, Kyotaro Horiguchi, Albe Laurenz, Thom Brown, and me.
This commit is contained in:
@ -61,6 +61,8 @@ enum FdwScanPrivateIndex
|
||||
{
|
||||
/* SQL statement to execute remotely (as a String node) */
|
||||
FdwScanPrivateSelectSql,
|
||||
/* List of restriction clauses that can be executed remotely */
|
||||
FdwScanPrivateRemoteConds,
|
||||
/* Integer list of attribute numbers retrieved by the SELECT */
|
||||
FdwScanPrivateRetrievedAttrs,
|
||||
/* Integer representing the desired fetch_size */
|
||||
@ -97,6 +99,27 @@ enum FdwModifyPrivateIndex
|
||||
FdwModifyPrivateRetrievedAttrs
|
||||
};
|
||||
|
||||
/*
|
||||
* Similarly, this enum describes what's kept in the fdw_private list for
|
||||
* a ForeignScan node that modifies a foreign table directly. We store:
|
||||
*
|
||||
* 1) UPDATE/DELETE statement text to be sent to the remote server
|
||||
* 2) Boolean flag showing if the remote query has a RETURNING clause
|
||||
* 3) Integer list of attribute numbers retrieved by RETURNING, if any
|
||||
* 4) Boolean flag showing if we set the command es_processed
|
||||
*/
|
||||
enum FdwDirectModifyPrivateIndex
|
||||
{
|
||||
/* SQL statement to execute remotely (as a String node) */
|
||||
FdwDirectModifyPrivateUpdateSql,
|
||||
/* has-returning flag (as an integer Value node) */
|
||||
FdwDirectModifyPrivateHasReturning,
|
||||
/* Integer list of attribute numbers retrieved by RETURNING */
|
||||
FdwDirectModifyPrivateRetrievedAttrs,
|
||||
/* set-processed flag (as an integer Value node) */
|
||||
FdwDirectModifyPrivateSetProcessed
|
||||
};
|
||||
|
||||
/*
|
||||
* Execution state of a foreign scan using postgres_fdw.
|
||||
*/
|
||||
@ -163,6 +186,36 @@ typedef struct PgFdwModifyState
|
||||
MemoryContext temp_cxt; /* context for per-tuple temporary data */
|
||||
} PgFdwModifyState;
|
||||
|
||||
/*
|
||||
* Execution state of a foreign scan that modifies a foreign table directly.
|
||||
*/
|
||||
typedef struct PgFdwDirectModifyState
|
||||
{
|
||||
Relation rel; /* relcache entry for the foreign table */
|
||||
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
|
||||
|
||||
/* extracted fdw_private data */
|
||||
char *query; /* text of UPDATE/DELETE command */
|
||||
bool has_returning; /* is there a RETURNING clause? */
|
||||
List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
|
||||
bool set_processed; /* do we set the command es_processed? */
|
||||
|
||||
/* for remote query execution */
|
||||
PGconn *conn; /* connection for the update */
|
||||
int numParams; /* number of parameters passed to query */
|
||||
FmgrInfo *param_flinfo; /* output conversion functions for them */
|
||||
List *param_exprs; /* executable expressions for param values */
|
||||
const char **param_values; /* textual values of query parameters */
|
||||
|
||||
/* for storing result tuples */
|
||||
PGresult *result; /* result for query */
|
||||
int num_tuples; /* # of result tuples */
|
||||
int next_tuple; /* index of next one to return */
|
||||
|
||||
/* working memory context */
|
||||
MemoryContext temp_cxt; /* context for per-tuple temporary data */
|
||||
} PgFdwDirectModifyState;
|
||||
|
||||
/*
|
||||
* Workspace for analyzing a foreign table.
|
||||
*/
|
||||
@ -263,6 +316,13 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate,
|
||||
static void postgresEndForeignModify(EState *estate,
|
||||
ResultRelInfo *resultRelInfo);
|
||||
static int postgresIsForeignRelUpdatable(Relation rel);
|
||||
static bool postgresPlanDirectModify(PlannerInfo *root,
|
||||
ModifyTable *plan,
|
||||
Index resultRelation,
|
||||
int subplan_index);
|
||||
static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
|
||||
static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
|
||||
static void postgresEndDirectModify(ForeignScanState *node);
|
||||
static void postgresExplainForeignScan(ForeignScanState *node,
|
||||
ExplainState *es);
|
||||
static void postgresExplainForeignModify(ModifyTableState *mtstate,
|
||||
@ -270,6 +330,8 @@ static void postgresExplainForeignModify(ModifyTableState *mtstate,
|
||||
List *fdw_private,
|
||||
int subplan_index,
|
||||
ExplainState *es);
|
||||
static void postgresExplainDirectModify(ForeignScanState *node,
|
||||
ExplainState *es);
|
||||
static bool postgresAnalyzeForeignTable(Relation relation,
|
||||
AcquireSampleRowsFunc *func,
|
||||
BlockNumber *totalpages);
|
||||
@ -311,6 +373,18 @@ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
|
||||
TupleTableSlot *slot);
|
||||
static void store_returning_result(PgFdwModifyState *fmstate,
|
||||
TupleTableSlot *slot, PGresult *res);
|
||||
static void execute_dml_stmt(ForeignScanState *node);
|
||||
static TupleTableSlot *get_returning_data(ForeignScanState *node);
|
||||
static void prepare_query_params(PlanState *node,
|
||||
List *fdw_exprs,
|
||||
int numParams,
|
||||
FmgrInfo **param_flinfo,
|
||||
List **param_exprs,
|
||||
const char ***param_values);
|
||||
static void process_query_params(ExprContext *econtext,
|
||||
FmgrInfo *param_flinfo,
|
||||
List *param_exprs,
|
||||
const char **param_values);
|
||||
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
||||
HeapTuple *rows, int targrows,
|
||||
double *totalrows,
|
||||
@ -362,12 +436,17 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
|
||||
routine->ExecForeignDelete = postgresExecForeignDelete;
|
||||
routine->EndForeignModify = postgresEndForeignModify;
|
||||
routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
|
||||
routine->PlanDirectModify = postgresPlanDirectModify;
|
||||
routine->BeginDirectModify = postgresBeginDirectModify;
|
||||
routine->IterateDirectModify = postgresIterateDirectModify;
|
||||
routine->EndDirectModify = postgresEndDirectModify;
|
||||
|
||||
/* Function for EvalPlanQual rechecks */
|
||||
routine->RecheckForeignScan = postgresRecheckForeignScan;
|
||||
/* Support functions for EXPLAIN */
|
||||
routine->ExplainForeignScan = postgresExplainForeignScan;
|
||||
routine->ExplainForeignModify = postgresExplainForeignModify;
|
||||
routine->ExplainDirectModify = postgresExplainDirectModify;
|
||||
|
||||
/* Support functions for ANALYZE */
|
||||
routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
|
||||
@ -1122,7 +1201,8 @@ postgresGetForeignPlan(PlannerInfo *root,
|
||||
* Build the fdw_private list that will be available to the executor.
|
||||
* Items in the list must match order in enum FdwScanPrivateIndex.
|
||||
*/
|
||||
fdw_private = list_make4(makeString(sql.data),
|
||||
fdw_private = list_make5(makeString(sql.data),
|
||||
remote_conds,
|
||||
retrieved_attrs,
|
||||
makeInteger(fpinfo->fetch_size),
|
||||
makeInteger(foreignrel->umid));
|
||||
@ -1159,8 +1239,6 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
|
||||
PgFdwScanState *fsstate;
|
||||
UserMapping *user;
|
||||
int numParams;
|
||||
int i;
|
||||
ListCell *lc;
|
||||
|
||||
/*
|
||||
* Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
|
||||
@ -1247,42 +1325,18 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
|
||||
|
||||
fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
|
||||
|
||||
/* Prepare for output conversion of parameters used in remote query. */
|
||||
/*
|
||||
* Prepare for processing of parameters used in remote query, if any.
|
||||
*/
|
||||
numParams = list_length(fsplan->fdw_exprs);
|
||||
fsstate->numParams = numParams;
|
||||
fsstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
|
||||
|
||||
i = 0;
|
||||
foreach(lc, fsplan->fdw_exprs)
|
||||
{
|
||||
Node *param_expr = (Node *) lfirst(lc);
|
||||
Oid typefnoid;
|
||||
bool isvarlena;
|
||||
|
||||
getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
|
||||
fmgr_info(typefnoid, &fsstate->param_flinfo[i]);
|
||||
i++;
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare remote-parameter expressions for evaluation. (Note: in
|
||||
* practice, we expect that all these expressions will be just Params, so
|
||||
* we could possibly do something more efficient than using the full
|
||||
* expression-eval machinery for this. But probably there would be little
|
||||
* benefit, and it'd require postgres_fdw to know more than is desirable
|
||||
* about Param evaluation.)
|
||||
*/
|
||||
fsstate->param_exprs = (List *)
|
||||
ExecInitExpr((Expr *) fsplan->fdw_exprs,
|
||||
(PlanState *) node);
|
||||
|
||||
/*
|
||||
* Allocate buffer for text form of query parameters, if any.
|
||||
*/
|
||||
if (numParams > 0)
|
||||
fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
|
||||
else
|
||||
fsstate->param_values = NULL;
|
||||
prepare_query_params((PlanState *) node,
|
||||
fsplan->fdw_exprs,
|
||||
numParams,
|
||||
&fsstate->param_flinfo,
|
||||
&fsstate->param_exprs,
|
||||
&fsstate->param_values);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1447,13 +1501,6 @@ postgresAddForeignUpdateTargets(Query *parsetree,
|
||||
/*
|
||||
* postgresPlanForeignModify
|
||||
* Plan an insert/update/delete operation on a foreign table
|
||||
*
|
||||
* Note: currently, the plan tree generated for UPDATE/DELETE will always
|
||||
* include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE)
|
||||
* and then the ModifyTable node will have to execute individual remote
|
||||
* UPDATE/DELETE commands. If there are no local conditions or joins
|
||||
* needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING
|
||||
* and then do nothing at ModifyTable. Room for future optimization ...
|
||||
*/
|
||||
static List *
|
||||
postgresPlanForeignModify(PlannerInfo *root,
|
||||
@ -1991,6 +2038,314 @@ postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* postgresPlanDirectModify
|
||||
* Consider a direct foreign table modification
|
||||
*
|
||||
* Decide whether it is safe to modify a foreign table directly, and if so,
|
||||
* rewrite subplan accordingly.
|
||||
*/
|
||||
static bool
|
||||
postgresPlanDirectModify(PlannerInfo *root,
|
||||
ModifyTable *plan,
|
||||
Index resultRelation,
|
||||
int subplan_index)
|
||||
{
|
||||
CmdType operation = plan->operation;
|
||||
Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index);
|
||||
RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
|
||||
Relation rel;
|
||||
StringInfoData sql;
|
||||
ForeignScan *fscan;
|
||||
List *targetAttrs = NIL;
|
||||
List *remote_conds;
|
||||
List *params_list = NIL;
|
||||
List *returningList = NIL;
|
||||
List *retrieved_attrs = NIL;
|
||||
|
||||
/*
|
||||
* Decide whether it is safe to modify a foreign table directly.
|
||||
*/
|
||||
|
||||
/*
|
||||
* The table modification must be an UPDATE or DELETE.
|
||||
*/
|
||||
if (operation != CMD_UPDATE && operation != CMD_DELETE)
|
||||
return false;
|
||||
|
||||
/*
|
||||
* It's unsafe to modify a foreign table directly if there are any local
|
||||
* joins needed.
|
||||
*/
|
||||
if (!IsA(subplan, ForeignScan))
|
||||
return false;
|
||||
|
||||
/*
|
||||
* It's unsafe to modify a foreign table directly if there are any quals
|
||||
* that should be evaluated locally.
|
||||
*/
|
||||
if (subplan->qual != NIL)
|
||||
return false;
|
||||
|
||||
/*
|
||||
* We can't handle an UPDATE or DELETE on a foreign join for now.
|
||||
*/
|
||||
fscan = (ForeignScan *) subplan;
|
||||
if (fscan->scan.scanrelid == 0)
|
||||
return false;
|
||||
|
||||
/*
|
||||
* It's unsafe to update a foreign table directly, if any expressions to
|
||||
* assign to the target columns are unsafe to evaluate remotely.
|
||||
*/
|
||||
if (operation == CMD_UPDATE)
|
||||
{
|
||||
RelOptInfo *baserel = root->simple_rel_array[resultRelation];
|
||||
int col;
|
||||
|
||||
/*
|
||||
* We transmit only columns that were explicitly targets of the UPDATE,
|
||||
* so as to avoid unnecessary data transmission.
|
||||
*/
|
||||
col = -1;
|
||||
while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
|
||||
{
|
||||
/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
|
||||
AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
|
||||
TargetEntry *tle;
|
||||
|
||||
if (attno <= InvalidAttrNumber) /* shouldn't happen */
|
||||
elog(ERROR, "system-column update is not supported");
|
||||
|
||||
tle = get_tle_by_resno(subplan->targetlist, attno);
|
||||
|
||||
if (!is_foreign_expr(root, baserel, (Expr *) tle->expr))
|
||||
return false;
|
||||
|
||||
targetAttrs = lappend_int(targetAttrs, attno);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Ok, rewrite subplan so as to modify the foreign table directly.
|
||||
*/
|
||||
initStringInfo(&sql);
|
||||
|
||||
/*
|
||||
* Core code already has some lock on each rel being planned, so we can
|
||||
* use NoLock here.
|
||||
*/
|
||||
rel = heap_open(rte->relid, NoLock);
|
||||
|
||||
/*
|
||||
* Extract the baserestrictinfo clauses that can be evaluated remotely.
|
||||
*/
|
||||
remote_conds = (List *) list_nth(fscan->fdw_private,
|
||||
FdwScanPrivateRemoteConds);
|
||||
|
||||
/*
|
||||
* Extract the relevant RETURNING list if any.
|
||||
*/
|
||||
if (plan->returningLists)
|
||||
returningList = (List *) list_nth(plan->returningLists, subplan_index);
|
||||
|
||||
/*
|
||||
* Construct the SQL command string.
|
||||
*/
|
||||
switch (operation)
|
||||
{
|
||||
case CMD_UPDATE:
|
||||
deparseDirectUpdateSql(&sql, root, resultRelation, rel,
|
||||
((Plan *) fscan)->targetlist,
|
||||
targetAttrs,
|
||||
remote_conds, ¶ms_list,
|
||||
returningList, &retrieved_attrs);
|
||||
break;
|
||||
case CMD_DELETE:
|
||||
deparseDirectDeleteSql(&sql, root, resultRelation, rel,
|
||||
remote_conds, ¶ms_list,
|
||||
returningList, &retrieved_attrs);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected operation: %d", (int) operation);
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* Update the operation info.
|
||||
*/
|
||||
fscan->operation = operation;
|
||||
|
||||
/*
|
||||
* Update the fdw_exprs list that will be available to the executor.
|
||||
*/
|
||||
fscan->fdw_exprs = params_list;
|
||||
|
||||
/*
|
||||
* Update the fdw_private list that will be available to the executor.
|
||||
* Items in the list must match enum FdwDirectModifyPrivateIndex, above.
|
||||
*/
|
||||
fscan->fdw_private = list_make4(makeString(sql.data),
|
||||
makeInteger((retrieved_attrs != NIL)),
|
||||
retrieved_attrs,
|
||||
makeInteger(plan->canSetTag));
|
||||
|
||||
heap_close(rel, NoLock);
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* postgresBeginDirectModify
|
||||
* Prepare a direct foreign table modification
|
||||
*/
|
||||
static void
|
||||
postgresBeginDirectModify(ForeignScanState *node, int eflags)
|
||||
{
|
||||
ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
|
||||
EState *estate = node->ss.ps.state;
|
||||
PgFdwDirectModifyState *dmstate;
|
||||
RangeTblEntry *rte;
|
||||
Oid userid;
|
||||
ForeignTable *table;
|
||||
UserMapping *user;
|
||||
int numParams;
|
||||
|
||||
/*
|
||||
* Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
|
||||
*/
|
||||
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
|
||||
return;
|
||||
|
||||
/*
|
||||
* We'll save private state in node->fdw_state.
|
||||
*/
|
||||
dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
|
||||
node->fdw_state = (void *) dmstate;
|
||||
|
||||
/*
|
||||
* Identify which user to do the remote access as. This should match what
|
||||
* ExecCheckRTEPerms() does.
|
||||
*/
|
||||
rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
|
||||
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
|
||||
|
||||
/* Get info about foreign table. */
|
||||
dmstate->rel = node->ss.ss_currentRelation;
|
||||
table = GetForeignTable(RelationGetRelid(dmstate->rel));
|
||||
user = GetUserMapping(userid, table->serverid);
|
||||
|
||||
/*
|
||||
* Get connection to the foreign server. Connection manager will
|
||||
* establish new connection if necessary.
|
||||
*/
|
||||
dmstate->conn = GetConnection(user, false);
|
||||
|
||||
/* Initialize state variable */
|
||||
dmstate->num_tuples = -1; /* -1 means not set yet */
|
||||
|
||||
/* Get private info created by planner functions. */
|
||||
dmstate->query = strVal(list_nth(fsplan->fdw_private,
|
||||
FdwDirectModifyPrivateUpdateSql));
|
||||
dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
|
||||
FdwDirectModifyPrivateHasReturning));
|
||||
dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
|
||||
FdwDirectModifyPrivateRetrievedAttrs);
|
||||
dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
|
||||
FdwDirectModifyPrivateSetProcessed));
|
||||
|
||||
/* Create context for per-tuple temp workspace. */
|
||||
dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
|
||||
"postgres_fdw temporary data",
|
||||
ALLOCSET_SMALL_MINSIZE,
|
||||
ALLOCSET_SMALL_INITSIZE,
|
||||
ALLOCSET_SMALL_MAXSIZE);
|
||||
|
||||
/* Prepare for input conversion of RETURNING results. */
|
||||
if (dmstate->has_returning)
|
||||
dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
|
||||
|
||||
/*
|
||||
* Prepare for processing of parameters used in remote query, if any.
|
||||
*/
|
||||
numParams = list_length(fsplan->fdw_exprs);
|
||||
dmstate->numParams = numParams;
|
||||
if (numParams > 0)
|
||||
prepare_query_params((PlanState *) node,
|
||||
fsplan->fdw_exprs,
|
||||
numParams,
|
||||
&dmstate->param_flinfo,
|
||||
&dmstate->param_exprs,
|
||||
&dmstate->param_values);
|
||||
}
|
||||
|
||||
/*
|
||||
* postgresIterateDirectModify
|
||||
* Execute a direct foreign table modification
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
postgresIterateDirectModify(ForeignScanState *node)
|
||||
{
|
||||
PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
|
||||
EState *estate = node->ss.ps.state;
|
||||
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
|
||||
|
||||
/*
|
||||
* If this is the first call after Begin, execute the statement.
|
||||
*/
|
||||
if (dmstate->num_tuples == -1)
|
||||
execute_dml_stmt(node);
|
||||
|
||||
/*
|
||||
* If the local query doesn't specify RETURNING, just clear tuple slot.
|
||||
*/
|
||||
if (!resultRelInfo->ri_projectReturning)
|
||||
{
|
||||
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
|
||||
Instrumentation *instr = node->ss.ps.instrument;
|
||||
|
||||
Assert(!dmstate->has_returning);
|
||||
|
||||
/* Increment the command es_processed count if necessary. */
|
||||
if (dmstate->set_processed)
|
||||
estate->es_processed += dmstate->num_tuples;
|
||||
|
||||
/* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
|
||||
if (instr)
|
||||
instr->tuplecount += dmstate->num_tuples;
|
||||
|
||||
return ExecClearTuple(slot);
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the next RETURNING tuple.
|
||||
*/
|
||||
return get_returning_data(node);
|
||||
}
|
||||
|
||||
/*
|
||||
* postgresEndDirectModify
|
||||
* Finish a direct foreign table modification
|
||||
*/
|
||||
static void
|
||||
postgresEndDirectModify(ForeignScanState *node)
|
||||
{
|
||||
PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
|
||||
|
||||
/* if dmstate is NULL, we are in EXPLAIN; nothing to do */
|
||||
if (dmstate == NULL)
|
||||
return;
|
||||
|
||||
/* Release PGresult */
|
||||
if (dmstate->result)
|
||||
PQclear(dmstate->result);
|
||||
|
||||
/* Release remote connection */
|
||||
ReleaseConnection(dmstate->conn);
|
||||
dmstate->conn = NULL;
|
||||
|
||||
/* MemoryContext will be deleted automatically. */
|
||||
}
|
||||
|
||||
/*
|
||||
* postgresExplainForeignScan
|
||||
* Produce extra output for EXPLAIN of a ForeignScan on a foreign table
|
||||
@ -2044,6 +2399,25 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* postgresExplainDirectModify
|
||||
* Produce extra output for EXPLAIN of a ForeignScan that modifies a
|
||||
* foreign table directly
|
||||
*/
|
||||
static void
|
||||
postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
|
||||
{
|
||||
List *fdw_private;
|
||||
char *sql;
|
||||
|
||||
if (es->verbose)
|
||||
{
|
||||
fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
|
||||
sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
|
||||
ExplainPropertyText("Remote SQL", sql, es);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* estimate_path_cost_size
|
||||
@ -2419,38 +2793,14 @@ create_cursor(ForeignScanState *node)
|
||||
*/
|
||||
if (numParams > 0)
|
||||
{
|
||||
int nestlevel;
|
||||
MemoryContext oldcontext;
|
||||
int i;
|
||||
ListCell *lc;
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
|
||||
|
||||
nestlevel = set_transmission_modes();
|
||||
|
||||
i = 0;
|
||||
foreach(lc, fsstate->param_exprs)
|
||||
{
|
||||
ExprState *expr_state = (ExprState *) lfirst(lc);
|
||||
Datum expr_value;
|
||||
bool isNull;
|
||||
|
||||
/* Evaluate the parameter expression */
|
||||
expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
|
||||
|
||||
/*
|
||||
* Get string representation of each parameter value by invoking
|
||||
* type-specific output function, unless the value is null.
|
||||
*/
|
||||
if (isNull)
|
||||
values[i] = NULL;
|
||||
else
|
||||
values[i] = OutputFunctionCall(&fsstate->param_flinfo[i],
|
||||
expr_value);
|
||||
i++;
|
||||
}
|
||||
|
||||
reset_transmission_modes(nestlevel);
|
||||
process_query_params(econtext,
|
||||
fsstate->param_flinfo,
|
||||
fsstate->param_exprs,
|
||||
values);
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
@ -2770,6 +3120,197 @@ store_returning_result(PgFdwModifyState *fmstate,
|
||||
PG_END_TRY();
|
||||
}
|
||||
|
||||
/*
|
||||
* Execute a direct UPDATE/DELETE statement.
|
||||
*/
|
||||
static void
|
||||
execute_dml_stmt(ForeignScanState *node)
|
||||
{
|
||||
PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
|
||||
ExprContext *econtext = node->ss.ps.ps_ExprContext;
|
||||
int numParams = dmstate->numParams;
|
||||
const char **values = dmstate->param_values;
|
||||
|
||||
/*
|
||||
* Construct array of query parameter values in text format.
|
||||
*/
|
||||
if (numParams > 0)
|
||||
process_query_params(econtext,
|
||||
dmstate->param_flinfo,
|
||||
dmstate->param_exprs,
|
||||
values);
|
||||
|
||||
/*
|
||||
* Notice that we pass NULL for paramTypes, thus forcing the remote server
|
||||
* to infer types for all parameters. Since we explicitly cast every
|
||||
* parameter (see deparse.c), the "inference" is trivial and will produce
|
||||
* the desired result. This allows us to avoid assuming that the remote
|
||||
* server has the same OIDs we do for the parameters' types.
|
||||
*
|
||||
* We don't use a PG_TRY block here, so be careful not to throw error
|
||||
* without releasing the PGresult.
|
||||
*/
|
||||
dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
|
||||
numParams, NULL, values, NULL, NULL, 0);
|
||||
if (PQresultStatus(dmstate->result) !=
|
||||
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
|
||||
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
|
||||
dmstate->query);
|
||||
|
||||
/* Get the number of rows affected. */
|
||||
if (dmstate->has_returning)
|
||||
dmstate->num_tuples = PQntuples(dmstate->result);
|
||||
else
|
||||
dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the result of a RETURNING clause.
|
||||
*/
|
||||
static TupleTableSlot *
|
||||
get_returning_data(ForeignScanState *node)
|
||||
{
|
||||
PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
|
||||
EState *estate = node->ss.ps.state;
|
||||
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
|
||||
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
|
||||
|
||||
Assert(resultRelInfo->ri_projectReturning);
|
||||
|
||||
/* If we didn't get any tuples, must be end of data. */
|
||||
if (dmstate->next_tuple >= dmstate->num_tuples)
|
||||
return ExecClearTuple(slot);
|
||||
|
||||
/* Increment the command es_processed count if necessary. */
|
||||
if (dmstate->set_processed)
|
||||
estate->es_processed += 1;
|
||||
|
||||
/*
|
||||
* Store a RETURNING tuple. If has_returning is false, just emit a dummy
|
||||
* tuple. (has_returning is false when the local query is of the form
|
||||
* "UPDATE/DELETE .. RETURNING 1" for example.)
|
||||
*/
|
||||
if (!dmstate->has_returning)
|
||||
ExecStoreAllNullTuple(slot);
|
||||
else
|
||||
{
|
||||
/*
|
||||
* On error, be sure to release the PGresult on the way out. Callers
|
||||
* do not have PG_TRY blocks to ensure this happens.
|
||||
*/
|
||||
PG_TRY();
|
||||
{
|
||||
HeapTuple newtup;
|
||||
|
||||
newtup = make_tuple_from_result_row(dmstate->result,
|
||||
dmstate->next_tuple,
|
||||
dmstate->rel,
|
||||
dmstate->attinmeta,
|
||||
dmstate->retrieved_attrs,
|
||||
NULL,
|
||||
dmstate->temp_cxt);
|
||||
ExecStoreTuple(newtup, slot, InvalidBuffer, false);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
if (dmstate->result)
|
||||
PQclear(dmstate->result);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
}
|
||||
dmstate->next_tuple++;
|
||||
|
||||
/* Make slot available for evaluation of the local query RETURNING list. */
|
||||
resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
|
||||
|
||||
return slot;
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare for processing of parameters used in remote query.
|
||||
*/
|
||||
static void
|
||||
prepare_query_params(PlanState *node,
|
||||
List *fdw_exprs,
|
||||
int numParams,
|
||||
FmgrInfo **param_flinfo,
|
||||
List **param_exprs,
|
||||
const char ***param_values)
|
||||
{
|
||||
int i;
|
||||
ListCell *lc;
|
||||
|
||||
Assert(numParams > 0);
|
||||
|
||||
/* Prepare for output conversion of parameters used in remote query. */
|
||||
*param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
|
||||
|
||||
i = 0;
|
||||
foreach(lc, fdw_exprs)
|
||||
{
|
||||
Node *param_expr = (Node *) lfirst(lc);
|
||||
Oid typefnoid;
|
||||
bool isvarlena;
|
||||
|
||||
getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
|
||||
fmgr_info(typefnoid, &(*param_flinfo)[i]);
|
||||
i++;
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare remote-parameter expressions for evaluation. (Note: in
|
||||
* practice, we expect that all these expressions will be just Params, so
|
||||
* we could possibly do something more efficient than using the full
|
||||
* expression-eval machinery for this. But probably there would be little
|
||||
* benefit, and it'd require postgres_fdw to know more than is desirable
|
||||
* about Param evaluation.)
|
||||
*/
|
||||
*param_exprs = (List *) ExecInitExpr((Expr *) fdw_exprs, node);
|
||||
|
||||
/* Allocate buffer for text form of query parameters. */
|
||||
*param_values = (const char **) palloc0(numParams * sizeof(char *));
|
||||
}
|
||||
|
||||
/*
|
||||
* Construct array of query parameter values in text format.
|
||||
*/
|
||||
static void
|
||||
process_query_params(ExprContext *econtext,
|
||||
FmgrInfo *param_flinfo,
|
||||
List *param_exprs,
|
||||
const char **param_values)
|
||||
{
|
||||
int nestlevel;
|
||||
int i;
|
||||
ListCell *lc;
|
||||
|
||||
nestlevel = set_transmission_modes();
|
||||
|
||||
i = 0;
|
||||
foreach(lc, param_exprs)
|
||||
{
|
||||
ExprState *expr_state = (ExprState *) lfirst(lc);
|
||||
Datum expr_value;
|
||||
bool isNull;
|
||||
|
||||
/* Evaluate the parameter expression */
|
||||
expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
|
||||
|
||||
/*
|
||||
* Get string representation of each parameter value by invoking
|
||||
* type-specific output function, unless the value is null.
|
||||
*/
|
||||
if (isNull)
|
||||
param_values[i] = NULL;
|
||||
else
|
||||
param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value);
|
||||
i++;
|
||||
}
|
||||
|
||||
reset_transmission_modes(nestlevel);
|
||||
}
|
||||
|
||||
/*
|
||||
* postgresAnalyzeForeignTable
|
||||
* Test whether analyzing this foreign table is supported
|
||||
|
Reference in New Issue
Block a user