mirror of
https://github.com/postgres/postgres.git
synced 2025-07-27 12:41:57 +03:00
Support writable foreign tables.
This patch adds the core-system infrastructure needed to support updates on foreign tables, and extends contrib/postgres_fdw to allow updates against remote Postgres servers. There's still a great deal of room for improvement in optimization of remote updates, but at least there's basic functionality there now. KaiGai Kohei, reviewed by Alexander Korotkov and Laurenz Albe, and rather heavily revised by Tom Lane.
This commit is contained in:
@ -47,6 +47,8 @@ typedef struct ConnCacheEntry
|
||||
PGconn *conn; /* connection to foreign server, or NULL */
|
||||
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
|
||||
* one level of subxact open, etc */
|
||||
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
|
||||
bool have_error; /* have any subxacts aborted in this xact? */
|
||||
} ConnCacheEntry;
|
||||
|
||||
/*
|
||||
@ -54,8 +56,9 @@ typedef struct ConnCacheEntry
|
||||
*/
|
||||
static HTAB *ConnectionHash = NULL;
|
||||
|
||||
/* for assigning cursor numbers */
|
||||
/* for assigning cursor numbers and prepared statement numbers */
|
||||
static unsigned int cursor_number = 0;
|
||||
static unsigned int prep_stmt_number = 0;
|
||||
|
||||
/* tracks whether any work is needed in callback functions */
|
||||
static bool xact_got_connection = false;
|
||||
@ -78,6 +81,10 @@ static void pgfdw_subxact_callback(SubXactEvent event,
|
||||
* if we don't already have a suitable one, and a transaction is opened at
|
||||
* the right subtransaction nesting depth if we didn't do that already.
|
||||
*
|
||||
* will_prep_stmt must be true if caller intends to create any prepared
|
||||
* statements. Since those don't go away automatically at transaction end
|
||||
* (not even on error), we need this flag to cue manual cleanup.
|
||||
*
|
||||
* XXX Note that caching connections theoretically requires a mechanism to
|
||||
* detect change of FDW objects to invalidate already established connections.
|
||||
* We could manage that by watching for invalidation events on the relevant
|
||||
@ -86,7 +93,8 @@ static void pgfdw_subxact_callback(SubXactEvent event,
|
||||
* mid-transaction anyway.
|
||||
*/
|
||||
PGconn *
|
||||
GetConnection(ForeignServer *server, UserMapping *user)
|
||||
GetConnection(ForeignServer *server, UserMapping *user,
|
||||
bool will_prep_stmt)
|
||||
{
|
||||
bool found;
|
||||
ConnCacheEntry *entry;
|
||||
@ -131,6 +139,8 @@ GetConnection(ForeignServer *server, UserMapping *user)
|
||||
/* initialize new hashtable entry (key is already filled in) */
|
||||
entry->conn = NULL;
|
||||
entry->xact_depth = 0;
|
||||
entry->have_prep_stmt = false;
|
||||
entry->have_error = false;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -147,6 +157,8 @@ GetConnection(ForeignServer *server, UserMapping *user)
|
||||
if (entry->conn == NULL)
|
||||
{
|
||||
entry->xact_depth = 0; /* just to be sure */
|
||||
entry->have_prep_stmt = false;
|
||||
entry->have_error = false;
|
||||
entry->conn = connect_pg_server(server, user);
|
||||
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
|
||||
entry->conn, server->servername);
|
||||
@ -157,6 +169,9 @@ GetConnection(ForeignServer *server, UserMapping *user)
|
||||
*/
|
||||
begin_remote_xact(entry);
|
||||
|
||||
/* Remember if caller will prepare statements */
|
||||
entry->have_prep_stmt |= will_prep_stmt;
|
||||
|
||||
return entry->conn;
|
||||
}
|
||||
|
||||
@ -393,6 +408,20 @@ GetCursorNumber(PGconn *conn)
|
||||
return ++cursor_number;
|
||||
}
|
||||
|
||||
/*
|
||||
* Assign a "unique" number for a prepared statement.
|
||||
*
|
||||
* This works much like GetCursorNumber, except that we never reset the counter
|
||||
* within a session. That's because we can't be 100% sure we've gotten rid
|
||||
* of all prepared statements on all connections, and it's not really worth
|
||||
* increasing the risk of prepared-statement name collisions by resetting.
|
||||
*/
|
||||
unsigned int
|
||||
GetPrepStmtNumber(PGconn *conn)
|
||||
{
|
||||
return ++prep_stmt_number;
|
||||
}
|
||||
|
||||
/*
|
||||
* Report an error we got from the remote server.
|
||||
*
|
||||
@ -400,6 +429,10 @@ GetCursorNumber(PGconn *conn)
|
||||
* res: PGresult containing the error
|
||||
* clear: if true, PQclear the result (otherwise caller will handle it)
|
||||
* sql: NULL, or text of remote command we tried to execute
|
||||
*
|
||||
* Note: callers that choose not to throw ERROR for a remote error are
|
||||
* responsible for making sure that the associated ConnCacheEntry gets
|
||||
* marked with have_error = true.
|
||||
*/
|
||||
void
|
||||
pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql)
|
||||
@ -480,6 +513,22 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
pgfdw_report_error(ERROR, res, true, "COMMIT TRANSACTION");
|
||||
PQclear(res);
|
||||
|
||||
/*
|
||||
* If there were any errors in subtransactions, and we made
|
||||
* prepared statements, do a DEALLOCATE ALL to make sure we
|
||||
* get rid of all prepared statements. This is annoying and
|
||||
* not terribly bulletproof, but it's probably not worth
|
||||
* trying harder. We intentionally ignore any errors in the
|
||||
* DEALLOCATE.
|
||||
*/
|
||||
if (entry->have_prep_stmt && entry->have_error)
|
||||
{
|
||||
res = PQexec(entry->conn, "DEALLOCATE ALL");
|
||||
PQclear(res);
|
||||
}
|
||||
entry->have_prep_stmt = false;
|
||||
entry->have_error = false;
|
||||
break;
|
||||
case XACT_EVENT_PRE_PREPARE:
|
||||
|
||||
@ -502,6 +551,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
||||
elog(ERROR, "missed cleaning up connection during pre-commit");
|
||||
break;
|
||||
case XACT_EVENT_ABORT:
|
||||
/* Assume we might have lost track of prepared statements */
|
||||
entry->have_error = true;
|
||||
/* If we're aborting, abort all remote transactions too */
|
||||
res = PQexec(entry->conn, "ABORT TRANSACTION");
|
||||
/* Note: can't throw ERROR, it would be infinite loop */
|
||||
@ -509,7 +560,17 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
||||
pgfdw_report_error(WARNING, res, true,
|
||||
"ABORT TRANSACTION");
|
||||
else
|
||||
{
|
||||
PQclear(res);
|
||||
/* As above, make sure we've cleared any prepared stmts */
|
||||
if (entry->have_prep_stmt && entry->have_error)
|
||||
{
|
||||
res = PQexec(entry->conn, "DEALLOCATE ALL");
|
||||
PQclear(res);
|
||||
}
|
||||
entry->have_prep_stmt = false;
|
||||
entry->have_error = false;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
@ -593,6 +654,8 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Assume we might have lost track of prepared statements */
|
||||
entry->have_error = true;
|
||||
/* Rollback all remote subtransactions during abort */
|
||||
snprintf(sql, sizeof(sql),
|
||||
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
|
||||
|
@ -25,6 +25,7 @@
|
||||
|
||||
#include "postgres_fdw.h"
|
||||
|
||||
#include "access/heapam.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "access/sysattr.h"
|
||||
#include "access/transam.h"
|
||||
@ -66,6 +67,14 @@ static bool is_builtin(Oid procid);
|
||||
/*
|
||||
* Functions to construct string representation of a node tree.
|
||||
*/
|
||||
static void deparseTargetList(StringInfo buf,
|
||||
PlannerInfo *root,
|
||||
Index rtindex,
|
||||
Relation rel,
|
||||
Bitmapset *attrs_used);
|
||||
static void deparseReturningList(StringInfo buf, PlannerInfo *root,
|
||||
Index rtindex, Relation rel,
|
||||
List *returningList);
|
||||
static void deparseColumnRef(StringInfo buf, int varno, int varattno,
|
||||
PlannerInfo *root);
|
||||
static void deparseRelation(StringInfo buf, Oid relid);
|
||||
@ -349,80 +358,104 @@ is_builtin(Oid oid)
|
||||
|
||||
|
||||
/*
|
||||
* Construct a simple SELECT statement that retrieves interesting columns
|
||||
* Construct a simple SELECT statement that retrieves desired columns
|
||||
* of the specified foreign table, and append it to "buf". The output
|
||||
* contains just "SELECT ... FROM tablename".
|
||||
*
|
||||
* "Interesting" columns are those appearing in the rel's targetlist or
|
||||
* in local_conds (conditions which can't be executed remotely).
|
||||
*/
|
||||
void
|
||||
deparseSimpleSql(StringInfo buf,
|
||||
deparseSelectSql(StringInfo buf,
|
||||
PlannerInfo *root,
|
||||
RelOptInfo *baserel,
|
||||
List *local_conds)
|
||||
Bitmapset *attrs_used)
|
||||
{
|
||||
RangeTblEntry *rte = root->simple_rte_array[baserel->relid];
|
||||
Bitmapset *attrs_used = NULL;
|
||||
RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
|
||||
Relation rel;
|
||||
|
||||
/*
|
||||
* Core code already has some lock on each rel being planned, so we can
|
||||
* use NoLock here.
|
||||
*/
|
||||
rel = heap_open(rte->relid, NoLock);
|
||||
|
||||
/*
|
||||
* Construct SELECT list
|
||||
*/
|
||||
appendStringInfoString(buf, "SELECT ");
|
||||
deparseTargetList(buf, root, baserel->relid, rel, attrs_used);
|
||||
|
||||
/*
|
||||
* Construct FROM clause
|
||||
*/
|
||||
appendStringInfoString(buf, " FROM ");
|
||||
deparseRelation(buf, RelationGetRelid(rel));
|
||||
|
||||
heap_close(rel, NoLock);
|
||||
}
|
||||
|
||||
/*
|
||||
* Emit a target list that retrieves the columns specified in attrs_used.
|
||||
* This is used for both SELECT and RETURNING targetlists.
|
||||
*
|
||||
* We list attributes in order of the foreign table's columns, but replace
|
||||
* any attributes that need not be fetched with NULL constants. (We can't
|
||||
* just omit such attributes, or we'll lose track of which columns are
|
||||
* which at runtime.) Note however that any dropped columns are ignored.
|
||||
* Also, if ctid needs to be retrieved, it's added at the end.
|
||||
*/
|
||||
static void
|
||||
deparseTargetList(StringInfo buf,
|
||||
PlannerInfo *root,
|
||||
Index rtindex,
|
||||
Relation rel,
|
||||
Bitmapset *attrs_used)
|
||||
{
|
||||
TupleDesc tupdesc = RelationGetDescr(rel);
|
||||
bool have_wholerow;
|
||||
bool first;
|
||||
AttrNumber attr;
|
||||
ListCell *lc;
|
||||
|
||||
/* Collect all the attributes needed for joins or final output. */
|
||||
pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
|
||||
&attrs_used);
|
||||
|
||||
/* Add all the attributes used by local_conds. */
|
||||
foreach(lc, local_conds)
|
||||
{
|
||||
RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
|
||||
|
||||
pull_varattnos((Node *) rinfo->clause, baserel->relid,
|
||||
&attrs_used);
|
||||
}
|
||||
int i;
|
||||
|
||||
/* If there's a whole-row reference, we'll need all the columns. */
|
||||
have_wholerow = bms_is_member(0 - FirstLowInvalidHeapAttributeNumber,
|
||||
attrs_used);
|
||||
|
||||
/*
|
||||
* Construct SELECT list
|
||||
*
|
||||
* We list attributes in order of the foreign table's columns, but replace
|
||||
* any attributes that need not be fetched with NULL constants. (We can't
|
||||
* just omit such attributes, or we'll lose track of which columns are
|
||||
* which at runtime.) Note however that any dropped columns are ignored.
|
||||
*/
|
||||
appendStringInfo(buf, "SELECT ");
|
||||
first = true;
|
||||
for (attr = 1; attr <= baserel->max_attr; attr++)
|
||||
for (i = 1; i <= tupdesc->natts; i++)
|
||||
{
|
||||
Form_pg_attribute attr = tupdesc->attrs[i - 1];
|
||||
|
||||
/* Ignore dropped attributes. */
|
||||
if (get_rte_attribute_is_dropped(rte, attr))
|
||||
if (attr->attisdropped)
|
||||
continue;
|
||||
|
||||
if (!first)
|
||||
appendStringInfo(buf, ", ");
|
||||
appendStringInfoString(buf, ", ");
|
||||
first = false;
|
||||
|
||||
if (have_wholerow ||
|
||||
bms_is_member(attr - FirstLowInvalidHeapAttributeNumber,
|
||||
bms_is_member(i - FirstLowInvalidHeapAttributeNumber,
|
||||
attrs_used))
|
||||
deparseColumnRef(buf, baserel->relid, attr, root);
|
||||
deparseColumnRef(buf, rtindex, i, root);
|
||||
else
|
||||
appendStringInfo(buf, "NULL");
|
||||
appendStringInfoString(buf, "NULL");
|
||||
}
|
||||
|
||||
/*
|
||||
* Add ctid if needed. We currently don't support retrieving any other
|
||||
* system columns.
|
||||
*/
|
||||
if (bms_is_member(SelfItemPointerAttributeNumber - FirstLowInvalidHeapAttributeNumber,
|
||||
attrs_used))
|
||||
{
|
||||
if (!first)
|
||||
appendStringInfoString(buf, ", ");
|
||||
first = false;
|
||||
|
||||
appendStringInfoString(buf, "ctid");
|
||||
}
|
||||
|
||||
/* Don't generate bad syntax if no undropped columns */
|
||||
if (first)
|
||||
appendStringInfo(buf, "NULL");
|
||||
|
||||
/*
|
||||
* Construct FROM clause
|
||||
*/
|
||||
appendStringInfo(buf, " FROM ");
|
||||
deparseRelation(buf, rte->relid);
|
||||
appendStringInfoString(buf, "NULL");
|
||||
}
|
||||
|
||||
/*
|
||||
@ -432,9 +465,9 @@ deparseSimpleSql(StringInfo buf,
|
||||
*/
|
||||
void
|
||||
appendWhereClause(StringInfo buf,
|
||||
bool is_first,
|
||||
PlannerInfo *root,
|
||||
List *exprs,
|
||||
PlannerInfo *root)
|
||||
bool is_first)
|
||||
{
|
||||
ListCell *lc;
|
||||
|
||||
@ -444,9 +477,9 @@ appendWhereClause(StringInfo buf,
|
||||
|
||||
/* Connect expressions with "AND" and parenthesize each condition. */
|
||||
if (is_first)
|
||||
appendStringInfo(buf, " WHERE ");
|
||||
appendStringInfoString(buf, " WHERE ");
|
||||
else
|
||||
appendStringInfo(buf, " AND ");
|
||||
appendStringInfoString(buf, " AND ");
|
||||
|
||||
appendStringInfoChar(buf, '(');
|
||||
deparseExpr(buf, ri->clause, root);
|
||||
@ -456,6 +489,147 @@ appendWhereClause(StringInfo buf,
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* deparse remote INSERT statement
|
||||
*/
|
||||
void
|
||||
deparseInsertSql(StringInfo buf, PlannerInfo *root, Index rtindex,
|
||||
List *targetAttrs, List *returningList)
|
||||
{
|
||||
RangeTblEntry *rte = planner_rt_fetch(rtindex, root);
|
||||
Relation rel = heap_open(rte->relid, NoLock);
|
||||
TupleDesc tupdesc = RelationGetDescr(rel);
|
||||
AttrNumber pindex;
|
||||
bool first;
|
||||
ListCell *lc;
|
||||
|
||||
appendStringInfoString(buf, "INSERT INTO ");
|
||||
deparseRelation(buf, rte->relid);
|
||||
appendStringInfoString(buf, "(");
|
||||
|
||||
first = true;
|
||||
foreach(lc, targetAttrs)
|
||||
{
|
||||
int attnum = lfirst_int(lc);
|
||||
Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
|
||||
|
||||
Assert(!attr->attisdropped);
|
||||
|
||||
if (!first)
|
||||
appendStringInfoString(buf, ", ");
|
||||
first = false;
|
||||
|
||||
deparseColumnRef(buf, rtindex, attnum, root);
|
||||
}
|
||||
|
||||
appendStringInfoString(buf, ") VALUES (");
|
||||
|
||||
pindex = 1;
|
||||
first = true;
|
||||
foreach(lc, targetAttrs)
|
||||
{
|
||||
if (!first)
|
||||
appendStringInfoString(buf, ", ");
|
||||
first = false;
|
||||
|
||||
appendStringInfo(buf, "$%d", pindex);
|
||||
pindex++;
|
||||
}
|
||||
|
||||
appendStringInfoString(buf, ")");
|
||||
|
||||
if (returningList)
|
||||
deparseReturningList(buf, root, rtindex, rel, returningList);
|
||||
|
||||
heap_close(rel, NoLock);
|
||||
}
|
||||
|
||||
/*
|
||||
* deparse remote UPDATE statement
|
||||
*/
|
||||
void
|
||||
deparseUpdateSql(StringInfo buf, PlannerInfo *root, Index rtindex,
|
||||
List *targetAttrs, List *returningList)
|
||||
{
|
||||
RangeTblEntry *rte = planner_rt_fetch(rtindex, root);
|
||||
Relation rel = heap_open(rte->relid, NoLock);
|
||||
TupleDesc tupdesc = RelationGetDescr(rel);
|
||||
AttrNumber pindex;
|
||||
bool first;
|
||||
ListCell *lc;
|
||||
|
||||
appendStringInfoString(buf, "UPDATE ");
|
||||
deparseRelation(buf, rte->relid);
|
||||
appendStringInfoString(buf, " SET ");
|
||||
|
||||
pindex = 2; /* ctid is always the first param */
|
||||
first = true;
|
||||
foreach(lc, targetAttrs)
|
||||
{
|
||||
int attnum = lfirst_int(lc);
|
||||
Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
|
||||
|
||||
Assert(!attr->attisdropped);
|
||||
|
||||
if (!first)
|
||||
appendStringInfoString(buf, ", ");
|
||||
first = false;
|
||||
|
||||
deparseColumnRef(buf, rtindex, attnum, root);
|
||||
appendStringInfo(buf, " = $%d", pindex);
|
||||
pindex++;
|
||||
}
|
||||
appendStringInfoString(buf, " WHERE ctid = $1");
|
||||
|
||||
if (returningList)
|
||||
deparseReturningList(buf, root, rtindex, rel, returningList);
|
||||
|
||||
heap_close(rel, NoLock);
|
||||
}
|
||||
|
||||
/*
|
||||
* deparse remote DELETE statement
|
||||
*/
|
||||
void
|
||||
deparseDeleteSql(StringInfo buf, PlannerInfo *root, Index rtindex,
|
||||
List *returningList)
|
||||
{
|
||||
RangeTblEntry *rte = planner_rt_fetch(rtindex, root);
|
||||
|
||||
appendStringInfoString(buf, "DELETE FROM ");
|
||||
deparseRelation(buf, rte->relid);
|
||||
appendStringInfoString(buf, " WHERE ctid = $1");
|
||||
|
||||
if (returningList)
|
||||
{
|
||||
Relation rel = heap_open(rte->relid, NoLock);
|
||||
|
||||
deparseReturningList(buf, root, rtindex, rel, returningList);
|
||||
heap_close(rel, NoLock);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* deparse RETURNING clause of INSERT/UPDATE/DELETE
|
||||
*/
|
||||
static void
|
||||
deparseReturningList(StringInfo buf, PlannerInfo *root,
|
||||
Index rtindex, Relation rel,
|
||||
List *returningList)
|
||||
{
|
||||
Bitmapset *attrs_used;
|
||||
|
||||
/*
|
||||
* We need the attrs mentioned in the query's RETURNING list.
|
||||
*/
|
||||
attrs_used = NULL;
|
||||
pull_varattnos((Node *) returningList, rtindex,
|
||||
&attrs_used);
|
||||
|
||||
appendStringInfoString(buf, " RETURNING ");
|
||||
deparseTargetList(buf, root, rtindex, rel, attrs_used);
|
||||
}
|
||||
|
||||
/*
|
||||
* Construct SELECT statement to acquire size in blocks of given relation.
|
||||
*
|
||||
@ -495,13 +669,17 @@ deparseAnalyzeSql(StringInfo buf, Relation rel)
|
||||
ListCell *lc;
|
||||
bool first = true;
|
||||
|
||||
appendStringInfo(buf, "SELECT ");
|
||||
appendStringInfoString(buf, "SELECT ");
|
||||
for (i = 0; i < tupdesc->natts; i++)
|
||||
{
|
||||
/* Ignore dropped columns. */
|
||||
if (tupdesc->attrs[i]->attisdropped)
|
||||
continue;
|
||||
|
||||
if (!first)
|
||||
appendStringInfoString(buf, ", ");
|
||||
first = false;
|
||||
|
||||
/* Use attribute name or column_name option. */
|
||||
colname = NameStr(tupdesc->attrs[i]->attname);
|
||||
options = GetForeignColumnOptions(relid, i + 1);
|
||||
@ -517,20 +695,17 @@ deparseAnalyzeSql(StringInfo buf, Relation rel)
|
||||
}
|
||||
}
|
||||
|
||||
if (!first)
|
||||
appendStringInfo(buf, ", ");
|
||||
appendStringInfoString(buf, quote_identifier(colname));
|
||||
first = false;
|
||||
}
|
||||
|
||||
/* Don't generate bad syntax for zero-column relation. */
|
||||
if (first)
|
||||
appendStringInfo(buf, "NULL");
|
||||
appendStringInfoString(buf, "NULL");
|
||||
|
||||
/*
|
||||
* Construct FROM clause
|
||||
*/
|
||||
appendStringInfo(buf, " FROM ");
|
||||
appendStringInfoString(buf, " FROM ");
|
||||
deparseRelation(buf, relid);
|
||||
}
|
||||
|
||||
@ -547,10 +722,10 @@ deparseColumnRef(StringInfo buf, int varno, int varattno, PlannerInfo *root)
|
||||
ListCell *lc;
|
||||
|
||||
/* varno must not be any of OUTER_VAR, INNER_VAR and INDEX_VAR. */
|
||||
Assert(varno >= 1 && varno <= root->simple_rel_array_size);
|
||||
Assert(!IS_SPECIAL_VARNO(varno));
|
||||
|
||||
/* Get RangeTblEntry from array in PlannerInfo. */
|
||||
rte = root->simple_rte_array[varno];
|
||||
rte = planner_rt_fetch(varno, root);
|
||||
|
||||
/*
|
||||
* If it's a column of a foreign table, and it has the column_name FDW
|
||||
@ -608,8 +783,8 @@ deparseRelation(StringInfo buf, Oid relid)
|
||||
}
|
||||
|
||||
/*
|
||||
* Note: we could skip printing the schema name if it's pg_catalog,
|
||||
* but that doesn't seem worth the trouble.
|
||||
* Note: we could skip printing the schema name if it's pg_catalog, but
|
||||
* that doesn't seem worth the trouble.
|
||||
*/
|
||||
if (nspname == NULL)
|
||||
nspname = get_namespace_name(get_rel_namespace(relid));
|
||||
@ -1059,7 +1234,7 @@ deparseDistinctExpr(StringInfo buf, DistinctExpr *node, PlannerInfo *root)
|
||||
|
||||
appendStringInfoChar(buf, '(');
|
||||
deparseExpr(buf, linitial(node->args), root);
|
||||
appendStringInfo(buf, " IS DISTINCT FROM ");
|
||||
appendStringInfoString(buf, " IS DISTINCT FROM ");
|
||||
deparseExpr(buf, lsecond(node->args), root);
|
||||
appendStringInfoChar(buf, ')');
|
||||
}
|
||||
@ -1146,7 +1321,7 @@ deparseBoolExpr(StringInfo buf, BoolExpr *node, PlannerInfo *root)
|
||||
op = "OR";
|
||||
break;
|
||||
case NOT_EXPR:
|
||||
appendStringInfo(buf, "(NOT ");
|
||||
appendStringInfoString(buf, "(NOT ");
|
||||
deparseExpr(buf, linitial(node->args), root);
|
||||
appendStringInfoChar(buf, ')');
|
||||
return;
|
||||
@ -1173,9 +1348,9 @@ deparseNullTest(StringInfo buf, NullTest *node, PlannerInfo *root)
|
||||
appendStringInfoChar(buf, '(');
|
||||
deparseExpr(buf, node->arg, root);
|
||||
if (node->nulltesttype == IS_NULL)
|
||||
appendStringInfo(buf, " IS NULL)");
|
||||
appendStringInfoString(buf, " IS NULL)");
|
||||
else
|
||||
appendStringInfo(buf, " IS NOT NULL)");
|
||||
appendStringInfoString(buf, " IS NOT NULL)");
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1187,11 +1362,11 @@ deparseArrayExpr(StringInfo buf, ArrayExpr *node, PlannerInfo *root)
|
||||
bool first = true;
|
||||
ListCell *lc;
|
||||
|
||||
appendStringInfo(buf, "ARRAY[");
|
||||
appendStringInfoString(buf, "ARRAY[");
|
||||
foreach(lc, node->elements)
|
||||
{
|
||||
if (!first)
|
||||
appendStringInfo(buf, ", ");
|
||||
appendStringInfoString(buf, ", ");
|
||||
deparseExpr(buf, lfirst(lc), root);
|
||||
first = false;
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -21,9 +21,11 @@
|
||||
#include "libpq-fe.h"
|
||||
|
||||
/* in connection.c */
|
||||
extern PGconn *GetConnection(ForeignServer *server, UserMapping *user);
|
||||
extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
|
||||
bool will_prep_stmt);
|
||||
extern void ReleaseConnection(PGconn *conn);
|
||||
extern unsigned int GetCursorNumber(PGconn *conn);
|
||||
extern unsigned int GetPrepStmtNumber(PGconn *conn);
|
||||
extern void pgfdw_report_error(int elevel, PGresult *res, bool clear,
|
||||
const char *sql);
|
||||
|
||||
@ -39,14 +41,20 @@ extern void classifyConditions(PlannerInfo *root,
|
||||
List **param_conds,
|
||||
List **local_conds,
|
||||
List **param_numbers);
|
||||
extern void deparseSimpleSql(StringInfo buf,
|
||||
extern void deparseSelectSql(StringInfo buf,
|
||||
PlannerInfo *root,
|
||||
RelOptInfo *baserel,
|
||||
List *local_conds);
|
||||
Bitmapset *attrs_used);
|
||||
extern void appendWhereClause(StringInfo buf,
|
||||
bool has_where,
|
||||
PlannerInfo *root,
|
||||
List *exprs,
|
||||
PlannerInfo *root);
|
||||
bool is_first);
|
||||
extern void deparseInsertSql(StringInfo buf, PlannerInfo *root, Index rtindex,
|
||||
List *targetAttrs, List *returningList);
|
||||
extern void deparseUpdateSql(StringInfo buf, PlannerInfo *root, Index rtindex,
|
||||
List *targetAttrs, List *returningList);
|
||||
extern void deparseDeleteSql(StringInfo buf, PlannerInfo *root, Index rtindex,
|
||||
List *returningList);
|
||||
extern void deparseAnalyzeSizeSql(StringInfo buf, Relation rel);
|
||||
extern void deparseAnalyzeSql(StringInfo buf, Relation rel);
|
||||
|
||||
|
@ -273,3 +273,77 @@ ROLLBACK TO s;
|
||||
FETCH c;
|
||||
SELECT * FROM ft1 ORDER BY c1 LIMIT 1;
|
||||
COMMIT;
|
||||
|
||||
-- ===================================================================
|
||||
-- test writable foreign table stuff
|
||||
-- ===================================================================
|
||||
EXPLAIN (verbose, costs off)
|
||||
INSERT INTO ft2 (c1,c2,c3) SELECT c1+1000,c2+100, c3 || c3 FROM ft2 LIMIT 20;
|
||||
INSERT INTO ft2 (c1,c2,c3) SELECT c1+1000,c2+100, c3 || c3 FROM ft2 LIMIT 20;
|
||||
INSERT INTO ft2 (c1,c2,c3)
|
||||
VALUES (1101,201,'aaa'), (1102,202,'bbb'), (1103,203,'ccc') RETURNING *;
|
||||
INSERT INTO ft2 (c1,c2,c3) VALUES (1104,204,'ddd'), (1105,205,'eee');
|
||||
UPDATE ft2 SET c2 = c2 + 300, c3 = c3 || '_update3' WHERE c1 % 10 = 3;
|
||||
UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING *;
|
||||
EXPLAIN (verbose, costs off)
|
||||
UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9'
|
||||
FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;
|
||||
UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9'
|
||||
FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;
|
||||
DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING *;
|
||||
EXPLAIN (verbose, costs off)
|
||||
DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2;
|
||||
DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2;
|
||||
SELECT c1,c2,c3,c4 FROM ft2 ORDER BY c1;
|
||||
|
||||
-- Test that defaults and triggers on remote table work as expected
|
||||
ALTER TABLE "S 1"."T 1" ALTER c6 SET DEFAULT '(^-^;)';
|
||||
CREATE OR REPLACE FUNCTION "S 1".F_BRTRIG() RETURNS trigger AS $$
|
||||
BEGIN
|
||||
NEW.c3 = NEW.c3 || '_trig_update';
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE TRIGGER t1_br_insert BEFORE INSERT OR UPDATE
|
||||
ON "S 1"."T 1" FOR EACH ROW EXECUTE PROCEDURE "S 1".F_BRTRIG();
|
||||
|
||||
INSERT INTO ft2 (c1,c2,c3) VALUES (1208, 218, 'fff') RETURNING *;
|
||||
INSERT INTO ft2 (c1,c2,c3,c6) VALUES (1218, 218, 'ggg', '(--;') RETURNING *;
|
||||
UPDATE ft2 SET c2 = c2 + 600 WHERE c1 % 10 = 8 RETURNING *;
|
||||
|
||||
-- Test errors thrown on remote side during update
|
||||
ALTER TABLE "S 1"."T 1" ADD CONSTRAINT c2positive CHECK (c2 >= 0);
|
||||
|
||||
INSERT INTO ft1(c1, c2) VALUES(11, 12); -- duplicate key
|
||||
INSERT INTO ft1(c1, c2) VALUES(1111, -2); -- c2positive
|
||||
UPDATE ft1 SET c2 = -c2 WHERE c1 = 1; -- c2positive
|
||||
|
||||
-- Test savepoint/rollback behavior
|
||||
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
|
||||
select c2, count(*) from "S 1"."T 1" where c2 < 500 group by 1 order by 1;
|
||||
begin;
|
||||
update ft2 set c2 = 42 where c2 = 0;
|
||||
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
|
||||
savepoint s1;
|
||||
update ft2 set c2 = 44 where c2 = 4;
|
||||
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
|
||||
release savepoint s1;
|
||||
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
|
||||
savepoint s2;
|
||||
update ft2 set c2 = 46 where c2 = 6;
|
||||
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
|
||||
rollback to savepoint s2;
|
||||
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
|
||||
release savepoint s2;
|
||||
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
|
||||
savepoint s3;
|
||||
update ft2 set c2 = -2 where c2 = 42; -- fail on remote side
|
||||
rollback to savepoint s3;
|
||||
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
|
||||
release savepoint s3;
|
||||
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
|
||||
-- none of the above is committed yet remotely
|
||||
select c2, count(*) from "S 1"."T 1" where c2 < 500 group by 1 order by 1;
|
||||
commit;
|
||||
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
|
||||
select c2, count(*) from "S 1"."T 1" where c2 < 500 group by 1 order by 1;
|
||||
|
Reference in New Issue
Block a user