mirror of
https://github.com/postgres/postgres.git
synced 2025-07-28 23:42:10 +03:00
Refactor PgFdwModifyState creation/destruction into separate functions.
Etsuro Fujita. The larger patch series of which this is a part has been reviewed by Amit Langote, David Fetter, Maksim Milyutin, Álvaro Herrera, Stephen Frost, and me. Discussion: http://postgr.es/m/5A95487E.9050808@lab.ntt.co.jp
This commit is contained in:
@ -376,12 +376,21 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
|
|||||||
static void create_cursor(ForeignScanState *node);
|
static void create_cursor(ForeignScanState *node);
|
||||||
static void fetch_more_data(ForeignScanState *node);
|
static void fetch_more_data(ForeignScanState *node);
|
||||||
static void close_cursor(PGconn *conn, unsigned int cursor_number);
|
static void close_cursor(PGconn *conn, unsigned int cursor_number);
|
||||||
|
static PgFdwModifyState *create_foreign_modify(EState *estate,
|
||||||
|
ResultRelInfo *resultRelInfo,
|
||||||
|
CmdType operation,
|
||||||
|
Plan *subplan,
|
||||||
|
char *query,
|
||||||
|
List *target_attrs,
|
||||||
|
bool has_returning,
|
||||||
|
List *retrieved_attrs);
|
||||||
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
|
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
|
||||||
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
|
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
|
||||||
ItemPointer tupleid,
|
ItemPointer tupleid,
|
||||||
TupleTableSlot *slot);
|
TupleTableSlot *slot);
|
||||||
static void store_returning_result(PgFdwModifyState *fmstate,
|
static void store_returning_result(PgFdwModifyState *fmstate,
|
||||||
TupleTableSlot *slot, PGresult *res);
|
TupleTableSlot *slot, PGresult *res);
|
||||||
|
static void finish_foreign_modify(PgFdwModifyState *fmstate);
|
||||||
static List *build_remote_returning(Index rtindex, Relation rel,
|
static List *build_remote_returning(Index rtindex, Relation rel,
|
||||||
List *returningList);
|
List *returningList);
|
||||||
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
|
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
|
||||||
@ -1681,18 +1690,10 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
|
|||||||
int eflags)
|
int eflags)
|
||||||
{
|
{
|
||||||
PgFdwModifyState *fmstate;
|
PgFdwModifyState *fmstate;
|
||||||
EState *estate = mtstate->ps.state;
|
char *query;
|
||||||
CmdType operation = mtstate->operation;
|
List *target_attrs;
|
||||||
Relation rel = resultRelInfo->ri_RelationDesc;
|
bool has_returning;
|
||||||
RangeTblEntry *rte;
|
List *retrieved_attrs;
|
||||||
Oid userid;
|
|
||||||
ForeignTable *table;
|
|
||||||
UserMapping *user;
|
|
||||||
AttrNumber n_params;
|
|
||||||
Oid typefnoid;
|
|
||||||
bool isvarlena;
|
|
||||||
ListCell *lc;
|
|
||||||
TupleDesc tupdesc = RelationGetDescr(rel);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
|
* Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
|
||||||
@ -1701,82 +1702,25 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
|
|||||||
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
|
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
/* Begin constructing PgFdwModifyState. */
|
|
||||||
fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
|
|
||||||
fmstate->rel = rel;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Identify which user to do the remote access as. This should match what
|
|
||||||
* ExecCheckRTEPerms() does.
|
|
||||||
*/
|
|
||||||
rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
|
|
||||||
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
|
|
||||||
|
|
||||||
/* Get info about foreign table. */
|
|
||||||
table = GetForeignTable(RelationGetRelid(rel));
|
|
||||||
user = GetUserMapping(userid, table->serverid);
|
|
||||||
|
|
||||||
/* Open connection; report that we'll create a prepared statement. */
|
|
||||||
fmstate->conn = GetConnection(user, true);
|
|
||||||
fmstate->p_name = NULL; /* prepared statement not made yet */
|
|
||||||
|
|
||||||
/* Deconstruct fdw_private data. */
|
/* Deconstruct fdw_private data. */
|
||||||
fmstate->query = strVal(list_nth(fdw_private,
|
query = strVal(list_nth(fdw_private,
|
||||||
FdwModifyPrivateUpdateSql));
|
FdwModifyPrivateUpdateSql));
|
||||||
fmstate->target_attrs = (List *) list_nth(fdw_private,
|
target_attrs = (List *) list_nth(fdw_private,
|
||||||
FdwModifyPrivateTargetAttnums);
|
FdwModifyPrivateTargetAttnums);
|
||||||
fmstate->has_returning = intVal(list_nth(fdw_private,
|
has_returning = intVal(list_nth(fdw_private,
|
||||||
FdwModifyPrivateHasReturning));
|
FdwModifyPrivateHasReturning));
|
||||||
fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
|
retrieved_attrs = (List *) list_nth(fdw_private,
|
||||||
FdwModifyPrivateRetrievedAttrs);
|
FdwModifyPrivateRetrievedAttrs);
|
||||||
|
|
||||||
/* Create context for per-tuple temp workspace. */
|
/* Construct an execution state. */
|
||||||
fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
|
fmstate = create_foreign_modify(mtstate->ps.state,
|
||||||
"postgres_fdw temporary data",
|
resultRelInfo,
|
||||||
ALLOCSET_SMALL_SIZES);
|
mtstate->operation,
|
||||||
|
mtstate->mt_plans[subplan_index]->plan,
|
||||||
/* Prepare for input conversion of RETURNING results. */
|
query,
|
||||||
if (fmstate->has_returning)
|
target_attrs,
|
||||||
fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
|
has_returning,
|
||||||
|
retrieved_attrs);
|
||||||
/* Prepare for output conversion of parameters used in prepared stmt. */
|
|
||||||
n_params = list_length(fmstate->target_attrs) + 1;
|
|
||||||
fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
|
|
||||||
fmstate->p_nums = 0;
|
|
||||||
|
|
||||||
if (operation == CMD_UPDATE || operation == CMD_DELETE)
|
|
||||||
{
|
|
||||||
/* Find the ctid resjunk column in the subplan's result */
|
|
||||||
Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
|
|
||||||
|
|
||||||
fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
|
|
||||||
"ctid");
|
|
||||||
if (!AttributeNumberIsValid(fmstate->ctidAttno))
|
|
||||||
elog(ERROR, "could not find junk ctid column");
|
|
||||||
|
|
||||||
/* First transmittable parameter will be ctid */
|
|
||||||
getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
|
|
||||||
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
|
|
||||||
fmstate->p_nums++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (operation == CMD_INSERT || operation == CMD_UPDATE)
|
|
||||||
{
|
|
||||||
/* Set up for remaining transmittable parameters */
|
|
||||||
foreach(lc, fmstate->target_attrs)
|
|
||||||
{
|
|
||||||
int attnum = lfirst_int(lc);
|
|
||||||
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
|
|
||||||
|
|
||||||
Assert(!attr->attisdropped);
|
|
||||||
|
|
||||||
getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
|
|
||||||
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
|
|
||||||
fmstate->p_nums++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Assert(fmstate->p_nums <= n_params);
|
|
||||||
|
|
||||||
resultRelInfo->ri_FdwState = fmstate;
|
resultRelInfo->ri_FdwState = fmstate;
|
||||||
}
|
}
|
||||||
@ -2011,28 +1955,8 @@ postgresEndForeignModify(EState *estate,
|
|||||||
if (fmstate == NULL)
|
if (fmstate == NULL)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
/* If we created a prepared statement, destroy it */
|
/* Destroy the execution state */
|
||||||
if (fmstate->p_name)
|
finish_foreign_modify(fmstate);
|
||||||
{
|
|
||||||
char sql[64];
|
|
||||||
PGresult *res;
|
|
||||||
|
|
||||||
snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* We don't use a PG_TRY block here, so be careful not to throw error
|
|
||||||
* without releasing the PGresult.
|
|
||||||
*/
|
|
||||||
res = pgfdw_exec_query(fmstate->conn, sql);
|
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
|
||||||
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
|
|
||||||
PQclear(res);
|
|
||||||
fmstate->p_name = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Release remote connection */
|
|
||||||
ReleaseConnection(fmstate->conn);
|
|
||||||
fmstate->conn = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -3228,6 +3152,109 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
|
|||||||
PQclear(res);
|
PQclear(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* create_foreign_modify
|
||||||
|
* Construct an execution state of a foreign insert/update/delete
|
||||||
|
* operation
|
||||||
|
*/
|
||||||
|
static PgFdwModifyState *
|
||||||
|
create_foreign_modify(EState *estate,
|
||||||
|
ResultRelInfo *resultRelInfo,
|
||||||
|
CmdType operation,
|
||||||
|
Plan *subplan,
|
||||||
|
char *query,
|
||||||
|
List *target_attrs,
|
||||||
|
bool has_returning,
|
||||||
|
List *retrieved_attrs)
|
||||||
|
{
|
||||||
|
PgFdwModifyState *fmstate;
|
||||||
|
Relation rel = resultRelInfo->ri_RelationDesc;
|
||||||
|
TupleDesc tupdesc = RelationGetDescr(rel);
|
||||||
|
RangeTblEntry *rte;
|
||||||
|
Oid userid;
|
||||||
|
ForeignTable *table;
|
||||||
|
UserMapping *user;
|
||||||
|
AttrNumber n_params;
|
||||||
|
Oid typefnoid;
|
||||||
|
bool isvarlena;
|
||||||
|
ListCell *lc;
|
||||||
|
|
||||||
|
/* Begin constructing PgFdwModifyState. */
|
||||||
|
fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
|
||||||
|
fmstate->rel = rel;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Identify which user to do the remote access as. This should match what
|
||||||
|
* ExecCheckRTEPerms() does.
|
||||||
|
*/
|
||||||
|
rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
|
||||||
|
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
|
||||||
|
|
||||||
|
/* Get info about foreign table. */
|
||||||
|
table = GetForeignTable(RelationGetRelid(rel));
|
||||||
|
user = GetUserMapping(userid, table->serverid);
|
||||||
|
|
||||||
|
/* Open connection; report that we'll create a prepared statement. */
|
||||||
|
fmstate->conn = GetConnection(user, true);
|
||||||
|
fmstate->p_name = NULL; /* prepared statement not made yet */
|
||||||
|
|
||||||
|
/* Set up remote query information. */
|
||||||
|
fmstate->query = query;
|
||||||
|
fmstate->target_attrs = target_attrs;
|
||||||
|
fmstate->has_returning = has_returning;
|
||||||
|
fmstate->retrieved_attrs = retrieved_attrs;
|
||||||
|
|
||||||
|
/* Create context for per-tuple temp workspace. */
|
||||||
|
fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
|
||||||
|
"postgres_fdw temporary data",
|
||||||
|
ALLOCSET_SMALL_SIZES);
|
||||||
|
|
||||||
|
/* Prepare for input conversion of RETURNING results. */
|
||||||
|
if (fmstate->has_returning)
|
||||||
|
fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
|
||||||
|
|
||||||
|
/* Prepare for output conversion of parameters used in prepared stmt. */
|
||||||
|
n_params = list_length(fmstate->target_attrs) + 1;
|
||||||
|
fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
|
||||||
|
fmstate->p_nums = 0;
|
||||||
|
|
||||||
|
if (operation == CMD_UPDATE || operation == CMD_DELETE)
|
||||||
|
{
|
||||||
|
Assert(subplan != NULL);
|
||||||
|
|
||||||
|
/* Find the ctid resjunk column in the subplan's result */
|
||||||
|
fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
|
||||||
|
"ctid");
|
||||||
|
if (!AttributeNumberIsValid(fmstate->ctidAttno))
|
||||||
|
elog(ERROR, "could not find junk ctid column");
|
||||||
|
|
||||||
|
/* First transmittable parameter will be ctid */
|
||||||
|
getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
|
||||||
|
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
|
||||||
|
fmstate->p_nums++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (operation == CMD_INSERT || operation == CMD_UPDATE)
|
||||||
|
{
|
||||||
|
/* Set up for remaining transmittable parameters */
|
||||||
|
foreach(lc, fmstate->target_attrs)
|
||||||
|
{
|
||||||
|
int attnum = lfirst_int(lc);
|
||||||
|
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
|
||||||
|
|
||||||
|
Assert(!attr->attisdropped);
|
||||||
|
|
||||||
|
getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
|
||||||
|
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
|
||||||
|
fmstate->p_nums++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert(fmstate->p_nums <= n_params);
|
||||||
|
|
||||||
|
return fmstate;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* prepare_foreign_modify
|
* prepare_foreign_modify
|
||||||
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE
|
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE
|
||||||
@ -3370,6 +3397,39 @@ store_returning_result(PgFdwModifyState *fmstate,
|
|||||||
PG_END_TRY();
|
PG_END_TRY();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* finish_foreign_modify
|
||||||
|
* Release resources for a foreign insert/update/delete operation
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
finish_foreign_modify(PgFdwModifyState *fmstate)
|
||||||
|
{
|
||||||
|
Assert(fmstate != NULL);
|
||||||
|
|
||||||
|
/* If we created a prepared statement, destroy it */
|
||||||
|
if (fmstate->p_name)
|
||||||
|
{
|
||||||
|
char sql[64];
|
||||||
|
PGresult *res;
|
||||||
|
|
||||||
|
snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We don't use a PG_TRY block here, so be careful not to throw error
|
||||||
|
* without releasing the PGresult.
|
||||||
|
*/
|
||||||
|
res = pgfdw_exec_query(fmstate->conn, sql);
|
||||||
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
|
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
|
||||||
|
PQclear(res);
|
||||||
|
fmstate->p_name = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Release remote connection */
|
||||||
|
ReleaseConnection(fmstate->conn);
|
||||||
|
fmstate->conn = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* build_remote_returning
|
* build_remote_returning
|
||||||
* Build a RETURNING targetlist of a remote query for performing an
|
* Build a RETURNING targetlist of a remote query for performing an
|
||||||
|
Reference in New Issue
Block a user