mirror of
https://github.com/postgres/postgres.git
synced 2025-06-14 18:42:34 +03:00
postgres_fdw: Allow fetch_size to be set per-table or per-server.
The default fetch size of 100 rows might not be right in every environment, so allow users to configure it. Corey Huinker, reviewed by Kyotaro Horiguchi, Andres Freund, and me.
This commit is contained in:
@ -3951,3 +3951,63 @@ QUERY: CREATE FOREIGN TABLE t5 (
|
|||||||
OPTIONS (schema_name 'import_source', table_name 't5');
|
OPTIONS (schema_name 'import_source', table_name 't5');
|
||||||
CONTEXT: importing foreign table "t5"
|
CONTEXT: importing foreign table "t5"
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
BEGIN;
|
||||||
|
CREATE SERVER fetch101 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( fetch_size '101' );
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_foreign_server
|
||||||
|
WHERE srvname = 'fetch101'
|
||||||
|
AND srvoptions @> array['fetch_size=101'];
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER SERVER fetch101 OPTIONS( SET fetch_size '202' );
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_foreign_server
|
||||||
|
WHERE srvname = 'fetch101'
|
||||||
|
AND srvoptions @> array['fetch_size=101'];
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_foreign_server
|
||||||
|
WHERE srvname = 'fetch101'
|
||||||
|
AND srvoptions @> array['fetch_size=202'];
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE FOREIGN TABLE table30000 ( x int ) SERVER fetch101 OPTIONS ( fetch_size '30000' );
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM pg_foreign_table
|
||||||
|
WHERE ftrelid = 'table30000'::regclass
|
||||||
|
AND ftoptions @> array['fetch_size=30000'];
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER FOREIGN TABLE table30000 OPTIONS ( SET fetch_size '60000');
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM pg_foreign_table
|
||||||
|
WHERE ftrelid = 'table30000'::regclass
|
||||||
|
AND ftoptions @> array['fetch_size=30000'];
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM pg_foreign_table
|
||||||
|
WHERE ftrelid = 'table30000'::regclass
|
||||||
|
AND ftoptions @> array['fetch_size=60000'];
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
@ -131,6 +131,17 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
|
|||||||
/* check list syntax, warn about uninstalled extensions */
|
/* check list syntax, warn about uninstalled extensions */
|
||||||
(void) ExtractExtensionList(defGetString(def), true);
|
(void) ExtractExtensionList(defGetString(def), true);
|
||||||
}
|
}
|
||||||
|
else if (strcmp(def->defname, "fetch_size") == 0)
|
||||||
|
{
|
||||||
|
int fetch_size;
|
||||||
|
|
||||||
|
fetch_size = strtol(defGetString(def), NULL,10);
|
||||||
|
if (fetch_size <= 0)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||||
|
errmsg("%s requires a non-negative integer value",
|
||||||
|
def->defname)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
@ -162,6 +173,9 @@ InitPgFdwOptions(void)
|
|||||||
/* updatable is available on both server and table */
|
/* updatable is available on both server and table */
|
||||||
{"updatable", ForeignServerRelationId, false},
|
{"updatable", ForeignServerRelationId, false},
|
||||||
{"updatable", ForeignTableRelationId, false},
|
{"updatable", ForeignTableRelationId, false},
|
||||||
|
/* fetch_size is available on both server and table */
|
||||||
|
{"fetch_size", ForeignServerRelationId, false},
|
||||||
|
{"fetch_size", ForeignTableRelationId, false},
|
||||||
{NULL, InvalidOid, false}
|
{NULL, InvalidOid, false}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -68,7 +68,9 @@ enum FdwScanPrivateIndex
|
|||||||
/* SQL statement to execute remotely (as a String node) */
|
/* SQL statement to execute remotely (as a String node) */
|
||||||
FdwScanPrivateSelectSql,
|
FdwScanPrivateSelectSql,
|
||||||
/* Integer list of attribute numbers retrieved by the SELECT */
|
/* Integer list of attribute numbers retrieved by the SELECT */
|
||||||
FdwScanPrivateRetrievedAttrs
|
FdwScanPrivateRetrievedAttrs,
|
||||||
|
/* Integer representing the desired fetch_size */
|
||||||
|
FdwScanPrivateFetchSize
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -126,6 +128,8 @@ typedef struct PgFdwScanState
|
|||||||
/* working memory contexts */
|
/* working memory contexts */
|
||||||
MemoryContext batch_cxt; /* context holding current batch of tuples */
|
MemoryContext batch_cxt; /* context holding current batch of tuples */
|
||||||
MemoryContext temp_cxt; /* context for per-tuple temporary data */
|
MemoryContext temp_cxt; /* context for per-tuple temporary data */
|
||||||
|
|
||||||
|
int fetch_size; /* number of tuples per fetch */
|
||||||
} PgFdwScanState;
|
} PgFdwScanState;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -380,6 +384,7 @@ postgresGetForeignRelSize(PlannerInfo *root,
|
|||||||
fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
|
fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
|
||||||
fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
|
fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
|
||||||
fpinfo->shippable_extensions = NIL;
|
fpinfo->shippable_extensions = NIL;
|
||||||
|
fpinfo->fetch_size = 100;
|
||||||
|
|
||||||
foreach(lc, fpinfo->server->options)
|
foreach(lc, fpinfo->server->options)
|
||||||
{
|
{
|
||||||
@ -394,16 +399,17 @@ postgresGetForeignRelSize(PlannerInfo *root,
|
|||||||
else if (strcmp(def->defname, "extensions") == 0)
|
else if (strcmp(def->defname, "extensions") == 0)
|
||||||
fpinfo->shippable_extensions =
|
fpinfo->shippable_extensions =
|
||||||
ExtractExtensionList(defGetString(def), false);
|
ExtractExtensionList(defGetString(def), false);
|
||||||
|
else if (strcmp(def->defname, "fetch_size") == 0)
|
||||||
|
fpinfo->fetch_size = strtol(defGetString(def), NULL,10);
|
||||||
}
|
}
|
||||||
foreach(lc, fpinfo->table->options)
|
foreach(lc, fpinfo->table->options)
|
||||||
{
|
{
|
||||||
DefElem *def = (DefElem *) lfirst(lc);
|
DefElem *def = (DefElem *) lfirst(lc);
|
||||||
|
|
||||||
if (strcmp(def->defname, "use_remote_estimate") == 0)
|
if (strcmp(def->defname, "use_remote_estimate") == 0)
|
||||||
{
|
|
||||||
fpinfo->use_remote_estimate = defGetBoolean(def);
|
fpinfo->use_remote_estimate = defGetBoolean(def);
|
||||||
break; /* only need the one value */
|
else if (strcmp(def->defname, "fetch_size") == 0)
|
||||||
}
|
fpinfo->fetch_size = strtol(defGetString(def), NULL,10);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1012,6 +1018,9 @@ postgresGetForeignPlan(PlannerInfo *root,
|
|||||||
*/
|
*/
|
||||||
fdw_private = list_make2(makeString(sql.data),
|
fdw_private = list_make2(makeString(sql.data),
|
||||||
retrieved_attrs);
|
retrieved_attrs);
|
||||||
|
fdw_private = list_make3(makeString(sql.data),
|
||||||
|
retrieved_attrs,
|
||||||
|
makeInteger(fpinfo->fetch_size));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Create the ForeignScan node from target list, filtering expressions,
|
* Create the ForeignScan node from target list, filtering expressions,
|
||||||
@ -1088,6 +1097,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
|
|||||||
FdwScanPrivateSelectSql));
|
FdwScanPrivateSelectSql));
|
||||||
fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
|
fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
|
||||||
FdwScanPrivateRetrievedAttrs);
|
FdwScanPrivateRetrievedAttrs);
|
||||||
|
fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
|
||||||
|
FdwScanPrivateFetchSize));
|
||||||
|
|
||||||
/* Create contexts for batches of tuples and per-tuple temp workspace. */
|
/* Create contexts for batches of tuples and per-tuple temp workspace. */
|
||||||
fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
|
fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
|
||||||
@ -2214,15 +2225,11 @@ fetch_more_data(ForeignScanState *node)
|
|||||||
{
|
{
|
||||||
PGconn *conn = fsstate->conn;
|
PGconn *conn = fsstate->conn;
|
||||||
char sql[64];
|
char sql[64];
|
||||||
int fetch_size;
|
|
||||||
int numrows;
|
int numrows;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
/* The fetch size is arbitrary, but shouldn't be enormous. */
|
|
||||||
fetch_size = 100;
|
|
||||||
|
|
||||||
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
|
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
|
||||||
fetch_size, fsstate->cursor_number);
|
fsstate->fetch_size, fsstate->cursor_number);
|
||||||
|
|
||||||
res = PQexec(conn, sql);
|
res = PQexec(conn, sql);
|
||||||
/* On error, report the original query, not the FETCH. */
|
/* On error, report the original query, not the FETCH. */
|
||||||
@ -2250,7 +2257,7 @@ fetch_more_data(ForeignScanState *node)
|
|||||||
fsstate->fetch_ct_2++;
|
fsstate->fetch_ct_2++;
|
||||||
|
|
||||||
/* Must be EOF if we didn't get as many tuples as we asked for. */
|
/* Must be EOF if we didn't get as many tuples as we asked for. */
|
||||||
fsstate->eof_reached = (numrows < fetch_size);
|
fsstate->eof_reached = (numrows < fsstate->fetch_size);
|
||||||
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
res = NULL;
|
res = NULL;
|
||||||
@ -2563,6 +2570,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
|||||||
{
|
{
|
||||||
PgFdwAnalyzeState astate;
|
PgFdwAnalyzeState astate;
|
||||||
ForeignTable *table;
|
ForeignTable *table;
|
||||||
|
ForeignServer *server;
|
||||||
UserMapping *user;
|
UserMapping *user;
|
||||||
PGconn *conn;
|
PGconn *conn;
|
||||||
unsigned int cursor_number;
|
unsigned int cursor_number;
|
||||||
@ -2593,6 +2601,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
|||||||
* owner, even if the ANALYZE was started by some other user.
|
* owner, even if the ANALYZE was started by some other user.
|
||||||
*/
|
*/
|
||||||
table = GetForeignTable(RelationGetRelid(relation));
|
table = GetForeignTable(RelationGetRelid(relation));
|
||||||
|
server = GetForeignServer(table->serverid);
|
||||||
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
|
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
|
||||||
conn = GetConnection(user, false);
|
conn = GetConnection(user, false);
|
||||||
|
|
||||||
@ -2620,6 +2629,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
|||||||
int fetch_size;
|
int fetch_size;
|
||||||
int numrows;
|
int numrows;
|
||||||
int i;
|
int i;
|
||||||
|
ListCell *lc;
|
||||||
|
|
||||||
/* Allow users to cancel long query */
|
/* Allow users to cancel long query */
|
||||||
CHECK_FOR_INTERRUPTS();
|
CHECK_FOR_INTERRUPTS();
|
||||||
@ -2632,6 +2642,26 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
|||||||
|
|
||||||
/* The fetch size is arbitrary, but shouldn't be enormous. */
|
/* The fetch size is arbitrary, but shouldn't be enormous. */
|
||||||
fetch_size = 100;
|
fetch_size = 100;
|
||||||
|
foreach(lc, server->options)
|
||||||
|
{
|
||||||
|
DefElem *def = (DefElem *) lfirst(lc);
|
||||||
|
|
||||||
|
if (strcmp(def->defname, "fetch_size") == 0)
|
||||||
|
{
|
||||||
|
fetch_size = strtol(defGetString(def), NULL,10);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
foreach(lc, table->options)
|
||||||
|
{
|
||||||
|
DefElem *def = (DefElem *) lfirst(lc);
|
||||||
|
|
||||||
|
if (strcmp(def->defname, "fetch_size") == 0)
|
||||||
|
{
|
||||||
|
fetch_size = strtol(defGetString(def), NULL,10);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Fetch some rows */
|
/* Fetch some rows */
|
||||||
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
|
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
|
||||||
|
@ -53,6 +53,8 @@ typedef struct PgFdwRelationInfo
|
|||||||
ForeignTable *table;
|
ForeignTable *table;
|
||||||
ForeignServer *server;
|
ForeignServer *server;
|
||||||
UserMapping *user; /* only set in use_remote_estimate mode */
|
UserMapping *user; /* only set in use_remote_estimate mode */
|
||||||
|
|
||||||
|
int fetch_size; /* fetch size for this remote table */
|
||||||
} PgFdwRelationInfo;
|
} PgFdwRelationInfo;
|
||||||
|
|
||||||
/* in postgres_fdw.c */
|
/* in postgres_fdw.c */
|
||||||
|
@ -919,4 +919,48 @@ BEGIN;
|
|||||||
DROP TYPE "Colors" CASCADE;
|
DROP TYPE "Colors" CASCADE;
|
||||||
IMPORT FOREIGN SCHEMA import_source LIMIT TO (t5)
|
IMPORT FOREIGN SCHEMA import_source LIMIT TO (t5)
|
||||||
FROM SERVER loopback INTO import_dest5; -- ERROR
|
FROM SERVER loopback INTO import_dest5; -- ERROR
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE SERVER fetch101 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( fetch_size '101' );
|
||||||
|
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_foreign_server
|
||||||
|
WHERE srvname = 'fetch101'
|
||||||
|
AND srvoptions @> array['fetch_size=101'];
|
||||||
|
|
||||||
|
ALTER SERVER fetch101 OPTIONS( SET fetch_size '202' );
|
||||||
|
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_foreign_server
|
||||||
|
WHERE srvname = 'fetch101'
|
||||||
|
AND srvoptions @> array['fetch_size=101'];
|
||||||
|
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_foreign_server
|
||||||
|
WHERE srvname = 'fetch101'
|
||||||
|
AND srvoptions @> array['fetch_size=202'];
|
||||||
|
|
||||||
|
CREATE FOREIGN TABLE table30000 ( x int ) SERVER fetch101 OPTIONS ( fetch_size '30000' );
|
||||||
|
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM pg_foreign_table
|
||||||
|
WHERE ftrelid = 'table30000'::regclass
|
||||||
|
AND ftoptions @> array['fetch_size=30000'];
|
||||||
|
|
||||||
|
ALTER FOREIGN TABLE table30000 OPTIONS ( SET fetch_size '60000');
|
||||||
|
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM pg_foreign_table
|
||||||
|
WHERE ftrelid = 'table30000'::regclass
|
||||||
|
AND ftoptions @> array['fetch_size=30000'];
|
||||||
|
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM pg_foreign_table
|
||||||
|
WHERE ftrelid = 'table30000'::regclass
|
||||||
|
AND ftoptions @> array['fetch_size=60000'];
|
||||||
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
@ -290,10 +290,6 @@
|
|||||||
be considered shippable to the remote server.
|
be considered shippable to the remote server.
|
||||||
This option can only be specified for foreign servers, not per-table.
|
This option can only be specified for foreign servers, not per-table.
|
||||||
</para>
|
</para>
|
||||||
</listitem>
|
|
||||||
</varlistentry>
|
|
||||||
|
|
||||||
</variablelist>
|
|
||||||
|
|
||||||
<para>
|
<para>
|
||||||
When using the <literal>extensions</literal> option, <emphasis>it is the
|
When using the <literal>extensions</literal> option, <emphasis>it is the
|
||||||
@ -301,6 +297,23 @@
|
|||||||
identically on both the local and remote servers. Otherwise, remote
|
identically on both the local and remote servers. Otherwise, remote
|
||||||
queries may fail or behave unexpectedly.
|
queries may fail or behave unexpectedly.
|
||||||
</para>
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
|
||||||
|
<varlistentry>
|
||||||
|
<term><literal>fetch_size</literal></term>
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
This option specifies the number of rows <filename>postgres_fdw</>
|
||||||
|
should get in each fetch operation. It can be specified for a foreign
|
||||||
|
table or a foreign server. The option specified on a table overrides
|
||||||
|
an option specified for the server.
|
||||||
|
The default is <literal>100</>.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
|
||||||
|
</variablelist>
|
||||||
|
|
||||||
</sect3>
|
</sect3>
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user