diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index 3cf7b4eb1e0..6faf499f9a6 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -1705,13 +1705,16 @@ deparseRangeTblRef(StringInfo buf, PlannerInfo *root, RelOptInfo *foreignrel, * The statement text is appended to buf, and we also create an integer List * of the columns being retrieved by WITH CHECK OPTION or RETURNING (if any), * which is returned to *retrieved_attrs. + * + * This also stores end position of the VALUES clause, so that we can rebuild + * an INSERT for a batch of rows later. */ void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, - List **retrieved_attrs) + List **retrieved_attrs, int *values_end_len) { AttrNumber pindex; bool first; @@ -1754,6 +1757,7 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, } else appendStringInfoString(buf, " DEFAULT VALUES"); + *values_end_len = buf->len; if (doNothing) appendStringInfoString(buf, " ON CONFLICT DO NOTHING"); @@ -1763,6 +1767,54 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte, withCheckOptionList, returningList, retrieved_attrs); } +/* + * rebuild remote INSERT statement + * + * Provided a number of rows in a batch, builds INSERT statement with the + * right number of parameters. + */ +void +rebuildInsertSql(StringInfo buf, char *orig_query, + int values_end_len, int num_cols, + int num_rows) +{ + int i, j; + int pindex; + bool first; + + /* Make sure the values_end_len is sensible */ + Assert((values_end_len > 0) && (values_end_len <= strlen(orig_query))); + + /* Copy up to the end of the first record from the original query */ + appendBinaryStringInfo(buf, orig_query, values_end_len); + + /* + * Add records to VALUES clause (we already have parameters for the + * first row, so start at the right offset). + */ + pindex = num_cols + 1; + for (i = 0; i < num_rows; i++) + { + appendStringInfoString(buf, ", ("); + + first = true; + for (j = 0; j < num_cols; j++) + { + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + appendStringInfo(buf, "$%d", pindex); + pindex++; + } + + appendStringInfoChar(buf, ')'); + } + + /* Copy stuff after VALUES clause from the original query */ + appendStringInfoString(buf, orig_query + values_end_len); +} + /* * deparse remote UPDATE statement * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 1cad3114364..b4a04d2c143 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -3887,9 +3887,10 @@ EXPLAIN (VERBOSE, COSTS OFF) EXECUTE st7; ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Insert on public.ft1 Remote SQL: INSERT INTO "S 1"."T 1"("C 1", c2, c3, c4, c5, c6, c7, c8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + Batch Size: 1 -> Result Output: NULL::integer, 1001, 101, 'foo'::text, NULL::timestamp with time zone, NULL::timestamp without time zone, NULL::character varying, 'ft1 '::character(10), NULL::user_enum -(4 rows) +(5 rows) ALTER TABLE "S 1"."T 1" RENAME TO "T 0"; ALTER FOREIGN TABLE ft1 OPTIONS (SET table_name 'T 0'); @@ -3920,9 +3921,10 @@ EXPLAIN (VERBOSE, COSTS OFF) EXECUTE st7; ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Insert on public.ft1 Remote SQL: INSERT INTO "S 1"."T 0"("C 1", c2, c3, c4, c5, c6, c7, c8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + Batch Size: 1 -> Result Output: NULL::integer, 1001, 101, 'foo'::text, NULL::timestamp with time zone, NULL::timestamp without time zone, NULL::character varying, 'ft1 '::character(10), NULL::user_enum -(4 rows) +(5 rows) ALTER TABLE "S 1"."T 0" RENAME TO "T 1"; ALTER FOREIGN TABLE ft1 OPTIONS (SET table_name 'T 1'); @@ -4244,12 +4246,13 @@ INSERT INTO ft2 (c1,c2,c3) SELECT c1+1000,c2+100, c3 || c3 FROM ft2 LIMIT 20; -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Insert on public.ft2 Remote SQL: INSERT INTO "S 1"."T 1"("C 1", c2, c3, c4, c5, c6, c7, c8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + Batch Size: 1 -> Subquery Scan on "*SELECT*" Output: "*SELECT*"."?column?", "*SELECT*"."?column?_1", NULL::integer, "*SELECT*"."?column?_2", NULL::timestamp with time zone, NULL::timestamp without time zone, NULL::character varying, 'ft2 '::character(10), NULL::user_enum -> Foreign Scan on public.ft2 ft2_1 Output: (ft2_1.c1 + 1000), (ft2_1.c2 + 100), (ft2_1.c3 || ft2_1.c3) Remote SQL: SELECT "C 1", c2, c3 FROM "S 1"."T 1" LIMIT 20::bigint -(7 rows) +(8 rows) INSERT INTO ft2 (c1,c2,c3) SELECT c1+1000,c2+100, c3 || c3 FROM ft2 LIMIT 20; INSERT INTO ft2 (c1,c2,c3) @@ -5360,9 +5363,10 @@ INSERT INTO ft2 (c1,c2,c3) VALUES (1200,999,'foo') RETURNING tableoid::regclass; Insert on public.ft2 Output: (ft2.tableoid)::regclass Remote SQL: INSERT INTO "S 1"."T 1"("C 1", c2, c3, c4, c5, c6, c7, c8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + Batch Size: 1 -> Result Output: 1200, 999, NULL::integer, 'foo'::text, NULL::timestamp with time zone, NULL::timestamp without time zone, NULL::character varying, 'ft2 '::character(10), NULL::user_enum -(5 rows) +(6 rows) INSERT INTO ft2 (c1,c2,c3) VALUES (1200,999,'foo') RETURNING tableoid::regclass; tableoid @@ -6212,9 +6216,10 @@ INSERT INTO rw_view VALUES (0, 5); -------------------------------------------------------------------------------- Insert on public.foreign_tbl Remote SQL: INSERT INTO public.base_tbl(a, b) VALUES ($1, $2) RETURNING a, b + Batch Size: 1 -> Result Output: 0, 5 -(4 rows) +(5 rows) INSERT INTO rw_view VALUES (0, 5); -- should fail ERROR: new row violates check option for view "rw_view" @@ -6225,9 +6230,10 @@ INSERT INTO rw_view VALUES (0, 15); -------------------------------------------------------------------------------- Insert on public.foreign_tbl Remote SQL: INSERT INTO public.base_tbl(a, b) VALUES ($1, $2) RETURNING a, b + Batch Size: 1 -> Result Output: 0, 15 -(4 rows) +(5 rows) INSERT INTO rw_view VALUES (0, 15); -- ok SELECT * FROM foreign_tbl; @@ -8923,7 +8929,7 @@ DO $d$ END; $d$; ERROR: invalid option "password" -HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size +HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')" PL/pgSQL function inline_code_block line 3 at EXECUTE -- If we add a password for our user mapping instead, we should get a different @@ -9112,3 +9118,138 @@ SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; loopback2 | t (1 row) +-- =================================================================== +-- batch insert +-- =================================================================== +BEGIN; +CREATE SERVER batch10 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( batch_size '10' ); +SELECT count(*) +FROM pg_foreign_server +WHERE srvname = 'batch10' +AND srvoptions @> array['batch_size=10']; + count +------- + 1 +(1 row) + +ALTER SERVER batch10 OPTIONS( SET batch_size '20' ); +SELECT count(*) +FROM pg_foreign_server +WHERE srvname = 'batch10' +AND srvoptions @> array['batch_size=10']; + count +------- + 0 +(1 row) + +SELECT count(*) +FROM pg_foreign_server +WHERE srvname = 'batch10' +AND srvoptions @> array['batch_size=20']; + count +------- + 1 +(1 row) + +CREATE FOREIGN TABLE table30 ( x int ) SERVER batch10 OPTIONS ( batch_size '30' ); +SELECT COUNT(*) +FROM pg_foreign_table +WHERE ftrelid = 'table30'::regclass +AND ftoptions @> array['batch_size=30']; + count +------- + 1 +(1 row) + +ALTER FOREIGN TABLE table30 OPTIONS ( SET batch_size '40'); +SELECT COUNT(*) +FROM pg_foreign_table +WHERE ftrelid = 'table30'::regclass +AND ftoptions @> array['batch_size=30']; + count +------- + 0 +(1 row) + +SELECT COUNT(*) +FROM pg_foreign_table +WHERE ftrelid = 'table30'::regclass +AND ftoptions @> array['batch_size=40']; + count +------- + 1 +(1 row) + +ROLLBACK; +CREATE TABLE batch_table ( x int ); +CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '10' ); +EXPLAIN (VERBOSE, COSTS OFF) INSERT INTO ftable SELECT * FROM generate_series(1, 10) i; + QUERY PLAN +------------------------------------------------------------- + Insert on public.ftable + Remote SQL: INSERT INTO public.batch_table(x) VALUES ($1) + Batch Size: 10 + -> Function Scan on pg_catalog.generate_series i + Output: i.i + Function Call: generate_series(1, 10) +(6 rows) + +INSERT INTO ftable SELECT * FROM generate_series(1, 10) i; +INSERT INTO ftable SELECT * FROM generate_series(11, 31) i; +INSERT INTO ftable VALUES (32); +INSERT INTO ftable VALUES (33), (34); +SELECT COUNT(*) FROM ftable; + count +------- + 34 +(1 row) + +TRUNCATE batch_table; +DROP FOREIGN TABLE ftable; +-- Disable batch insert +CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '1' ); +EXPLAIN (VERBOSE, COSTS OFF) INSERT INTO ftable VALUES (1), (2); + QUERY PLAN +------------------------------------------------------------- + Insert on public.ftable + Remote SQL: INSERT INTO public.batch_table(x) VALUES ($1) + Batch Size: 1 + -> Values Scan on "*VALUES*" + Output: "*VALUES*".column1 +(5 rows) + +INSERT INTO ftable VALUES (1), (2); +SELECT COUNT(*) FROM ftable; + count +------- + 2 +(1 row) + +DROP FOREIGN TABLE ftable; +DROP TABLE batch_table; +-- Use partitioning +CREATE TABLE batch_table ( x int ) PARTITION BY HASH (x); +CREATE TABLE batch_table_p0 (LIKE batch_table); +CREATE FOREIGN TABLE batch_table_p0f + PARTITION OF batch_table + FOR VALUES WITH (MODULUS 3, REMAINDER 0) + SERVER loopback + OPTIONS (table_name 'batch_table_p0', batch_size '10'); +CREATE TABLE batch_table_p1 (LIKE batch_table); +CREATE FOREIGN TABLE batch_table_p1f + PARTITION OF batch_table + FOR VALUES WITH (MODULUS 3, REMAINDER 1) + SERVER loopback + OPTIONS (table_name 'batch_table_p1', batch_size '1'); +CREATE TABLE batch_table_p2 + PARTITION OF batch_table + FOR VALUES WITH (MODULUS 3, REMAINDER 2); +INSERT INTO batch_table SELECT * FROM generate_series(1, 66) i; +SELECT COUNT(*) FROM batch_table; + count +------- + 66 +(1 row) + +-- Clean up +DROP TABLE batch_table CASCADE; diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 1fec3c3eeac..64698c4da3a 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -142,6 +142,17 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) errmsg("%s requires a non-negative integer value", def->defname))); } + else if (strcmp(def->defname, "batch_size") == 0) + { + int batch_size; + + batch_size = strtol(defGetString(def), NULL, 10); + if (batch_size <= 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s requires a non-negative integer value", + def->defname))); + } else if (strcmp(def->defname, "password_required") == 0) { bool pw_required = defGetBoolean(def); @@ -203,6 +214,9 @@ InitPgFdwOptions(void) /* fetch_size is available on both server and table */ {"fetch_size", ForeignServerRelationId, false}, {"fetch_size", ForeignTableRelationId, false}, + /* batch_size is available on both server and table */ + {"batch_size", ForeignServerRelationId, false}, + {"batch_size", ForeignTableRelationId, false}, {"password_required", UserMappingRelationId, false}, /* diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 2f2d4d171c1..9a31bbb86b2 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -87,8 +87,10 @@ enum FdwScanPrivateIndex * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server * 2) Integer list of target attribute numbers for INSERT/UPDATE * (NIL for a DELETE) - * 3) Boolean flag showing if the remote query has a RETURNING clause - * 4) Integer list of attribute numbers retrieved by RETURNING, if any + * 3) Length till the end of VALUES clause for INSERT + * (-1 for a DELETE/UPDATE) + * 4) Boolean flag showing if the remote query has a RETURNING clause + * 5) Integer list of attribute numbers retrieved by RETURNING, if any */ enum FdwModifyPrivateIndex { @@ -96,6 +98,8 @@ enum FdwModifyPrivateIndex FdwModifyPrivateUpdateSql, /* Integer list of target attribute numbers for INSERT/UPDATE */ FdwModifyPrivateTargetAttnums, + /* Length till the end of VALUES clause (as an integer Value node) */ + FdwModifyPrivateLen, /* has-returning flag (as an integer Value node) */ FdwModifyPrivateHasReturning, /* Integer list of attribute numbers retrieved by RETURNING */ @@ -176,7 +180,10 @@ typedef struct PgFdwModifyState /* extracted fdw_private data */ char *query; /* text of INSERT/UPDATE/DELETE command */ + char *orig_query; /* original text of INSERT command */ List *target_attrs; /* list of target attribute numbers */ + int values_end; /* length up to the end of VALUES */ + int batch_size; /* value of FDW option "batch_size" */ bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ @@ -185,6 +192,9 @@ typedef struct PgFdwModifyState int p_nums; /* number of parameters to transmit */ FmgrInfo *p_flinfo; /* output conversion functions for them */ + /* batch operation stuff */ + int num_slots; /* number of slots to insert */ + /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -343,6 +353,12 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); +static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo); static TupleTableSlot *postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, @@ -429,20 +445,24 @@ static PgFdwModifyState *create_foreign_modify(EState *estate, Plan *subplan, char *query, List *target_attrs, + int len, bool has_returning, List *retrieved_attrs); -static TupleTableSlot *execute_foreign_modify(EState *estate, +static TupleTableSlot **execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, - TupleTableSlot *slot, - TupleTableSlot *planSlot); + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, - TupleTableSlot *slot); + TupleTableSlot **slots, + int numSlots); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); static void finish_foreign_modify(PgFdwModifyState *fmstate); +static void deallocate_query(PgFdwModifyState *fmstate); static List *build_remote_returning(Index rtindex, Relation rel, List *returningList); static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); @@ -505,6 +525,7 @@ static void apply_table_options(PgFdwRelationInfo *fpinfo); static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); +static int get_batch_size_option(Relation rel); /* @@ -530,6 +551,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->PlanForeignModify = postgresPlanForeignModify; routine->BeginForeignModify = postgresBeginForeignModify; routine->ExecForeignInsert = postgresExecForeignInsert; + routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert; + routine->GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize; routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; routine->EndForeignModify = postgresEndForeignModify; @@ -1665,6 +1688,7 @@ postgresPlanForeignModify(PlannerInfo *root, List *returningList = NIL; List *retrieved_attrs = NIL; bool doNothing = false; + int values_end_len = -1; initStringInfo(&sql); @@ -1752,7 +1776,7 @@ postgresPlanForeignModify(PlannerInfo *root, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, withCheckOptionList, returningList, - &retrieved_attrs); + &retrieved_attrs, &values_end_len); break; case CMD_UPDATE: deparseUpdateSql(&sql, rte, resultRelation, rel, @@ -1776,8 +1800,9 @@ postgresPlanForeignModify(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ - return list_make4(makeString(sql.data), + return list_make5(makeString(sql.data), targetAttrs, + makeInteger(values_end_len), makeInteger((retrieved_attrs != NIL)), retrieved_attrs); } @@ -1797,6 +1822,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, char *query; List *target_attrs; bool has_returning; + int values_end_len; List *retrieved_attrs; RangeTblEntry *rte; @@ -1812,6 +1838,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate, FdwModifyPrivateUpdateSql)); target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums); + values_end_len = intVal(list_nth(fdw_private, + FdwModifyPrivateLen)); has_returning = intVal(list_nth(fdw_private, FdwModifyPrivateHasReturning)); retrieved_attrs = (List *) list_nth(fdw_private, @@ -1829,6 +1857,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, mtstate->mt_plans[subplan_index]->plan, query, target_attrs, + values_end_len, has_returning, retrieved_attrs); @@ -1846,7 +1875,8 @@ postgresExecForeignInsert(EState *estate, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; - TupleTableSlot *rslot; + TupleTableSlot **rslot; + int numSlots = 1; /* * If the fmstate has aux_fmstate set, use the aux_fmstate (see @@ -1855,7 +1885,36 @@ postgresExecForeignInsert(EState *estate, if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate->aux_fmstate; rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, - slot, planSlot); + &slot, &planSlot, &numSlots); + /* Revert that change */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate; + + return rslot ? *rslot : NULL; +} + +/* + * postgresExecForeignBatchInsert + * Insert multiple rows into a foreign table + */ +static TupleTableSlot ** +postgresExecForeignBatchInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + TupleTableSlot **rslot; + + /* + * If the fmstate has aux_fmstate set, use the aux_fmstate (see + * postgresBeginForeignInsert()) + */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate->aux_fmstate; + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, + slots, planSlots, numSlots); /* Revert that change */ if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate; @@ -1863,6 +1922,42 @@ postgresExecForeignInsert(EState *estate, return rslot; } +/* + * postgresGetForeignModifyBatchSize + * Determine the maximum number of tuples that can be inserted in bulk + * + * Returns the batch size specified for server or table. When batching is not + * allowed (e.g. for tables with AFTER ROW triggers or with RETURNING clause), + * returns 1. + */ +static int +postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo) +{ + int batch_size; + + /* should be called only once */ + Assert(resultRelInfo->ri_BatchSize == 0); + + /* + * In EXPLAIN without ANALYZE, ri_fdwstate is NULL, so we have to lookup + * the option directly in server/table options. Otherwise just use the + * value we determined earlier. + */ + if (resultRelInfo->ri_FdwState) + batch_size = ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->batch_size; + else + batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc); + + /* Disable batching when we have to use RETURNING. */ + if (resultRelInfo->ri_projectReturning != NULL || + (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_after_row)) + return 1; + + /* Otherwise use the batch size specified for server/table. */ + return batch_size; +} + /* * postgresExecForeignUpdate * Update one row in a foreign table @@ -1873,8 +1968,13 @@ postgresExecForeignUpdate(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, - slot, planSlot); + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, + &slot, &planSlot, &numSlots); + + return rslot ? rslot[0] : NULL; } /* @@ -1887,8 +1987,13 @@ postgresExecForeignDelete(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, - slot, planSlot); + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, + &slot, &planSlot, &numSlots); + + return rslot ? rslot[0] : NULL; } /* @@ -1925,6 +2030,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, RangeTblEntry *rte; TupleDesc tupdesc = RelationGetDescr(rel); int attnum; + int values_end_len; StringInfoData sql; List *targetAttrs = NIL; List *retrieved_attrs = NIL; @@ -2001,7 +2107,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, resultRelInfo->ri_WithCheckOptions, resultRelInfo->ri_returningList, - &retrieved_attrs); + &retrieved_attrs, &values_end_len); /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, @@ -2011,6 +2117,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, NULL, sql.data, targetAttrs, + values_end_len, retrieved_attrs != NIL, retrieved_attrs); @@ -2636,6 +2743,13 @@ postgresExplainForeignModify(ModifyTableState *mtstate, FdwModifyPrivateUpdateSql)); ExplainPropertyText("Remote SQL", sql, es); + + /* + * For INSERT we should always have batch size >= 1, but UPDATE + * and DELETE don't support batching so don't show the property. + */ + if (rinfo->ri_BatchSize > 0) + ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es); } } @@ -3530,6 +3644,7 @@ create_foreign_modify(EState *estate, Plan *subplan, char *query, List *target_attrs, + int values_end, bool has_returning, List *retrieved_attrs) { @@ -3564,7 +3679,10 @@ create_foreign_modify(EState *estate, /* Set up remote query information. */ fmstate->query = query; + if (operation == CMD_INSERT) + fmstate->orig_query = pstrdup(fmstate->query); fmstate->target_attrs = target_attrs; + fmstate->values_end = values_end; fmstate->has_returning = has_returning; fmstate->retrieved_attrs = retrieved_attrs; @@ -3616,6 +3734,12 @@ create_foreign_modify(EState *estate, Assert(fmstate->p_nums <= n_params); + /* Set batch_size from foreign server/table options. */ + if (operation == CMD_INSERT) + fmstate->batch_size = get_batch_size_option(rel); + + fmstate->num_slots = 1; + /* Initialize auxiliary state */ fmstate->aux_fmstate = NULL; @@ -3626,26 +3750,48 @@ create_foreign_modify(EState *estate, * execute_foreign_modify * Perform foreign-table modification as required, and fetch RETURNING * result if any. (This is the shared guts of postgresExecForeignInsert, - * postgresExecForeignUpdate, and postgresExecForeignDelete.) + * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and + * postgresExecForeignDelete.) */ -static TupleTableSlot * +static TupleTableSlot ** execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, - TupleTableSlot *slot, - TupleTableSlot *planSlot) + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; ItemPointer ctid = NULL; const char **p_values; PGresult *res; int n_rows; + StringInfoData sql; /* The operation should be INSERT, UPDATE, or DELETE */ Assert(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE); + /* + * If the existing query was deparsed and prepared for a different number + * of rows, rebuild it for the proper number. + */ + if (operation == CMD_INSERT && fmstate->num_slots != *numSlots) + { + /* Destroy the prepared statement created previously */ + if (fmstate->p_name) + deallocate_query(fmstate); + + /* Build INSERT string with numSlots records in its VALUES clause. */ + initStringInfo(&sql); + rebuildInsertSql(&sql, fmstate->orig_query, fmstate->values_end, + fmstate->p_nums, *numSlots - 1); + pfree(fmstate->query); + fmstate->query = sql.data; + fmstate->num_slots = *numSlots; + } + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -3658,7 +3804,7 @@ execute_foreign_modify(EState *estate, Datum datum; bool isNull; - datum = ExecGetJunkAttribute(planSlot, + datum = ExecGetJunkAttribute(planSlots[0], fmstate->ctidAttno, &isNull); /* shouldn't ever get a null result... */ @@ -3668,14 +3814,14 @@ execute_foreign_modify(EState *estate, } /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, ctid, slot); + p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots); /* * Execute the prepared statement. */ if (!PQsendQueryPrepared(fmstate->conn, fmstate->p_name, - fmstate->p_nums, + fmstate->p_nums * (*numSlots), p_values, NULL, NULL, @@ -3696,9 +3842,10 @@ execute_foreign_modify(EState *estate, /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) { + Assert(*numSlots == 1); n_rows = PQntuples(res); if (n_rows > 0) - store_returning_result(fmstate, slot, res); + store_returning_result(fmstate, slots[0], res); } else n_rows = atoi(PQcmdTuples(res)); @@ -3708,10 +3855,12 @@ execute_foreign_modify(EState *estate, MemoryContextReset(fmstate->temp_cxt); + *numSlots = n_rows; + /* * Return NULL if nothing was inserted/updated/deleted on the remote end */ - return (n_rows > 0) ? slot : NULL; + return (n_rows > 0) ? slots : NULL; } /* @@ -3771,52 +3920,64 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, - TupleTableSlot *slot) + TupleTableSlot **slots, + int numSlots) { const char **p_values; + int i; + int j; int pindex = 0; MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); - p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); + p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots); + + /* ctid is provided only for UPDATE/DELETE, which don't allow batching */ + Assert(!(tupleid != NULL && numSlots > 1)); /* 1st parameter should be ctid, if it's in use */ if (tupleid != NULL) { + Assert(numSlots == 1); /* don't need set_transmission_modes for TID output */ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tupleid)); pindex++; } - /* get following parameters from slot */ - if (slot != NULL && fmstate->target_attrs != NIL) + /* get following parameters from slots */ + if (slots != NULL && fmstate->target_attrs != NIL) { int nestlevel; ListCell *lc; nestlevel = set_transmission_modes(); - foreach(lc, fmstate->target_attrs) + for (i = 0; i < numSlots; i++) { - int attnum = lfirst_int(lc); - Datum value; - bool isnull; + j = (tupleid != NULL) ? 1 : 0; + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Datum value; + bool isnull; - value = slot_getattr(slot, attnum, &isnull); - if (isnull) - p_values[pindex] = NULL; - else - p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], - value); - pindex++; + value = slot_getattr(slots[i], attnum, &isnull); + if (isnull) + p_values[pindex] = NULL; + else + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j], + value); + pindex++; + j++; + } } reset_transmission_modes(nestlevel); } - Assert(pindex == fmstate->p_nums); + Assert(pindex == fmstate->p_nums * numSlots); MemoryContextSwitchTo(oldcontext); @@ -3870,29 +4031,41 @@ finish_foreign_modify(PgFdwModifyState *fmstate) Assert(fmstate != NULL); /* If we created a prepared statement, destroy it */ - if (fmstate->p_name) - { - char sql[64]; - PGresult *res; - - snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); - - /* - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ - res = pgfdw_exec_query(fmstate->conn, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); - PQclear(res); - fmstate->p_name = NULL; - } + deallocate_query(fmstate); /* Release remote connection */ ReleaseConnection(fmstate->conn); fmstate->conn = NULL; } +/* + * deallocate_query + * Deallocate a prepared statement for a foreign insert/update/delete + * operation + */ +static void +deallocate_query(PgFdwModifyState *fmstate) +{ + char sql[64]; + PGresult *res; + + /* do nothing if the query is not allocated */ + if (!fmstate->p_name) + return; + + snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_exec_query(fmstate->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + PQclear(res); + fmstate->p_name = NULL; +} + /* * build_remote_returning * Build a RETURNING targetlist of a remote query for performing an @@ -6577,3 +6750,45 @@ find_em_expr_for_input_target(PlannerInfo *root, elog(ERROR, "could not find pathkey item to sort"); return NULL; /* keep compiler quiet */ } + +/* + * Determine batch size for a given foreign table. The option specified for + * a table has precedence. + */ +static int +get_batch_size_option(Relation rel) +{ + Oid foreigntableid = RelationGetRelid(rel); + ForeignTable *table; + ForeignServer *server; + List *options; + ListCell *lc; + + /* we use 1 by default, which means "no batching" */ + int batch_size = 1; + + /* + * Load options for table and server. We append server options after + * table options, because table options take precedence. + */ + table = GetForeignTable(foreigntableid); + server = GetForeignServer(table->serverid); + + options = NIL; + options = list_concat(options, table->options); + options = list_concat(options, server->options); + + /* See if either table or server specifies batch_size. */ + foreach(lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "batch_size") == 0) + { + batch_size = strtol(defGetString(def), NULL, 10); + break; + } + } + + return batch_size; +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 19ea27a1bcd..1f67b4d9fd2 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -161,7 +161,10 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, List *withCheckOptionList, List *returningList, - List **retrieved_attrs); + List **retrieved_attrs, int *values_end_len); +extern void rebuildInsertSql(StringInfo buf, char *orig_query, + int values_end_len, int num_cols, + int num_rows); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index ebf6eb10a61..28b82f5f9dc 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2738,3 +2738,96 @@ COMMIT; -- should not be output because they should be closed at the end of -- the above transaction. SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; + +-- =================================================================== +-- batch insert +-- =================================================================== + +BEGIN; + +CREATE SERVER batch10 FOREIGN DATA WRAPPER postgres_fdw OPTIONS( batch_size '10' ); + +SELECT count(*) +FROM pg_foreign_server +WHERE srvname = 'batch10' +AND srvoptions @> array['batch_size=10']; + +ALTER SERVER batch10 OPTIONS( SET batch_size '20' ); + +SELECT count(*) +FROM pg_foreign_server +WHERE srvname = 'batch10' +AND srvoptions @> array['batch_size=10']; + +SELECT count(*) +FROM pg_foreign_server +WHERE srvname = 'batch10' +AND srvoptions @> array['batch_size=20']; + +CREATE FOREIGN TABLE table30 ( x int ) SERVER batch10 OPTIONS ( batch_size '30' ); + +SELECT COUNT(*) +FROM pg_foreign_table +WHERE ftrelid = 'table30'::regclass +AND ftoptions @> array['batch_size=30']; + +ALTER FOREIGN TABLE table30 OPTIONS ( SET batch_size '40'); + +SELECT COUNT(*) +FROM pg_foreign_table +WHERE ftrelid = 'table30'::regclass +AND ftoptions @> array['batch_size=30']; + +SELECT COUNT(*) +FROM pg_foreign_table +WHERE ftrelid = 'table30'::regclass +AND ftoptions @> array['batch_size=40']; + +ROLLBACK; + +CREATE TABLE batch_table ( x int ); + +CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '10' ); +EXPLAIN (VERBOSE, COSTS OFF) INSERT INTO ftable SELECT * FROM generate_series(1, 10) i; +INSERT INTO ftable SELECT * FROM generate_series(1, 10) i; +INSERT INTO ftable SELECT * FROM generate_series(11, 31) i; +INSERT INTO ftable VALUES (32); +INSERT INTO ftable VALUES (33), (34); +SELECT COUNT(*) FROM ftable; +TRUNCATE batch_table; +DROP FOREIGN TABLE ftable; + +-- Disable batch insert +CREATE FOREIGN TABLE ftable ( x int ) SERVER loopback OPTIONS ( table_name 'batch_table', batch_size '1' ); +EXPLAIN (VERBOSE, COSTS OFF) INSERT INTO ftable VALUES (1), (2); +INSERT INTO ftable VALUES (1), (2); +SELECT COUNT(*) FROM ftable; +DROP FOREIGN TABLE ftable; +DROP TABLE batch_table; + +-- Use partitioning +CREATE TABLE batch_table ( x int ) PARTITION BY HASH (x); + +CREATE TABLE batch_table_p0 (LIKE batch_table); +CREATE FOREIGN TABLE batch_table_p0f + PARTITION OF batch_table + FOR VALUES WITH (MODULUS 3, REMAINDER 0) + SERVER loopback + OPTIONS (table_name 'batch_table_p0', batch_size '10'); + +CREATE TABLE batch_table_p1 (LIKE batch_table); +CREATE FOREIGN TABLE batch_table_p1f + PARTITION OF batch_table + FOR VALUES WITH (MODULUS 3, REMAINDER 1) + SERVER loopback + OPTIONS (table_name 'batch_table_p1', batch_size '1'); + +CREATE TABLE batch_table_p2 + PARTITION OF batch_table + FOR VALUES WITH (MODULUS 3, REMAINDER 2); + +INSERT INTO batch_table SELECT * FROM generate_series(1, 66) i; +SELECT COUNT(*) FROM batch_table; + +-- Clean up +DROP TABLE batch_table CASCADE; diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index 9c9293414c5..854913ae5fc 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -523,8 +523,9 @@ BeginForeignModify(ModifyTableState *mtstate, Begin executing a foreign table modification operation. This routine is called during executor startup. It should perform any initialization needed prior to the actual table modifications. Subsequently, - ExecForeignInsert, ExecForeignUpdate or - ExecForeignDelete will be called for each tuple to be + ExecForeignInsert/ExecForeignBatchInsert, + ExecForeignUpdate or + ExecForeignDelete will be called for tuple(s) to be inserted, updated, or deleted. @@ -614,6 +615,81 @@ ExecForeignInsert(EState *estate, +TupleTableSlot ** +ExecForeignBatchInsert(EState *estate, + ResultRelInfo *rinfo, + TupleTableSlot **slots, + TupleTableSlot *planSlots, + int *numSlots); + + + Insert multiple tuples in bulk into the foreign table. + The parameters are the same for ExecForeignInsert + except slots and planSlots contain + multiple tuples and *numSlots> specifies the number of + tuples in those arrays. + + + + The return value is an array of slots containing the data that was + actually inserted (this might differ from the data supplied, for + example as a result of trigger actions.) + The passed-in slots can be re-used for this purpose. + The number of successfully inserted tuples is returned in + *numSlots. + + + + The data in the returned slot is used only if the INSERT + statement involves a view + WITH CHECK OPTION; or if the foreign table has + an AFTER ROW trigger. Triggers require all columns, + but the FDW could choose to optimize away returning some or all columns + depending on the contents of the + WITH CHECK OPTION constraints. + + + + If the ExecForeignBatchInsert or + GetForeignModifyBatchSize pointer is set to + NULL, attempts to insert into the foreign table will + use ExecForeignInsert. + This function is not used if the INSERT has the + RETURNING> clause. + + + + Note that this function is also called when inserting routed tuples into + a foreign-table partition. See the callback functions + described below that allow the FDW to support that. + + + + +int +GetForeignModifyBatchSize(ResultRelInfo *rinfo); + + + Report the maximum number of tuples that a single + ExecForeignBatchInsert call can handle for + the specified foreign table. That is, The executor passes at most + the number of tuples that this function returns to + ExecForeignBatchInsert. + rinfo is the ResultRelInfo struct describing + the target foreign table. + The FDW is expected to provide a foreign server and/or foreign + table option for the user to set this value, or some hard-coded value. + + + + If the ExecForeignBatchInsert or + GetForeignModifyBatchSize pointer is set to + NULL, attempts to insert into the foreign table will + use ExecForeignInsert. + + + + TupleTableSlot * ExecForeignUpdate(EState *estate, ResultRelInfo *rinfo, @@ -741,8 +817,9 @@ BeginForeignInsert(ModifyTableState *mtstate, in both cases when it is the partition chosen for tuple routing and the target specified in a COPY FROM command. It should perform any initialization needed prior to the actual insertion. - Subsequently, ExecForeignInsert will be called for - each tuple to be inserted into the foreign table. + Subsequently, ExecForeignInsert or + ExecForeignBatchInsert will be called for + tuple(s) to be inserted into the foreign table. @@ -773,8 +850,8 @@ BeginForeignInsert(ModifyTableState *mtstate, Note that if the FDW does not support routable foreign-table partitions and/or executing COPY FROM on foreign tables, this - function or ExecForeignInsert subsequently called - must throw error as needed. + function or ExecForeignInsert/ExecForeignBatchInsert + subsequently called must throw error as needed. diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index 9adc8d12a9d..fb4c22ac69f 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -354,6 +354,19 @@ OPTIONS (ADD password_required 'false'); + + batch_size + + + This option specifies the number of rows postgres_fdw + should insert in each insert operation. It can be specified for a + foreign table or a foreign server. The option specified on a table + overrides an option specified for the server. + The default is 1. + + + + diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index 941731a0a9b..1746cb87936 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -993,6 +993,23 @@ ExecInitRoutingInfo(ModifyTableState *mtstate, partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo); + /* + * Determine if the FDW supports batch insert and determine the batch + * size (a FDW may support batching, but it may be disabled for the + * server/table or for this particular query). + * + * If the FDW does not support batching, we set the batch size to 1. + */ + if (partRelInfo->ri_FdwRoutine != NULL && + partRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize && + partRelInfo->ri_FdwRoutine->ExecForeignBatchInsert) + partRelInfo->ri_BatchSize = + partRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(partRelInfo); + else + partRelInfo->ri_BatchSize = 1; + + Assert(partRelInfo->ri_BatchSize >= 1); + partRelInfo->ri_CopyMultiInsertBuffer = NULL; /* diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 921e6954194..9c36860704a 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -58,6 +58,13 @@ #include "utils/rel.h" +static void ExecBatchInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int numSlots, + EState *estate, + bool canSetTag); static bool ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer conflictTid, @@ -389,6 +396,7 @@ ExecInsert(ModifyTableState *mtstate, ModifyTable *node = (ModifyTable *) mtstate->ps.plan; OnConflictAction onconflict = node->onConflictAction; PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing; + MemoryContext oldContext; /* * If the input result relation is a partitioned table, find the leaf @@ -441,6 +449,55 @@ ExecInsert(ModifyTableState *mtstate, ExecComputeStoredGenerated(resultRelInfo, estate, slot, CMD_INSERT); + /* + * If the FDW supports batching, and batching is requested, accumulate + * rows and insert them in batches. Otherwise use the per-row inserts. + */ + if (resultRelInfo->ri_BatchSize > 1) + { + /* + * If a certain number of tuples have already been accumulated, + * or a tuple has come for a different relation than that for + * the accumulated tuples, perform the batch insert + */ + if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize) + { + ExecBatchInsert(mtstate, resultRelInfo, + resultRelInfo->ri_Slots, + resultRelInfo->ri_PlanSlots, + resultRelInfo->ri_NumSlots, + estate, canSetTag); + resultRelInfo->ri_NumSlots = 0; + } + + oldContext = MemoryContextSwitchTo(estate->es_query_cxt); + + if (resultRelInfo->ri_Slots == NULL) + { + resultRelInfo->ri_Slots = palloc(sizeof(TupleTableSlot *) * + resultRelInfo->ri_BatchSize); + resultRelInfo->ri_PlanSlots = palloc(sizeof(TupleTableSlot *) * + resultRelInfo->ri_BatchSize); + } + + resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] = + MakeSingleTupleTableSlot(slot->tts_tupleDescriptor, + slot->tts_ops); + ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots], + slot); + resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] = + MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor, + planSlot->tts_ops); + ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots], + planSlot); + + resultRelInfo->ri_NumSlots++; + + MemoryContextSwitchTo(oldContext); + + return NULL; + } + /* * insert into foreign table: let the FDW do it */ @@ -698,6 +755,70 @@ ExecInsert(ModifyTableState *mtstate, return result; } +/* ---------------------------------------------------------------- + * ExecBatchInsert + * + * Insert multiple tuples in an efficient way. + * Currently, this handles inserting into a foreign table without + * RETURNING clause. + * ---------------------------------------------------------------- + */ +static void +ExecBatchInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int numSlots, + EState *estate, + bool canSetTag) +{ + int i; + int numInserted = numSlots; + TupleTableSlot *slot = NULL; + TupleTableSlot **rslots; + + /* + * insert into foreign table: let the FDW do it + */ + rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate, + resultRelInfo, + slots, + planSlots, + &numInserted); + + for (i = 0; i < numInserted; i++) + { + slot = rslots[i]; + + /* + * AFTER ROW Triggers or RETURNING expressions might reference the + * tableoid column, so (re-)initialize tts_tableOid before evaluating + * them. + */ + slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + + /* AFTER ROW INSERT Triggers */ + ExecARInsertTriggers(estate, resultRelInfo, slot, NIL, + mtstate->mt_transition_capture); + + /* + * Check any WITH CHECK OPTION constraints from parent views. See the + * comment in ExecInsert. + */ + if (resultRelInfo->ri_WithCheckOptions != NIL) + ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate); + } + + if (canSetTag && numInserted > 0) + estate->es_processed += numInserted; + + for (i = 0; i < numSlots; i++) + { + ExecDropSingleTupleTableSlot(slots[i]); + ExecDropSingleTupleTableSlot(planSlots[i]); + } +} + /* ---------------------------------------------------------------- * ExecDelete * @@ -1937,6 +2058,9 @@ ExecModifyTable(PlanState *pstate) ItemPointerData tuple_ctid; HeapTupleData oldtupdata; HeapTuple oldtuple; + PartitionTupleRouting *proute = node->mt_partition_tuple_routing; + List *relinfos = NIL; + ListCell *lc; CHECK_FOR_INTERRUPTS(); @@ -2152,6 +2276,25 @@ ExecModifyTable(PlanState *pstate) return slot; } + /* + * Insert remaining tuples for batch insert. + */ + if (proute) + relinfos = estate->es_tuple_routing_result_relations; + else + relinfos = estate->es_opened_result_relations; + + foreach(lc, relinfos) + { + resultRelInfo = lfirst(lc); + if (resultRelInfo->ri_NumSlots > 0) + ExecBatchInsert(node, resultRelInfo, + resultRelInfo->ri_Slots, + resultRelInfo->ri_PlanSlots, + resultRelInfo->ri_NumSlots, + estate, node->canSetTag); + } + /* * We're done, but fire AFTER STATEMENT triggers before exiting. */ @@ -2650,6 +2793,23 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } } + /* + * Determine if the FDW supports batch insert and determine the batch + * size (a FDW may support batching, but it may be disabled for the + * server/table). + */ + if (!resultRelInfo->ri_usesFdwDirectModify && + operation == CMD_INSERT && + resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize && + resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert) + resultRelInfo->ri_BatchSize = + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo); + else + resultRelInfo->ri_BatchSize = 1; + + Assert(resultRelInfo->ri_BatchSize >= 1); + /* * Lastly, if this is not the primary (canSetTag) ModifyTable node, add it * to estate->es_auxmodifytables so that it will be run to completion by diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c index c4eba6b053f..dbf6b30233a 100644 --- a/src/backend/nodes/list.c +++ b/src/backend/nodes/list.c @@ -277,6 +277,21 @@ list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2, return list; } +List * +list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2, + ListCell datum3, ListCell datum4, ListCell datum5) +{ + List *list = new_list(t, 5); + + list->elements[0] = datum1; + list->elements[1] = datum2; + list->elements[2] = datum3; + list->elements[3] = datum4; + list->elements[4] = datum5; + check_list_invariants(list); + return list; +} + /* * Make room for a new head cell in the given (non-NIL) list. * diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 2953499fb10..248f78da452 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -85,6 +85,14 @@ typedef TupleTableSlot *(*ExecForeignInsert_function) (EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot); +typedef TupleTableSlot **(*ExecForeignBatchInsert_function) (EState *estate, + ResultRelInfo *rinfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); + +typedef int (*GetForeignModifyBatchSize_function) (ResultRelInfo *rinfo); + typedef TupleTableSlot *(*ExecForeignUpdate_function) (EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slot, @@ -209,6 +217,8 @@ typedef struct FdwRoutine PlanForeignModify_function PlanForeignModify; BeginForeignModify_function BeginForeignModify; ExecForeignInsert_function ExecForeignInsert; + ExecForeignBatchInsert_function ExecForeignBatchInsert; + GetForeignModifyBatchSize_function GetForeignModifyBatchSize; ExecForeignUpdate_function ExecForeignUpdate; ExecForeignDelete_function ExecForeignDelete; EndForeignModify_function EndForeignModify; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 48c3f570fa9..d65099c94aa 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -446,6 +446,12 @@ typedef struct ResultRelInfo /* true when modifying foreign table directly */ bool ri_usesFdwDirectModify; + /* batch insert stuff */ + int ri_NumSlots; /* number of slots in the array */ + int ri_BatchSize; /* max slots inserted in a single batch */ + TupleTableSlot **ri_Slots; /* input tuples for batch insert */ + TupleTableSlot **ri_PlanSlots; + /* list of WithCheckOption's to be checked */ List *ri_WithCheckOptions; diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h index 710dcd37ef4..404e03f132d 100644 --- a/src/include/nodes/pg_list.h +++ b/src/include/nodes/pg_list.h @@ -213,6 +213,10 @@ list_length(const List *l) #define list_make4(x1,x2,x3,x4) \ list_make4_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \ list_make_ptr_cell(x3), list_make_ptr_cell(x4)) +#define list_make5(x1,x2,x3,x4,x5) \ + list_make5_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \ + list_make_ptr_cell(x3), list_make_ptr_cell(x4), \ + list_make_ptr_cell(x5)) #define list_make1_int(x1) \ list_make1_impl(T_IntList, list_make_int_cell(x1)) @@ -224,6 +228,10 @@ list_length(const List *l) #define list_make4_int(x1,x2,x3,x4) \ list_make4_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \ list_make_int_cell(x3), list_make_int_cell(x4)) +#define list_make5_int(x1,x2,x3,x4,x5) \ + list_make5_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \ + list_make_int_cell(x3), list_make_int_cell(x4), \ + list_make_int_cell(x5)) #define list_make1_oid(x1) \ list_make1_impl(T_OidList, list_make_oid_cell(x1)) @@ -235,6 +243,10 @@ list_length(const List *l) #define list_make4_oid(x1,x2,x3,x4) \ list_make4_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \ list_make_oid_cell(x3), list_make_oid_cell(x4)) +#define list_make5_oid(x1,x2,x3,x4,x5) \ + list_make5_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \ + list_make_oid_cell(x3), list_make_oid_cell(x4), \ + list_make_oid_cell(x5)) /* * Locate the n'th cell (counting from 0) of the list. @@ -520,6 +532,9 @@ extern List *list_make3_impl(NodeTag t, ListCell datum1, ListCell datum2, ListCell datum3); extern List *list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2, ListCell datum3, ListCell datum4); +extern List *list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2, + ListCell datum3, ListCell datum4, + ListCell datum5); extern pg_nodiscard List *lappend(List *list, void *datum); extern pg_nodiscard List *lappend_int(List *list, int datum);