1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-28 23:42:10 +03:00

Implement IMPORT FOREIGN SCHEMA.

This command provides an automated way to create foreign table definitions
that match remote tables, thereby reducing tedium and chances for error.
In this patch, we provide the necessary core-server infrastructure and
implement the feature fully in the postgres_fdw foreign-data wrapper.
Other wrappers will throw a "feature not supported" error until/unless
they are updated.

Ronan Dunklau and Michael Paquier, additional work by me
This commit is contained in:
Tom Lane
2014-07-10 15:01:31 -04:00
parent 6a605cd6bd
commit 59efda3e50
29 changed files with 1239 additions and 15 deletions

View File

@ -116,7 +116,6 @@ static void deparseReturningList(StringInfo buf, PlannerInfo *root,
static void deparseColumnRef(StringInfo buf, int varno, int varattno,
PlannerInfo *root);
static void deparseRelation(StringInfo buf, Relation rel);
static void deparseStringLiteral(StringInfo buf, const char *val);
static void deparseExpr(Expr *expr, deparse_expr_cxt *context);
static void deparseVar(Var *node, deparse_expr_cxt *context);
static void deparseConst(Const *node, deparse_expr_cxt *context);
@ -1160,7 +1159,7 @@ deparseRelation(StringInfo buf, Relation rel)
/*
* Append a SQL string literal representing "val" to buf.
*/
static void
void
deparseStringLiteral(StringInfo buf, const char *val)
{
const char *valptr;

View File

@ -2834,3 +2834,233 @@ NOTICE: NEW: (13,"test triggered !")
(0,27)
(1 row)
-- ===================================================================
-- test IMPORT FOREIGN SCHEMA
-- ===================================================================
CREATE SCHEMA import_source;
CREATE TABLE import_source.t1 (c1 int, c2 varchar NOT NULL);
CREATE TABLE import_source.t2 (c1 int default 42, c2 varchar NULL, c3 text collate "POSIX");
CREATE TYPE typ1 AS (m1 int, m2 varchar);
CREATE TABLE import_source.t3 (c1 timestamptz default now(), c2 typ1);
CREATE TABLE import_source."x 4" (c1 float8, "C 2" text, c3 varchar(42));
CREATE TABLE import_source."x 5" (c1 float8);
ALTER TABLE import_source."x 5" DROP COLUMN c1;
CREATE SCHEMA import_dest1;
IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_dest1;
\det+ import_dest1
List of foreign tables
Schema | Table | Server | FDW Options | Description
--------------+-------+----------+-------------------------------------------------+-------------
import_dest1 | t1 | loopback | (schema_name 'import_source', table_name 't1') |
import_dest1 | t2 | loopback | (schema_name 'import_source', table_name 't2') |
import_dest1 | t3 | loopback | (schema_name 'import_source', table_name 't3') |
import_dest1 | x 4 | loopback | (schema_name 'import_source', table_name 'x 4') |
import_dest1 | x 5 | loopback | (schema_name 'import_source', table_name 'x 5') |
(5 rows)
\d import_dest1.*
Foreign table "import_dest1.t1"
Column | Type | Modifiers | FDW Options
--------+-------------------+-----------+--------------------
c1 | integer | | (column_name 'c1')
c2 | character varying | not null | (column_name 'c2')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 't1')
Foreign table "import_dest1.t2"
Column | Type | Modifiers | FDW Options
--------+-------------------+---------------+--------------------
c1 | integer | | (column_name 'c1')
c2 | character varying | | (column_name 'c2')
c3 | text | collate POSIX | (column_name 'c3')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 't2')
Foreign table "import_dest1.t3"
Column | Type | Modifiers | FDW Options
--------+--------------------------+-----------+--------------------
c1 | timestamp with time zone | | (column_name 'c1')
c2 | typ1 | | (column_name 'c2')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 't3')
Foreign table "import_dest1.x 4"
Column | Type | Modifiers | FDW Options
--------+-----------------------+-----------+---------------------
c1 | double precision | | (column_name 'c1')
C 2 | text | | (column_name 'C 2')
c3 | character varying(42) | | (column_name 'c3')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 'x 4')
Foreign table "import_dest1.x 5"
Column | Type | Modifiers | FDW Options
--------+------+-----------+-------------
Server: loopback
FDW Options: (schema_name 'import_source', table_name 'x 5')
-- Options
CREATE SCHEMA import_dest2;
IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_dest2
OPTIONS (import_default 'true');
\det+ import_dest2
List of foreign tables
Schema | Table | Server | FDW Options | Description
--------------+-------+----------+-------------------------------------------------+-------------
import_dest2 | t1 | loopback | (schema_name 'import_source', table_name 't1') |
import_dest2 | t2 | loopback | (schema_name 'import_source', table_name 't2') |
import_dest2 | t3 | loopback | (schema_name 'import_source', table_name 't3') |
import_dest2 | x 4 | loopback | (schema_name 'import_source', table_name 'x 4') |
import_dest2 | x 5 | loopback | (schema_name 'import_source', table_name 'x 5') |
(5 rows)
\d import_dest2.*
Foreign table "import_dest2.t1"
Column | Type | Modifiers | FDW Options
--------+-------------------+-----------+--------------------
c1 | integer | | (column_name 'c1')
c2 | character varying | not null | (column_name 'c2')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 't1')
Foreign table "import_dest2.t2"
Column | Type | Modifiers | FDW Options
--------+-------------------+---------------+--------------------
c1 | integer | default 42 | (column_name 'c1')
c2 | character varying | | (column_name 'c2')
c3 | text | collate POSIX | (column_name 'c3')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 't2')
Foreign table "import_dest2.t3"
Column | Type | Modifiers | FDW Options
--------+--------------------------+---------------+--------------------
c1 | timestamp with time zone | default now() | (column_name 'c1')
c2 | typ1 | | (column_name 'c2')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 't3')
Foreign table "import_dest2.x 4"
Column | Type | Modifiers | FDW Options
--------+-----------------------+-----------+---------------------
c1 | double precision | | (column_name 'c1')
C 2 | text | | (column_name 'C 2')
c3 | character varying(42) | | (column_name 'c3')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 'x 4')
Foreign table "import_dest2.x 5"
Column | Type | Modifiers | FDW Options
--------+------+-----------+-------------
Server: loopback
FDW Options: (schema_name 'import_source', table_name 'x 5')
CREATE SCHEMA import_dest3;
IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_dest3
OPTIONS (import_collate 'false', import_not_null 'false');
\det+ import_dest3
List of foreign tables
Schema | Table | Server | FDW Options | Description
--------------+-------+----------+-------------------------------------------------+-------------
import_dest3 | t1 | loopback | (schema_name 'import_source', table_name 't1') |
import_dest3 | t2 | loopback | (schema_name 'import_source', table_name 't2') |
import_dest3 | t3 | loopback | (schema_name 'import_source', table_name 't3') |
import_dest3 | x 4 | loopback | (schema_name 'import_source', table_name 'x 4') |
import_dest3 | x 5 | loopback | (schema_name 'import_source', table_name 'x 5') |
(5 rows)
\d import_dest3.*
Foreign table "import_dest3.t1"
Column | Type | Modifiers | FDW Options
--------+-------------------+-----------+--------------------
c1 | integer | | (column_name 'c1')
c2 | character varying | | (column_name 'c2')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 't1')
Foreign table "import_dest3.t2"
Column | Type | Modifiers | FDW Options
--------+-------------------+-----------+--------------------
c1 | integer | | (column_name 'c1')
c2 | character varying | | (column_name 'c2')
c3 | text | | (column_name 'c3')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 't2')
Foreign table "import_dest3.t3"
Column | Type | Modifiers | FDW Options
--------+--------------------------+-----------+--------------------
c1 | timestamp with time zone | | (column_name 'c1')
c2 | typ1 | | (column_name 'c2')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 't3')
Foreign table "import_dest3.x 4"
Column | Type | Modifiers | FDW Options
--------+-----------------------+-----------+---------------------
c1 | double precision | | (column_name 'c1')
C 2 | text | | (column_name 'C 2')
c3 | character varying(42) | | (column_name 'c3')
Server: loopback
FDW Options: (schema_name 'import_source', table_name 'x 4')
Foreign table "import_dest3.x 5"
Column | Type | Modifiers | FDW Options
--------+------+-----------+-------------
Server: loopback
FDW Options: (schema_name 'import_source', table_name 'x 5')
-- Check LIMIT TO and EXCEPT
CREATE SCHEMA import_dest4;
IMPORT FOREIGN SCHEMA import_source LIMIT TO (t1, nonesuch)
FROM SERVER loopback INTO import_dest4;
\det+ import_dest4
List of foreign tables
Schema | Table | Server | FDW Options | Description
--------------+-------+----------+------------------------------------------------+-------------
import_dest4 | t1 | loopback | (schema_name 'import_source', table_name 't1') |
(1 row)
IMPORT FOREIGN SCHEMA import_source EXCEPT (t1, "x 4", nonesuch)
FROM SERVER loopback INTO import_dest4;
\det+ import_dest4
List of foreign tables
Schema | Table | Server | FDW Options | Description
--------------+-------+----------+-------------------------------------------------+-------------
import_dest4 | t1 | loopback | (schema_name 'import_source', table_name 't1') |
import_dest4 | t2 | loopback | (schema_name 'import_source', table_name 't2') |
import_dest4 | t3 | loopback | (schema_name 'import_source', table_name 't3') |
import_dest4 | x 5 | loopback | (schema_name 'import_source', table_name 'x 5') |
(4 rows)
-- Assorted error cases
IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_dest4;
ERROR: relation "t1" already exists
CONTEXT: importing foreign table "t1"
IMPORT FOREIGN SCHEMA nonesuch FROM SERVER loopback INTO import_dest4;
ERROR: schema "nonesuch" is not present on foreign server "loopback"
IMPORT FOREIGN SCHEMA nonesuch FROM SERVER loopback INTO notthere;
ERROR: schema "notthere" does not exist
IMPORT FOREIGN SCHEMA nonesuch FROM SERVER nowhere INTO notthere;
ERROR: server "nowhere" does not exist
-- Check case of a type present only on the remote server.
-- We can fake this by dropping the type locally in our transaction.
CREATE TYPE "Colors" AS ENUM ('red', 'green', 'blue');
CREATE TABLE import_source.t5 (c1 int, c2 text collate "C", "Col" "Colors");
CREATE SCHEMA import_dest5;
BEGIN;
DROP TYPE "Colors" CASCADE;
NOTICE: drop cascades to table import_source.t5 column Col
IMPORT FOREIGN SCHEMA import_source LIMIT TO (t5)
FROM SERVER loopback INTO import_dest5; -- ERROR
ERROR: type "public.Colors" does not exist
LINE 4: "Col" public."Colors" OPTIONS (column_name 'Col')
^
QUERY: CREATE FOREIGN TABLE t5 (
c1 integer OPTIONS (column_name 'c1'),
c2 text OPTIONS (column_name 'c2') COLLATE pg_catalog."C",
"Col" public."Colors" OPTIONS (column_name 'Col')
) SERVER loopback
OPTIONS (schema_name 'import_source', table_name 't5');
CONTEXT: importing foreign table "t5"
ROLLBACK;

View File

@ -285,6 +285,8 @@ static void postgresExplainForeignModify(ModifyTableState *mtstate,
static bool postgresAnalyzeForeignTable(Relation relation,
AcquireSampleRowsFunc *func,
BlockNumber *totalpages);
static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
Oid serverOid);
/*
* Helper functions
@ -362,6 +364,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for ANALYZE */
routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
/* Support functions for IMPORT FOREIGN SCHEMA */
routine->ImportForeignSchema = postgresImportForeignSchema;
PG_RETURN_POINTER(routine);
}
@ -2563,6 +2568,270 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
}
}
/*
* Import a foreign schema
*/
static List *
postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
{
List *commands = NIL;
bool import_collate = true;
bool import_default = false;
bool import_not_null = true;
ForeignServer *server;
UserMapping *mapping;
PGconn *conn;
StringInfoData buf;
PGresult *volatile res = NULL;
int numrows,
i;
ListCell *lc;
/* Parse statement options */
foreach(lc, stmt->options)
{
DefElem *def = (DefElem *) lfirst(lc);
if (strcmp(def->defname, "import_collate") == 0)
import_collate = defGetBoolean(def);
else if (strcmp(def->defname, "import_default") == 0)
import_default = defGetBoolean(def);
else if (strcmp(def->defname, "import_not_null") == 0)
import_not_null = defGetBoolean(def);
else
ereport(ERROR,
(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
errmsg("invalid option \"%s\"", def->defname)));
}
/*
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid);
conn = GetConnection(server, mapping, false);
/* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100)
import_collate = false;
/* Create workspace for strings */
initStringInfo(&buf);
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
/* Check that the schema really exists */
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
deparseStringLiteral(&buf, stmt->remote_schema);
res = PQexec(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
if (PQntuples(res) != 1)
ereport(ERROR,
(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
errmsg("schema \"%s\" is not present on foreign server \"%s\"",
stmt->remote_schema, server->servername)));
PQclear(res);
res = NULL;
resetStringInfo(&buf);
/*
* Fetch all table data from this schema, possibly restricted by
* EXCEPT or LIMIT TO.
*
* Note: because we run the connection with search_path restricted to
* pg_catalog, the format_type() and pg_get_expr() outputs will always
* include a schema name for types/functions in other schemas, which
* is what we want.
*/
if (import_collate)
appendStringInfoString(&buf,
"SELECT relname, "
" attname, "
" format_type(atttypid, atttypmod), "
" attnotnull, "
" pg_get_expr(adbin, adrelid), "
" collname, "
" collnsp.nspname "
"FROM pg_class c "
" JOIN pg_namespace n ON "
" relnamespace = n.oid "
" LEFT JOIN pg_attribute a ON "
" attrelid = c.oid AND attnum > 0 "
" AND NOT attisdropped "
" LEFT JOIN pg_attrdef ad ON "
" adrelid = c.oid AND adnum = attnum "
" LEFT JOIN pg_collation coll ON "
" coll.oid = attcollation "
" LEFT JOIN pg_namespace collnsp ON "
" collnsp.oid = collnamespace ");
else
appendStringInfoString(&buf,
"SELECT relname, "
" attname, "
" format_type(atttypid, atttypmod), "
" attnotnull, "
" pg_get_expr(adbin, adrelid), "
" NULL, NULL "
"FROM pg_class c "
" JOIN pg_namespace n ON "
" relnamespace = n.oid "
" LEFT JOIN pg_attribute a ON "
" attrelid = c.oid AND attnum > 0 "
" AND NOT attisdropped "
" LEFT JOIN pg_attrdef ad ON "
" adrelid = c.oid AND adnum = attnum ");
appendStringInfoString(&buf,
"WHERE c.relkind IN ('r', 'v', 'f', 'm') "
" AND n.nspname = ");
deparseStringLiteral(&buf, stmt->remote_schema);
/* Apply restrictions for LIMIT TO and EXCEPT */
if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
{
bool first_item = true;
appendStringInfoString(&buf, " AND c.relname ");
if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
appendStringInfoString(&buf, "NOT ");
appendStringInfoString(&buf, "IN (");
/* Append list of table names within IN clause */
foreach(lc, stmt->table_list)
{
RangeVar *rv = (RangeVar *) lfirst(lc);
if (first_item)
first_item = false;
else
appendStringInfoString(&buf, ", ");
deparseStringLiteral(&buf, rv->relname);
}
appendStringInfoString(&buf, ")");
}
/* Append ORDER BY at the end of query to ensure output ordering */
appendStringInfo(&buf, " ORDER BY c.relname, a.attnum");
/* Fetch the data */
res = PQexec(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
/* Process results */
numrows = PQntuples(res);
/* note: incrementation of i happens in inner loop's while() test */
for (i = 0; i < numrows;)
{
char *tablename = PQgetvalue(res, i, 0);
bool first_item = true;
resetStringInfo(&buf);
appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
quote_identifier(tablename));
/* Scan all rows for this table */
do
{
char *attname;
char *typename;
char *attnotnull;
char *attdefault;
char *collname;
char *collnamespace;
/* If table has no columns, we'll see nulls here */
if (PQgetisnull(res, i, 1))
continue;
attname = PQgetvalue(res, i, 1);
typename = PQgetvalue(res, i, 2);
attnotnull = PQgetvalue(res, i, 3);
attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
PQgetvalue(res, i, 4);
collname = PQgetisnull(res, i, 5) ? (char *) NULL :
PQgetvalue(res, i, 5);
collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
PQgetvalue(res, i, 6);
if (first_item)
first_item = false;
else
appendStringInfoString(&buf, ",\n");
/* Print column name and type */
appendStringInfo(&buf, " %s %s",
quote_identifier(attname),
typename);
/*
* Add column_name option so that renaming the foreign table's
* column doesn't break the association to the underlying
* column.
*/
appendStringInfoString(&buf, " OPTIONS (column_name ");
deparseStringLiteral(&buf, attname);
appendStringInfoString(&buf, ")");
/* Add COLLATE if needed */
if (import_collate && collname != NULL && collnamespace != NULL)
appendStringInfo(&buf, " COLLATE %s.%s",
quote_identifier(collnamespace),
quote_identifier(collname));
/* Add DEFAULT if needed */
if (import_default && attdefault != NULL)
appendStringInfo(&buf, " DEFAULT %s", attdefault);
/* Add NOT NULL if needed */
if (import_not_null && attnotnull[0] == 't')
appendStringInfoString(&buf, " NOT NULL");
}
while (++i < numrows &&
strcmp(PQgetvalue(res, i, 0), tablename) == 0);
/*
* Add server name and table-level options. We specify remote
* schema and table name as options (the latter to ensure that
* renaming the foreign table doesn't break the association).
*/
appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
quote_identifier(server->servername));
appendStringInfoString(&buf, "schema_name ");
deparseStringLiteral(&buf, stmt->remote_schema);
appendStringInfoString(&buf, ", table_name ");
deparseStringLiteral(&buf, tablename);
appendStringInfoString(&buf, ");");
commands = lappend(commands, pstrdup(buf.data));
}
/* Clean up */
PQclear(res);
res = NULL;
}
PG_CATCH();
{
if (res)
PQclear(res);
PG_RE_THROW();
}
PG_END_TRY();
ReleaseConnection(conn);
return commands;
}
/*
* Create a tuple from the specified row of the PGresult.
*

View File

@ -73,5 +73,6 @@ extern void deparseDeleteSql(StringInfo buf, PlannerInfo *root,
extern void deparseAnalyzeSizeSql(StringInfo buf, Relation rel);
extern void deparseAnalyzeSql(StringInfo buf, Relation rel,
List **retrieved_attrs);
extern void deparseStringLiteral(StringInfo buf, const char *val);
#endif /* POSTGRES_FDW_H */

View File

@ -609,3 +609,60 @@ UPDATE rem1 SET f2 = 'testo';
-- Test returning a system attribute
INSERT INTO rem1(f2) VALUES ('test') RETURNING ctid;
-- ===================================================================
-- test IMPORT FOREIGN SCHEMA
-- ===================================================================
CREATE SCHEMA import_source;
CREATE TABLE import_source.t1 (c1 int, c2 varchar NOT NULL);
CREATE TABLE import_source.t2 (c1 int default 42, c2 varchar NULL, c3 text collate "POSIX");
CREATE TYPE typ1 AS (m1 int, m2 varchar);
CREATE TABLE import_source.t3 (c1 timestamptz default now(), c2 typ1);
CREATE TABLE import_source."x 4" (c1 float8, "C 2" text, c3 varchar(42));
CREATE TABLE import_source."x 5" (c1 float8);
ALTER TABLE import_source."x 5" DROP COLUMN c1;
CREATE SCHEMA import_dest1;
IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_dest1;
\det+ import_dest1
\d import_dest1.*
-- Options
CREATE SCHEMA import_dest2;
IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_dest2
OPTIONS (import_default 'true');
\det+ import_dest2
\d import_dest2.*
CREATE SCHEMA import_dest3;
IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_dest3
OPTIONS (import_collate 'false', import_not_null 'false');
\det+ import_dest3
\d import_dest3.*
-- Check LIMIT TO and EXCEPT
CREATE SCHEMA import_dest4;
IMPORT FOREIGN SCHEMA import_source LIMIT TO (t1, nonesuch)
FROM SERVER loopback INTO import_dest4;
\det+ import_dest4
IMPORT FOREIGN SCHEMA import_source EXCEPT (t1, "x 4", nonesuch)
FROM SERVER loopback INTO import_dest4;
\det+ import_dest4
-- Assorted error cases
IMPORT FOREIGN SCHEMA import_source FROM SERVER loopback INTO import_dest4;
IMPORT FOREIGN SCHEMA nonesuch FROM SERVER loopback INTO import_dest4;
IMPORT FOREIGN SCHEMA nonesuch FROM SERVER loopback INTO notthere;
IMPORT FOREIGN SCHEMA nonesuch FROM SERVER nowhere INTO notthere;
-- Check case of a type present only on the remote server.
-- We can fake this by dropping the type locally in our transaction.
CREATE TYPE "Colors" AS ENUM ('red', 'green', 'blue');
CREATE TABLE import_source.t5 (c1 int, c2 text collate "C", "Col" "Colors");
CREATE SCHEMA import_dest5;
BEGIN;
DROP TYPE "Colors" CASCADE;
IMPORT FOREIGN SCHEMA import_source LIMIT TO (t5)
FROM SERVER loopback INTO import_dest5; -- ERROR
ROLLBACK;