mirror of
https://github.com/postgres/postgres.git
synced 2025-06-14 18:42:34 +03:00
Allow insert and update tuple routing and COPY for foreign tables.
Also enable this for postgres_fdw. Etsuro Fujita, based on an earlier patch by Amit Langote. The larger patch series of which this is a part has been reviewed by Amit Langote, David Fetter, Maksim Milyutin, Álvaro Herrera, Stephen Frost, and me. Minor documentation changes to the final version by me. Discussion: http://postgr.es/m/29906a26-da12-8c86-4fb9-d8f88442f2b9@lab.ntt.co.jp
This commit is contained in:
@ -136,6 +136,11 @@ DELETE FROM agg_csv WHERE a = 100;
|
|||||||
-- but this should be allowed
|
-- but this should be allowed
|
||||||
SELECT * FROM agg_csv FOR UPDATE;
|
SELECT * FROM agg_csv FOR UPDATE;
|
||||||
|
|
||||||
|
-- copy from isn't supported either
|
||||||
|
COPY agg_csv FROM STDIN;
|
||||||
|
12 3.4
|
||||||
|
\.
|
||||||
|
|
||||||
-- constraint exclusion tests
|
-- constraint exclusion tests
|
||||||
\t on
|
\t on
|
||||||
EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv WHERE a < 0;
|
EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv WHERE a < 0;
|
||||||
|
@ -221,6 +221,9 @@ SELECT * FROM agg_csv FOR UPDATE;
|
|||||||
42 | 324.78
|
42 | 324.78
|
||||||
(3 rows)
|
(3 rows)
|
||||||
|
|
||||||
|
-- copy from isn't supported either
|
||||||
|
COPY agg_csv FROM STDIN;
|
||||||
|
ERROR: cannot insert into foreign table "agg_csv"
|
||||||
-- constraint exclusion tests
|
-- constraint exclusion tests
|
||||||
\t on
|
\t on
|
||||||
EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv WHERE a < 0;
|
EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv WHERE a < 0;
|
||||||
@ -315,7 +318,7 @@ SELECT tableoid::regclass, * FROM p2;
|
|||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
COPY pt FROM '@abs_srcdir@/data/list2.bad' with (format 'csv', delimiter ','); -- ERROR
|
COPY pt FROM '@abs_srcdir@/data/list2.bad' with (format 'csv', delimiter ','); -- ERROR
|
||||||
ERROR: cannot route inserted tuples to a foreign table
|
ERROR: cannot insert into foreign table "p1"
|
||||||
CONTEXT: COPY pt, line 2: "1,qux"
|
CONTEXT: COPY pt, line 2: "1,qux"
|
||||||
COPY pt FROM '@abs_srcdir@/data/list2.csv' with (format 'csv', delimiter ',');
|
COPY pt FROM '@abs_srcdir@/data/list2.csv' with (format 'csv', delimiter ',');
|
||||||
SELECT tableoid::regclass, * FROM pt;
|
SELECT tableoid::regclass, * FROM pt;
|
||||||
@ -342,10 +345,10 @@ SELECT tableoid::regclass, * FROM p2;
|
|||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
INSERT INTO pt VALUES (1, 'xyzzy'); -- ERROR
|
INSERT INTO pt VALUES (1, 'xyzzy'); -- ERROR
|
||||||
ERROR: cannot route inserted tuples to a foreign table
|
ERROR: cannot insert into foreign table "p1"
|
||||||
INSERT INTO pt VALUES (2, 'xyzzy');
|
INSERT INTO pt VALUES (2, 'xyzzy');
|
||||||
UPDATE pt set a = 1 where a = 2; -- ERROR
|
UPDATE pt set a = 1 where a = 2; -- ERROR
|
||||||
ERROR: cannot route inserted tuples to a foreign table
|
ERROR: cannot insert into foreign table "p1"
|
||||||
SELECT tableoid::regclass, * FROM pt;
|
SELECT tableoid::regclass, * FROM pt;
|
||||||
tableoid | a | b
|
tableoid | a | b
|
||||||
----------+---+-------
|
----------+---+-------
|
||||||
|
@ -7371,6 +7371,340 @@ NOTICE: drop cascades to foreign table bar2
|
|||||||
drop table loct1;
|
drop table loct1;
|
||||||
drop table loct2;
|
drop table loct2;
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
|
-- test tuple routing for foreign-table partitions
|
||||||
|
-- ===================================================================
|
||||||
|
-- Test insert tuple routing
|
||||||
|
create table itrtest (a int, b text) partition by list (a);
|
||||||
|
create table loct1 (a int check (a in (1)), b text);
|
||||||
|
create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1');
|
||||||
|
create table loct2 (a int check (a in (2)), b text);
|
||||||
|
create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2');
|
||||||
|
alter table itrtest attach partition remp1 for values in (1);
|
||||||
|
alter table itrtest attach partition remp2 for values in (2);
|
||||||
|
insert into itrtest values (1, 'foo');
|
||||||
|
insert into itrtest values (1, 'bar') returning *;
|
||||||
|
a | b
|
||||||
|
---+-----
|
||||||
|
1 | bar
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
insert into itrtest values (2, 'baz');
|
||||||
|
insert into itrtest values (2, 'qux') returning *;
|
||||||
|
a | b
|
||||||
|
---+-----
|
||||||
|
2 | qux
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
insert into itrtest values (1, 'test1'), (2, 'test2') returning *;
|
||||||
|
a | b
|
||||||
|
---+-------
|
||||||
|
1 | test1
|
||||||
|
2 | test2
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM itrtest;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+-------
|
||||||
|
remp1 | 1 | foo
|
||||||
|
remp1 | 1 | bar
|
||||||
|
remp1 | 1 | test1
|
||||||
|
remp2 | 2 | baz
|
||||||
|
remp2 | 2 | qux
|
||||||
|
remp2 | 2 | test2
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM remp1;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+-------
|
||||||
|
remp1 | 1 | foo
|
||||||
|
remp1 | 1 | bar
|
||||||
|
remp1 | 1 | test1
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM remp2;
|
||||||
|
tableoid | b | a
|
||||||
|
----------+-------+---
|
||||||
|
remp2 | baz | 2
|
||||||
|
remp2 | qux | 2
|
||||||
|
remp2 | test2 | 2
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
delete from itrtest;
|
||||||
|
create unique index loct1_idx on loct1 (a);
|
||||||
|
-- DO NOTHING without an inference specification is supported
|
||||||
|
insert into itrtest values (1, 'foo') on conflict do nothing returning *;
|
||||||
|
a | b
|
||||||
|
---+-----
|
||||||
|
1 | foo
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
insert into itrtest values (1, 'foo') on conflict do nothing returning *;
|
||||||
|
a | b
|
||||||
|
---+---
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- But other cases are not supported
|
||||||
|
insert into itrtest values (1, 'bar') on conflict (a) do nothing;
|
||||||
|
ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
|
||||||
|
insert into itrtest values (1, 'bar') on conflict (a) do update set b = excluded.b;
|
||||||
|
ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
|
||||||
|
select tableoid::regclass, * FROM itrtest;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+-----
|
||||||
|
remp1 | 1 | foo
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
drop table itrtest;
|
||||||
|
drop table loct1;
|
||||||
|
drop table loct2;
|
||||||
|
-- Test update tuple routing
|
||||||
|
create table utrtest (a int, b text) partition by list (a);
|
||||||
|
create table loct (a int check (a in (1)), b text);
|
||||||
|
create foreign table remp (a int check (a in (1)), b text) server loopback options (table_name 'loct');
|
||||||
|
create table locp (a int check (a in (2)), b text);
|
||||||
|
alter table utrtest attach partition remp for values in (1);
|
||||||
|
alter table utrtest attach partition locp for values in (2);
|
||||||
|
insert into utrtest values (1, 'foo');
|
||||||
|
insert into utrtest values (2, 'qux');
|
||||||
|
select tableoid::regclass, * FROM utrtest;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+-----
|
||||||
|
remp | 1 | foo
|
||||||
|
locp | 2 | qux
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM remp;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+-----
|
||||||
|
remp | 1 | foo
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM locp;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+-----
|
||||||
|
locp | 2 | qux
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- It's not allowed to move a row from a partition that is foreign to another
|
||||||
|
update utrtest set a = 2 where b = 'foo' returning *;
|
||||||
|
ERROR: new row for relation "loct" violates check constraint "loct_a_check"
|
||||||
|
DETAIL: Failing row contains (2, foo).
|
||||||
|
CONTEXT: remote SQL command: UPDATE public.loct SET a = 2 WHERE ((b = 'foo'::text)) RETURNING a, b
|
||||||
|
-- But the reverse is allowed
|
||||||
|
update utrtest set a = 1 where b = 'qux' returning *;
|
||||||
|
a | b
|
||||||
|
---+-----
|
||||||
|
1 | qux
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM utrtest;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+-----
|
||||||
|
remp | 1 | foo
|
||||||
|
remp | 1 | qux
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM remp;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+-----
|
||||||
|
remp | 1 | foo
|
||||||
|
remp | 1 | qux
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM locp;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+---
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- The executor should not let unexercised FDWs shut down
|
||||||
|
update utrtest set a = 1 where b = 'foo';
|
||||||
|
drop table utrtest;
|
||||||
|
drop table loct;
|
||||||
|
-- Test copy tuple routing
|
||||||
|
create table ctrtest (a int, b text) partition by list (a);
|
||||||
|
create table loct1 (a int check (a in (1)), b text);
|
||||||
|
create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1');
|
||||||
|
create table loct2 (a int check (a in (2)), b text);
|
||||||
|
create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2');
|
||||||
|
alter table ctrtest attach partition remp1 for values in (1);
|
||||||
|
alter table ctrtest attach partition remp2 for values in (2);
|
||||||
|
copy ctrtest from stdin;
|
||||||
|
select tableoid::regclass, * FROM ctrtest;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+-----
|
||||||
|
remp1 | 1 | foo
|
||||||
|
remp2 | 2 | qux
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM remp1;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+-----
|
||||||
|
remp1 | 1 | foo
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM remp2;
|
||||||
|
tableoid | b | a
|
||||||
|
----------+-----+---
|
||||||
|
remp2 | qux | 2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Copying into foreign partitions directly should work as well
|
||||||
|
copy remp1 from stdin;
|
||||||
|
select tableoid::regclass, * FROM remp1;
|
||||||
|
tableoid | a | b
|
||||||
|
----------+---+-----
|
||||||
|
remp1 | 1 | foo
|
||||||
|
remp1 | 1 | bar
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
drop table ctrtest;
|
||||||
|
drop table loct1;
|
||||||
|
drop table loct2;
|
||||||
|
-- ===================================================================
|
||||||
|
-- test COPY FROM
|
||||||
|
-- ===================================================================
|
||||||
|
create table loc2 (f1 int, f2 text);
|
||||||
|
alter table loc2 set (autovacuum_enabled = 'false');
|
||||||
|
create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2');
|
||||||
|
-- Test basic functionality
|
||||||
|
copy rem2 from stdin;
|
||||||
|
select * from rem2;
|
||||||
|
f1 | f2
|
||||||
|
----+-----
|
||||||
|
1 | foo
|
||||||
|
2 | bar
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
delete from rem2;
|
||||||
|
-- Test check constraints
|
||||||
|
alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
|
||||||
|
alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
|
||||||
|
-- check constraint is enforced on the remote side, not locally
|
||||||
|
copy rem2 from stdin;
|
||||||
|
copy rem2 from stdin; -- ERROR
|
||||||
|
ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive"
|
||||||
|
DETAIL: Failing row contains (-1, xyzzy).
|
||||||
|
CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
|
||||||
|
COPY rem2, line 1: "-1 xyzzy"
|
||||||
|
select * from rem2;
|
||||||
|
f1 | f2
|
||||||
|
----+-----
|
||||||
|
1 | foo
|
||||||
|
2 | bar
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
alter foreign table rem2 drop constraint rem2_f1positive;
|
||||||
|
alter table loc2 drop constraint loc2_f1positive;
|
||||||
|
delete from rem2;
|
||||||
|
-- Test local triggers
|
||||||
|
create trigger trig_stmt_before before insert on rem2
|
||||||
|
for each statement execute procedure trigger_func();
|
||||||
|
create trigger trig_stmt_after after insert on rem2
|
||||||
|
for each statement execute procedure trigger_func();
|
||||||
|
create trigger trig_row_before before insert on rem2
|
||||||
|
for each row execute procedure trigger_data(23,'skidoo');
|
||||||
|
create trigger trig_row_after after insert on rem2
|
||||||
|
for each row execute procedure trigger_data(23,'skidoo');
|
||||||
|
copy rem2 from stdin;
|
||||||
|
NOTICE: trigger_func(<NULL>) called: action = INSERT, when = BEFORE, level = STATEMENT
|
||||||
|
NOTICE: trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2
|
||||||
|
NOTICE: NEW: (1,foo)
|
||||||
|
NOTICE: trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2
|
||||||
|
NOTICE: NEW: (2,bar)
|
||||||
|
NOTICE: trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2
|
||||||
|
NOTICE: NEW: (1,foo)
|
||||||
|
NOTICE: trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2
|
||||||
|
NOTICE: NEW: (2,bar)
|
||||||
|
NOTICE: trigger_func(<NULL>) called: action = INSERT, when = AFTER, level = STATEMENT
|
||||||
|
select * from rem2;
|
||||||
|
f1 | f2
|
||||||
|
----+-----
|
||||||
|
1 | foo
|
||||||
|
2 | bar
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
drop trigger trig_row_before on rem2;
|
||||||
|
drop trigger trig_row_after on rem2;
|
||||||
|
drop trigger trig_stmt_before on rem2;
|
||||||
|
drop trigger trig_stmt_after on rem2;
|
||||||
|
delete from rem2;
|
||||||
|
create trigger trig_row_before_insert before insert on rem2
|
||||||
|
for each row execute procedure trig_row_before_insupdate();
|
||||||
|
-- The new values are concatenated with ' triggered !'
|
||||||
|
copy rem2 from stdin;
|
||||||
|
select * from rem2;
|
||||||
|
f1 | f2
|
||||||
|
----+-----------------
|
||||||
|
1 | foo triggered !
|
||||||
|
2 | bar triggered !
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
drop trigger trig_row_before_insert on rem2;
|
||||||
|
delete from rem2;
|
||||||
|
create trigger trig_null before insert on rem2
|
||||||
|
for each row execute procedure trig_null();
|
||||||
|
-- Nothing happens
|
||||||
|
copy rem2 from stdin;
|
||||||
|
select * from rem2;
|
||||||
|
f1 | f2
|
||||||
|
----+----
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
drop trigger trig_null on rem2;
|
||||||
|
delete from rem2;
|
||||||
|
-- Test remote triggers
|
||||||
|
create trigger trig_row_before_insert before insert on loc2
|
||||||
|
for each row execute procedure trig_row_before_insupdate();
|
||||||
|
-- The new values are concatenated with ' triggered !'
|
||||||
|
copy rem2 from stdin;
|
||||||
|
select * from rem2;
|
||||||
|
f1 | f2
|
||||||
|
----+-----------------
|
||||||
|
1 | foo triggered !
|
||||||
|
2 | bar triggered !
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
drop trigger trig_row_before_insert on loc2;
|
||||||
|
delete from rem2;
|
||||||
|
create trigger trig_null before insert on loc2
|
||||||
|
for each row execute procedure trig_null();
|
||||||
|
-- Nothing happens
|
||||||
|
copy rem2 from stdin;
|
||||||
|
select * from rem2;
|
||||||
|
f1 | f2
|
||||||
|
----+----
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
drop trigger trig_null on loc2;
|
||||||
|
delete from rem2;
|
||||||
|
-- Test a combination of local and remote triggers
|
||||||
|
create trigger rem2_trig_row_before before insert on rem2
|
||||||
|
for each row execute procedure trigger_data(23,'skidoo');
|
||||||
|
create trigger rem2_trig_row_after after insert on rem2
|
||||||
|
for each row execute procedure trigger_data(23,'skidoo');
|
||||||
|
create trigger loc2_trig_row_before_insert before insert on loc2
|
||||||
|
for each row execute procedure trig_row_before_insupdate();
|
||||||
|
copy rem2 from stdin;
|
||||||
|
NOTICE: rem2_trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2
|
||||||
|
NOTICE: NEW: (1,foo)
|
||||||
|
NOTICE: rem2_trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2
|
||||||
|
NOTICE: NEW: (2,bar)
|
||||||
|
NOTICE: rem2_trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2
|
||||||
|
NOTICE: NEW: (1,"foo triggered !")
|
||||||
|
NOTICE: rem2_trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2
|
||||||
|
NOTICE: NEW: (2,"bar triggered !")
|
||||||
|
select * from rem2;
|
||||||
|
f1 | f2
|
||||||
|
----+-----------------
|
||||||
|
1 | foo triggered !
|
||||||
|
2 | bar triggered !
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
drop trigger rem2_trig_row_before on rem2;
|
||||||
|
drop trigger rem2_trig_row_after on rem2;
|
||||||
|
drop trigger loc2_trig_row_before_insert on loc2;
|
||||||
|
delete from rem2;
|
||||||
|
-- ===================================================================
|
||||||
-- test IMPORT FOREIGN SCHEMA
|
-- test IMPORT FOREIGN SCHEMA
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
CREATE SCHEMA import_source;
|
CREATE SCHEMA import_source;
|
||||||
|
@ -319,6 +319,10 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate,
|
|||||||
TupleTableSlot *planSlot);
|
TupleTableSlot *planSlot);
|
||||||
static void postgresEndForeignModify(EState *estate,
|
static void postgresEndForeignModify(EState *estate,
|
||||||
ResultRelInfo *resultRelInfo);
|
ResultRelInfo *resultRelInfo);
|
||||||
|
static void postgresBeginForeignInsert(ModifyTableState *mtstate,
|
||||||
|
ResultRelInfo *resultRelInfo);
|
||||||
|
static void postgresEndForeignInsert(EState *estate,
|
||||||
|
ResultRelInfo *resultRelInfo);
|
||||||
static int postgresIsForeignRelUpdatable(Relation rel);
|
static int postgresIsForeignRelUpdatable(Relation rel);
|
||||||
static bool postgresPlanDirectModify(PlannerInfo *root,
|
static bool postgresPlanDirectModify(PlannerInfo *root,
|
||||||
ModifyTable *plan,
|
ModifyTable *plan,
|
||||||
@ -473,6 +477,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
|
|||||||
routine->ExecForeignUpdate = postgresExecForeignUpdate;
|
routine->ExecForeignUpdate = postgresExecForeignUpdate;
|
||||||
routine->ExecForeignDelete = postgresExecForeignDelete;
|
routine->ExecForeignDelete = postgresExecForeignDelete;
|
||||||
routine->EndForeignModify = postgresEndForeignModify;
|
routine->EndForeignModify = postgresEndForeignModify;
|
||||||
|
routine->BeginForeignInsert = postgresBeginForeignInsert;
|
||||||
|
routine->EndForeignInsert = postgresEndForeignInsert;
|
||||||
routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
|
routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
|
||||||
routine->PlanDirectModify = postgresPlanDirectModify;
|
routine->PlanDirectModify = postgresPlanDirectModify;
|
||||||
routine->BeginDirectModify = postgresBeginDirectModify;
|
routine->BeginDirectModify = postgresBeginDirectModify;
|
||||||
@ -1959,6 +1965,96 @@ postgresEndForeignModify(EState *estate,
|
|||||||
finish_foreign_modify(fmstate);
|
finish_foreign_modify(fmstate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* postgresBeginForeignInsert
|
||||||
|
* Begin an insert operation on a foreign table
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
postgresBeginForeignInsert(ModifyTableState *mtstate,
|
||||||
|
ResultRelInfo *resultRelInfo)
|
||||||
|
{
|
||||||
|
PgFdwModifyState *fmstate;
|
||||||
|
Plan *plan = mtstate->ps.plan;
|
||||||
|
Relation rel = resultRelInfo->ri_RelationDesc;
|
||||||
|
RangeTblEntry *rte;
|
||||||
|
Query *query;
|
||||||
|
PlannerInfo *root;
|
||||||
|
TupleDesc tupdesc = RelationGetDescr(rel);
|
||||||
|
int attnum;
|
||||||
|
StringInfoData sql;
|
||||||
|
List *targetAttrs = NIL;
|
||||||
|
List *retrieved_attrs = NIL;
|
||||||
|
bool doNothing = false;
|
||||||
|
|
||||||
|
initStringInfo(&sql);
|
||||||
|
|
||||||
|
/* Set up largely-dummy planner state. */
|
||||||
|
rte = makeNode(RangeTblEntry);
|
||||||
|
rte->rtekind = RTE_RELATION;
|
||||||
|
rte->relid = RelationGetRelid(rel);
|
||||||
|
rte->relkind = RELKIND_FOREIGN_TABLE;
|
||||||
|
query = makeNode(Query);
|
||||||
|
query->commandType = CMD_INSERT;
|
||||||
|
query->resultRelation = 1;
|
||||||
|
query->rtable = list_make1(rte);
|
||||||
|
root = makeNode(PlannerInfo);
|
||||||
|
root->parse = query;
|
||||||
|
|
||||||
|
/* We transmit all columns that are defined in the foreign table. */
|
||||||
|
for (attnum = 1; attnum <= tupdesc->natts; attnum++)
|
||||||
|
{
|
||||||
|
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
|
||||||
|
|
||||||
|
if (!attr->attisdropped)
|
||||||
|
targetAttrs = lappend_int(targetAttrs, attnum);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Check if we add the ON CONFLICT clause to the remote query. */
|
||||||
|
if (plan)
|
||||||
|
{
|
||||||
|
OnConflictAction onConflictAction = ((ModifyTable *) plan)->onConflictAction;
|
||||||
|
|
||||||
|
/* We only support DO NOTHING without an inference specification. */
|
||||||
|
if (onConflictAction == ONCONFLICT_NOTHING)
|
||||||
|
doNothing = true;
|
||||||
|
else if (onConflictAction != ONCONFLICT_NONE)
|
||||||
|
elog(ERROR, "unexpected ON CONFLICT specification: %d",
|
||||||
|
(int) onConflictAction);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Construct the SQL command string. */
|
||||||
|
deparseInsertSql(&sql, root, 1, rel, targetAttrs, doNothing,
|
||||||
|
resultRelInfo->ri_returningList, &retrieved_attrs);
|
||||||
|
|
||||||
|
/* Construct an execution state. */
|
||||||
|
fmstate = create_foreign_modify(mtstate->ps.state,
|
||||||
|
resultRelInfo,
|
||||||
|
CMD_INSERT,
|
||||||
|
NULL,
|
||||||
|
sql.data,
|
||||||
|
targetAttrs,
|
||||||
|
retrieved_attrs != NIL,
|
||||||
|
retrieved_attrs);
|
||||||
|
|
||||||
|
resultRelInfo->ri_FdwState = fmstate;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* postgresEndForeignInsert
|
||||||
|
* Finish an insert operation on a foreign table
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
postgresEndForeignInsert(EState *estate,
|
||||||
|
ResultRelInfo *resultRelInfo)
|
||||||
|
{
|
||||||
|
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
|
||||||
|
|
||||||
|
Assert(fmstate != NULL);
|
||||||
|
|
||||||
|
/* Destroy the execution state */
|
||||||
|
finish_foreign_modify(fmstate);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* postgresIsForeignRelUpdatable
|
* postgresIsForeignRelUpdatable
|
||||||
* Determine whether a foreign table supports INSERT, UPDATE and/or
|
* Determine whether a foreign table supports INSERT, UPDATE and/or
|
||||||
|
@ -1767,6 +1767,243 @@ drop table bar cascade;
|
|||||||
drop table loct1;
|
drop table loct1;
|
||||||
drop table loct2;
|
drop table loct2;
|
||||||
|
|
||||||
|
-- ===================================================================
|
||||||
|
-- test tuple routing for foreign-table partitions
|
||||||
|
-- ===================================================================
|
||||||
|
|
||||||
|
-- Test insert tuple routing
|
||||||
|
create table itrtest (a int, b text) partition by list (a);
|
||||||
|
create table loct1 (a int check (a in (1)), b text);
|
||||||
|
create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1');
|
||||||
|
create table loct2 (a int check (a in (2)), b text);
|
||||||
|
create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2');
|
||||||
|
alter table itrtest attach partition remp1 for values in (1);
|
||||||
|
alter table itrtest attach partition remp2 for values in (2);
|
||||||
|
|
||||||
|
insert into itrtest values (1, 'foo');
|
||||||
|
insert into itrtest values (1, 'bar') returning *;
|
||||||
|
insert into itrtest values (2, 'baz');
|
||||||
|
insert into itrtest values (2, 'qux') returning *;
|
||||||
|
insert into itrtest values (1, 'test1'), (2, 'test2') returning *;
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM itrtest;
|
||||||
|
select tableoid::regclass, * FROM remp1;
|
||||||
|
select tableoid::regclass, * FROM remp2;
|
||||||
|
|
||||||
|
delete from itrtest;
|
||||||
|
|
||||||
|
create unique index loct1_idx on loct1 (a);
|
||||||
|
|
||||||
|
-- DO NOTHING without an inference specification is supported
|
||||||
|
insert into itrtest values (1, 'foo') on conflict do nothing returning *;
|
||||||
|
insert into itrtest values (1, 'foo') on conflict do nothing returning *;
|
||||||
|
|
||||||
|
-- But other cases are not supported
|
||||||
|
insert into itrtest values (1, 'bar') on conflict (a) do nothing;
|
||||||
|
insert into itrtest values (1, 'bar') on conflict (a) do update set b = excluded.b;
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM itrtest;
|
||||||
|
|
||||||
|
drop table itrtest;
|
||||||
|
drop table loct1;
|
||||||
|
drop table loct2;
|
||||||
|
|
||||||
|
-- Test update tuple routing
|
||||||
|
create table utrtest (a int, b text) partition by list (a);
|
||||||
|
create table loct (a int check (a in (1)), b text);
|
||||||
|
create foreign table remp (a int check (a in (1)), b text) server loopback options (table_name 'loct');
|
||||||
|
create table locp (a int check (a in (2)), b text);
|
||||||
|
alter table utrtest attach partition remp for values in (1);
|
||||||
|
alter table utrtest attach partition locp for values in (2);
|
||||||
|
|
||||||
|
insert into utrtest values (1, 'foo');
|
||||||
|
insert into utrtest values (2, 'qux');
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM utrtest;
|
||||||
|
select tableoid::regclass, * FROM remp;
|
||||||
|
select tableoid::regclass, * FROM locp;
|
||||||
|
|
||||||
|
-- It's not allowed to move a row from a partition that is foreign to another
|
||||||
|
update utrtest set a = 2 where b = 'foo' returning *;
|
||||||
|
|
||||||
|
-- But the reverse is allowed
|
||||||
|
update utrtest set a = 1 where b = 'qux' returning *;
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM utrtest;
|
||||||
|
select tableoid::regclass, * FROM remp;
|
||||||
|
select tableoid::regclass, * FROM locp;
|
||||||
|
|
||||||
|
-- The executor should not let unexercised FDWs shut down
|
||||||
|
update utrtest set a = 1 where b = 'foo';
|
||||||
|
|
||||||
|
drop table utrtest;
|
||||||
|
drop table loct;
|
||||||
|
|
||||||
|
-- Test copy tuple routing
|
||||||
|
create table ctrtest (a int, b text) partition by list (a);
|
||||||
|
create table loct1 (a int check (a in (1)), b text);
|
||||||
|
create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1');
|
||||||
|
create table loct2 (a int check (a in (2)), b text);
|
||||||
|
create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2');
|
||||||
|
alter table ctrtest attach partition remp1 for values in (1);
|
||||||
|
alter table ctrtest attach partition remp2 for values in (2);
|
||||||
|
|
||||||
|
copy ctrtest from stdin;
|
||||||
|
1 foo
|
||||||
|
2 qux
|
||||||
|
\.
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM ctrtest;
|
||||||
|
select tableoid::regclass, * FROM remp1;
|
||||||
|
select tableoid::regclass, * FROM remp2;
|
||||||
|
|
||||||
|
-- Copying into foreign partitions directly should work as well
|
||||||
|
copy remp1 from stdin;
|
||||||
|
1 bar
|
||||||
|
\.
|
||||||
|
|
||||||
|
select tableoid::regclass, * FROM remp1;
|
||||||
|
|
||||||
|
drop table ctrtest;
|
||||||
|
drop table loct1;
|
||||||
|
drop table loct2;
|
||||||
|
|
||||||
|
-- ===================================================================
|
||||||
|
-- test COPY FROM
|
||||||
|
-- ===================================================================
|
||||||
|
|
||||||
|
create table loc2 (f1 int, f2 text);
|
||||||
|
alter table loc2 set (autovacuum_enabled = 'false');
|
||||||
|
create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2');
|
||||||
|
|
||||||
|
-- Test basic functionality
|
||||||
|
copy rem2 from stdin;
|
||||||
|
1 foo
|
||||||
|
2 bar
|
||||||
|
\.
|
||||||
|
select * from rem2;
|
||||||
|
|
||||||
|
delete from rem2;
|
||||||
|
|
||||||
|
-- Test check constraints
|
||||||
|
alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
|
||||||
|
alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
|
||||||
|
|
||||||
|
-- check constraint is enforced on the remote side, not locally
|
||||||
|
copy rem2 from stdin;
|
||||||
|
1 foo
|
||||||
|
2 bar
|
||||||
|
\.
|
||||||
|
copy rem2 from stdin; -- ERROR
|
||||||
|
-1 xyzzy
|
||||||
|
\.
|
||||||
|
select * from rem2;
|
||||||
|
|
||||||
|
alter foreign table rem2 drop constraint rem2_f1positive;
|
||||||
|
alter table loc2 drop constraint loc2_f1positive;
|
||||||
|
|
||||||
|
delete from rem2;
|
||||||
|
|
||||||
|
-- Test local triggers
|
||||||
|
create trigger trig_stmt_before before insert on rem2
|
||||||
|
for each statement execute procedure trigger_func();
|
||||||
|
create trigger trig_stmt_after after insert on rem2
|
||||||
|
for each statement execute procedure trigger_func();
|
||||||
|
create trigger trig_row_before before insert on rem2
|
||||||
|
for each row execute procedure trigger_data(23,'skidoo');
|
||||||
|
create trigger trig_row_after after insert on rem2
|
||||||
|
for each row execute procedure trigger_data(23,'skidoo');
|
||||||
|
|
||||||
|
copy rem2 from stdin;
|
||||||
|
1 foo
|
||||||
|
2 bar
|
||||||
|
\.
|
||||||
|
select * from rem2;
|
||||||
|
|
||||||
|
drop trigger trig_row_before on rem2;
|
||||||
|
drop trigger trig_row_after on rem2;
|
||||||
|
drop trigger trig_stmt_before on rem2;
|
||||||
|
drop trigger trig_stmt_after on rem2;
|
||||||
|
|
||||||
|
delete from rem2;
|
||||||
|
|
||||||
|
create trigger trig_row_before_insert before insert on rem2
|
||||||
|
for each row execute procedure trig_row_before_insupdate();
|
||||||
|
|
||||||
|
-- The new values are concatenated with ' triggered !'
|
||||||
|
copy rem2 from stdin;
|
||||||
|
1 foo
|
||||||
|
2 bar
|
||||||
|
\.
|
||||||
|
select * from rem2;
|
||||||
|
|
||||||
|
drop trigger trig_row_before_insert on rem2;
|
||||||
|
|
||||||
|
delete from rem2;
|
||||||
|
|
||||||
|
create trigger trig_null before insert on rem2
|
||||||
|
for each row execute procedure trig_null();
|
||||||
|
|
||||||
|
-- Nothing happens
|
||||||
|
copy rem2 from stdin;
|
||||||
|
1 foo
|
||||||
|
2 bar
|
||||||
|
\.
|
||||||
|
select * from rem2;
|
||||||
|
|
||||||
|
drop trigger trig_null on rem2;
|
||||||
|
|
||||||
|
delete from rem2;
|
||||||
|
|
||||||
|
-- Test remote triggers
|
||||||
|
create trigger trig_row_before_insert before insert on loc2
|
||||||
|
for each row execute procedure trig_row_before_insupdate();
|
||||||
|
|
||||||
|
-- The new values are concatenated with ' triggered !'
|
||||||
|
copy rem2 from stdin;
|
||||||
|
1 foo
|
||||||
|
2 bar
|
||||||
|
\.
|
||||||
|
select * from rem2;
|
||||||
|
|
||||||
|
drop trigger trig_row_before_insert on loc2;
|
||||||
|
|
||||||
|
delete from rem2;
|
||||||
|
|
||||||
|
create trigger trig_null before insert on loc2
|
||||||
|
for each row execute procedure trig_null();
|
||||||
|
|
||||||
|
-- Nothing happens
|
||||||
|
copy rem2 from stdin;
|
||||||
|
1 foo
|
||||||
|
2 bar
|
||||||
|
\.
|
||||||
|
select * from rem2;
|
||||||
|
|
||||||
|
drop trigger trig_null on loc2;
|
||||||
|
|
||||||
|
delete from rem2;
|
||||||
|
|
||||||
|
-- Test a combination of local and remote triggers
|
||||||
|
create trigger rem2_trig_row_before before insert on rem2
|
||||||
|
for each row execute procedure trigger_data(23,'skidoo');
|
||||||
|
create trigger rem2_trig_row_after after insert on rem2
|
||||||
|
for each row execute procedure trigger_data(23,'skidoo');
|
||||||
|
create trigger loc2_trig_row_before_insert before insert on loc2
|
||||||
|
for each row execute procedure trig_row_before_insupdate();
|
||||||
|
|
||||||
|
copy rem2 from stdin;
|
||||||
|
1 foo
|
||||||
|
2 bar
|
||||||
|
\.
|
||||||
|
select * from rem2;
|
||||||
|
|
||||||
|
drop trigger rem2_trig_row_before on rem2;
|
||||||
|
drop trigger rem2_trig_row_after on rem2;
|
||||||
|
drop trigger loc2_trig_row_before_insert on loc2;
|
||||||
|
|
||||||
|
delete from rem2;
|
||||||
|
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
-- test IMPORT FOREIGN SCHEMA
|
-- test IMPORT FOREIGN SCHEMA
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
|
@ -3037,11 +3037,9 @@ VALUES ('Albany', NULL, NULL, 'NY');
|
|||||||
</para>
|
</para>
|
||||||
|
|
||||||
<para>
|
<para>
|
||||||
Partitions can also be foreign tables
|
Partitions can also be foreign tables, although they have some limitations
|
||||||
(see <xref linkend="sql-createforeigntable"/>),
|
that normal tables do not; see <xref linkend="sql-createforeigntable"> for
|
||||||
although these have some limitations that normal tables do not. For
|
more information.
|
||||||
example, data inserted into the partitioned table is not routed to
|
|
||||||
foreign table partitions.
|
|
||||||
</para>
|
</para>
|
||||||
|
|
||||||
<para>
|
<para>
|
||||||
|
@ -694,6 +694,72 @@ EndForeignModify(EState *estate,
|
|||||||
<literal>NULL</literal>, no action is taken during executor shutdown.
|
<literal>NULL</literal>, no action is taken during executor shutdown.
|
||||||
</para>
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
Tuples inserted into a partitioned table by <command>INSERT</command> or
|
||||||
|
<command>COPY FROM</command> are routed to partitions. If an FDW
|
||||||
|
supports routable foreign-table partitions, it should also provide the
|
||||||
|
following callback functions. These functions are also called when
|
||||||
|
<command>COPY FROM</command> is executed on a foreign table.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
<programlisting>
|
||||||
|
void
|
||||||
|
BeginForeignInsert(ModifyTableState *mtstate,
|
||||||
|
ResultRelInfo *rinfo);
|
||||||
|
</programlisting>
|
||||||
|
|
||||||
|
Begin executing an insert operation on a foreign table. This routine is
|
||||||
|
called right before the first tuple is inserted into the foreign table
|
||||||
|
in both cases when it is the partition chosen for tuple routing and the
|
||||||
|
target specified in a <command>COPY FROM</command> command. It should
|
||||||
|
perform any initialization needed prior to the actual insertion.
|
||||||
|
Subsequently, <function>ExecForeignInsert</function> will be called for
|
||||||
|
each tuple to be inserted into the foreign table.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
<literal>mtstate</literal> is the overall state of the
|
||||||
|
<structname>ModifyTable</structname> plan node being executed; global data about
|
||||||
|
the plan and execution state is available via this structure.
|
||||||
|
<literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing
|
||||||
|
the target foreign table. (The <structfield>ri_FdwState</structfield> field of
|
||||||
|
<structname>ResultRelInfo</structname> is available for the FDW to store any
|
||||||
|
private state it needs for this operation.)
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
When this is called by a <command>COPY FROM</command> command, the
|
||||||
|
plan-related global data in <literal>mtstate</literal> is not provided
|
||||||
|
and the <literal>planSlot</literal> parameter of
|
||||||
|
<function>ExecForeignInsert</function> subsequently called for each
|
||||||
|
inserted tuple is <literal>NULL</literal>, whether the foreign table is
|
||||||
|
the partition chosen for tuple routing or the target specified in the
|
||||||
|
command.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
If the <function>BeginForeignInsert</function> pointer is set to
|
||||||
|
<literal>NULL</literal>, no action is taken for the initialization.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
<programlisting>
|
||||||
|
void
|
||||||
|
EndForeignInsert(EState *estate,
|
||||||
|
ResultRelInfo *rinfo);
|
||||||
|
</programlisting>
|
||||||
|
|
||||||
|
End the insert operation and release resources. It is normally not important
|
||||||
|
to release palloc'd memory, but for example open files and connections
|
||||||
|
to remote servers should be cleaned up.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
If the <function>EndForeignInsert</function> pointer is set to
|
||||||
|
<literal>NULL</literal>, no action is taken for the termination.
|
||||||
|
</para>
|
||||||
|
|
||||||
<para>
|
<para>
|
||||||
<programlisting>
|
<programlisting>
|
||||||
int
|
int
|
||||||
|
@ -402,8 +402,9 @@ COPY <replaceable class="parameter">count</replaceable>
|
|||||||
</para>
|
</para>
|
||||||
|
|
||||||
<para>
|
<para>
|
||||||
<command>COPY FROM</command> can be used with plain tables and with views
|
<command>COPY FROM</command> can be used with plain, foreign, or
|
||||||
that have <literal>INSTEAD OF INSERT</literal> triggers.
|
partitioned tables or with views that have
|
||||||
|
<literal>INSTEAD OF INSERT</literal> triggers.
|
||||||
</para>
|
</para>
|
||||||
|
|
||||||
<para>
|
<para>
|
||||||
|
@ -291,6 +291,9 @@ UPDATE <replaceable class="parameter">count</replaceable>
|
|||||||
concurrent <command>UPDATE</command> or <command>DELETE</command> on the
|
concurrent <command>UPDATE</command> or <command>DELETE</command> on the
|
||||||
same row may miss this row. For details see the section
|
same row may miss this row. For details see the section
|
||||||
<xref linkend="ddl-partitioning-declarative-limitations"/>.
|
<xref linkend="ddl-partitioning-declarative-limitations"/>.
|
||||||
|
Currently, rows cannot be moved from a partition that is a
|
||||||
|
foreign table to some other partition, but they can be moved into a foreign
|
||||||
|
table if the foreign data wrapper supports it.
|
||||||
</para>
|
</para>
|
||||||
</refsect1>
|
</refsect1>
|
||||||
|
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
#include "commands/trigger.h"
|
#include "commands/trigger.h"
|
||||||
#include "executor/execPartition.h"
|
#include "executor/execPartition.h"
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
|
#include "foreign/fdwapi.h"
|
||||||
#include "libpq/libpq.h"
|
#include "libpq/libpq.h"
|
||||||
#include "libpq/pqformat.h"
|
#include "libpq/pqformat.h"
|
||||||
#include "mb/pg_wchar.h"
|
#include "mb/pg_wchar.h"
|
||||||
@ -2302,6 +2303,7 @@ CopyFrom(CopyState cstate)
|
|||||||
ResultRelInfo *resultRelInfo;
|
ResultRelInfo *resultRelInfo;
|
||||||
ResultRelInfo *saved_resultRelInfo = NULL;
|
ResultRelInfo *saved_resultRelInfo = NULL;
|
||||||
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
|
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
|
||||||
|
ModifyTableState *mtstate;
|
||||||
ExprContext *econtext;
|
ExprContext *econtext;
|
||||||
TupleTableSlot *myslot;
|
TupleTableSlot *myslot;
|
||||||
MemoryContext oldcontext = CurrentMemoryContext;
|
MemoryContext oldcontext = CurrentMemoryContext;
|
||||||
@ -2323,11 +2325,12 @@ CopyFrom(CopyState cstate)
|
|||||||
Assert(cstate->rel);
|
Assert(cstate->rel);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The target must be a plain relation or have an INSTEAD OF INSERT row
|
* The target must be a plain, foreign, or partitioned relation, or have
|
||||||
* trigger. (Currently, such triggers are only allowed on views, so we
|
* an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
|
||||||
* only hint about them in the view case.)
|
* allowed on views, so we only hint about them in the view case.)
|
||||||
*/
|
*/
|
||||||
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
|
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
|
||||||
|
cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
|
||||||
cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
|
cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
|
||||||
!(cstate->rel->trigdesc &&
|
!(cstate->rel->trigdesc &&
|
||||||
cstate->rel->trigdesc->trig_insert_instead_row))
|
cstate->rel->trigdesc->trig_insert_instead_row))
|
||||||
@ -2343,11 +2346,6 @@ CopyFrom(CopyState cstate)
|
|||||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||||
errmsg("cannot copy to materialized view \"%s\"",
|
errmsg("cannot copy to materialized view \"%s\"",
|
||||||
RelationGetRelationName(cstate->rel))));
|
RelationGetRelationName(cstate->rel))));
|
||||||
else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
|
||||||
errmsg("cannot copy to foreign table \"%s\"",
|
|
||||||
RelationGetRelationName(cstate->rel))));
|
|
||||||
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
|
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||||
@ -2454,6 +2452,9 @@ CopyFrom(CopyState cstate)
|
|||||||
NULL,
|
NULL,
|
||||||
0);
|
0);
|
||||||
|
|
||||||
|
/* Verify the named relation is a valid target for INSERT */
|
||||||
|
CheckValidResultRel(resultRelInfo, CMD_INSERT);
|
||||||
|
|
||||||
ExecOpenIndices(resultRelInfo, false);
|
ExecOpenIndices(resultRelInfo, false);
|
||||||
|
|
||||||
estate->es_result_relations = resultRelInfo;
|
estate->es_result_relations = resultRelInfo;
|
||||||
@ -2466,6 +2467,21 @@ CopyFrom(CopyState cstate)
|
|||||||
/* Triggers might need a slot as well */
|
/* Triggers might need a slot as well */
|
||||||
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);
|
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Set up a ModifyTableState so we can let FDW(s) init themselves for
|
||||||
|
* foreign-table result relation(s).
|
||||||
|
*/
|
||||||
|
mtstate = makeNode(ModifyTableState);
|
||||||
|
mtstate->ps.plan = NULL;
|
||||||
|
mtstate->ps.state = estate;
|
||||||
|
mtstate->operation = CMD_INSERT;
|
||||||
|
mtstate->resultRelInfo = estate->es_result_relations;
|
||||||
|
|
||||||
|
if (resultRelInfo->ri_FdwRoutine != NULL &&
|
||||||
|
resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
|
||||||
|
resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
|
||||||
|
resultRelInfo);
|
||||||
|
|
||||||
/* Prepare to catch AFTER triggers. */
|
/* Prepare to catch AFTER triggers. */
|
||||||
AfterTriggerBeginQuery();
|
AfterTriggerBeginQuery();
|
||||||
|
|
||||||
@ -2507,11 +2523,12 @@ CopyFrom(CopyState cstate)
|
|||||||
* expressions. Such triggers or expressions might query the table we're
|
* expressions. Such triggers or expressions might query the table we're
|
||||||
* inserting to, and act differently if the tuples that have already been
|
* inserting to, and act differently if the tuples that have already been
|
||||||
* processed and prepared for insertion are not there. We also can't do
|
* processed and prepared for insertion are not there. We also can't do
|
||||||
* it if the table is partitioned.
|
* it if the table is foreign or partitioned.
|
||||||
*/
|
*/
|
||||||
if ((resultRelInfo->ri_TrigDesc != NULL &&
|
if ((resultRelInfo->ri_TrigDesc != NULL &&
|
||||||
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
|
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
|
||||||
resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
|
resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
|
||||||
|
resultRelInfo->ri_FdwRoutine != NULL ||
|
||||||
cstate->partition_tuple_routing != NULL ||
|
cstate->partition_tuple_routing != NULL ||
|
||||||
cstate->volatile_defexprs)
|
cstate->volatile_defexprs)
|
||||||
{
|
{
|
||||||
@ -2626,19 +2643,13 @@ CopyFrom(CopyState cstate)
|
|||||||
resultRelInfo = proute->partitions[leaf_part_index];
|
resultRelInfo = proute->partitions[leaf_part_index];
|
||||||
if (resultRelInfo == NULL)
|
if (resultRelInfo == NULL)
|
||||||
{
|
{
|
||||||
resultRelInfo = ExecInitPartitionInfo(NULL,
|
resultRelInfo = ExecInitPartitionInfo(mtstate,
|
||||||
saved_resultRelInfo,
|
saved_resultRelInfo,
|
||||||
proute, estate,
|
proute, estate,
|
||||||
leaf_part_index);
|
leaf_part_index);
|
||||||
Assert(resultRelInfo != NULL);
|
Assert(resultRelInfo != NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We do not yet have a way to insert into a foreign partition */
|
|
||||||
if (resultRelInfo->ri_FdwRoutine)
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("cannot route inserted tuples to a foreign table")));
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* For ExecInsertIndexTuples() to work on the partition's indexes
|
* For ExecInsertIndexTuples() to work on the partition's indexes
|
||||||
*/
|
*/
|
||||||
@ -2726,9 +2737,13 @@ CopyFrom(CopyState cstate)
|
|||||||
resultRelInfo->ri_TrigDesc->trig_insert_before_row))
|
resultRelInfo->ri_TrigDesc->trig_insert_before_row))
|
||||||
check_partition_constr = false;
|
check_partition_constr = false;
|
||||||
|
|
||||||
/* Check the constraints of the tuple */
|
/*
|
||||||
if (resultRelInfo->ri_RelationDesc->rd_att->constr ||
|
* If the target is a plain table, check the constraints of
|
||||||
check_partition_constr)
|
* the tuple.
|
||||||
|
*/
|
||||||
|
if (resultRelInfo->ri_FdwRoutine == NULL &&
|
||||||
|
(resultRelInfo->ri_RelationDesc->rd_att->constr ||
|
||||||
|
check_partition_constr))
|
||||||
ExecConstraints(resultRelInfo, slot, estate, true);
|
ExecConstraints(resultRelInfo, slot, estate, true);
|
||||||
|
|
||||||
if (useHeapMultiInsert)
|
if (useHeapMultiInsert)
|
||||||
@ -2760,10 +2775,32 @@ CopyFrom(CopyState cstate)
|
|||||||
{
|
{
|
||||||
List *recheckIndexes = NIL;
|
List *recheckIndexes = NIL;
|
||||||
|
|
||||||
/* OK, store the tuple and create index entries for it */
|
/* OK, store the tuple */
|
||||||
heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
|
if (resultRelInfo->ri_FdwRoutine != NULL)
|
||||||
hi_options, bistate);
|
{
|
||||||
|
slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
|
||||||
|
resultRelInfo,
|
||||||
|
slot,
|
||||||
|
NULL);
|
||||||
|
|
||||||
|
if (slot == NULL) /* "do nothing" */
|
||||||
|
goto next_tuple;
|
||||||
|
|
||||||
|
/* FDW might have changed tuple */
|
||||||
|
tuple = ExecMaterializeSlot(slot);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AFTER ROW Triggers might reference the tableoid
|
||||||
|
* column, so initialize t_tableOid before evaluating
|
||||||
|
* them.
|
||||||
|
*/
|
||||||
|
tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
heap_insert(resultRelInfo->ri_RelationDesc, tuple,
|
||||||
|
mycid, hi_options, bistate);
|
||||||
|
|
||||||
|
/* And create index entries for it */
|
||||||
if (resultRelInfo->ri_NumIndices > 0)
|
if (resultRelInfo->ri_NumIndices > 0)
|
||||||
recheckIndexes = ExecInsertIndexTuples(slot,
|
recheckIndexes = ExecInsertIndexTuples(slot,
|
||||||
&(tuple->t_self),
|
&(tuple->t_self),
|
||||||
@ -2781,13 +2818,14 @@ CopyFrom(CopyState cstate)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We count only tuples not suppressed by a BEFORE INSERT trigger;
|
* We count only tuples not suppressed by a BEFORE INSERT trigger
|
||||||
* this is the same definition used by execMain.c for counting
|
* or FDW; this is the same definition used by nodeModifyTable.c
|
||||||
* tuples inserted by an INSERT command.
|
* for counting tuples inserted by an INSERT command.
|
||||||
*/
|
*/
|
||||||
processed++;
|
processed++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
next_tuple:
|
||||||
/* Restore the saved ResultRelInfo */
|
/* Restore the saved ResultRelInfo */
|
||||||
if (saved_resultRelInfo)
|
if (saved_resultRelInfo)
|
||||||
{
|
{
|
||||||
@ -2828,11 +2866,17 @@ CopyFrom(CopyState cstate)
|
|||||||
|
|
||||||
ExecResetTupleTable(estate->es_tupleTable, false);
|
ExecResetTupleTable(estate->es_tupleTable, false);
|
||||||
|
|
||||||
|
/* Allow the FDW to shut down */
|
||||||
|
if (resultRelInfo->ri_FdwRoutine != NULL &&
|
||||||
|
resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
|
||||||
|
resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
|
||||||
|
resultRelInfo);
|
||||||
|
|
||||||
ExecCloseIndices(resultRelInfo);
|
ExecCloseIndices(resultRelInfo);
|
||||||
|
|
||||||
/* Close all the partitioned tables, leaf partitions, and their indices */
|
/* Close all the partitioned tables, leaf partitions, and their indices */
|
||||||
if (cstate->partition_tuple_routing)
|
if (cstate->partition_tuple_routing)
|
||||||
ExecCleanupTupleRouting(cstate->partition_tuple_routing);
|
ExecCleanupTupleRouting(mtstate, cstate->partition_tuple_routing);
|
||||||
|
|
||||||
/* Close any trigger target relations */
|
/* Close any trigger target relations */
|
||||||
ExecCleanUpTriggerState(estate);
|
ExecCleanUpTriggerState(estate);
|
||||||
|
@ -1179,13 +1179,6 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation)
|
|||||||
switch (operation)
|
switch (operation)
|
||||||
{
|
{
|
||||||
case CMD_INSERT:
|
case CMD_INSERT:
|
||||||
|
|
||||||
/*
|
|
||||||
* If foreign partition to do tuple-routing for, skip the
|
|
||||||
* check; it's disallowed elsewhere.
|
|
||||||
*/
|
|
||||||
if (resultRelInfo->ri_PartitionRoot)
|
|
||||||
break;
|
|
||||||
if (fdwroutine->ExecForeignInsert == NULL)
|
if (fdwroutine->ExecForeignInsert == NULL)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
@ -1378,6 +1371,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
|
|||||||
|
|
||||||
resultRelInfo->ri_PartitionCheck = partition_check;
|
resultRelInfo->ri_PartitionCheck = partition_check;
|
||||||
resultRelInfo->ri_PartitionRoot = partition_root;
|
resultRelInfo->ri_PartitionRoot = partition_root;
|
||||||
|
resultRelInfo->ri_PartitionReadyForRouting = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "executor/execPartition.h"
|
#include "executor/execPartition.h"
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
|
#include "foreign/fdwapi.h"
|
||||||
#include "mb/pg_wchar.h"
|
#include "mb/pg_wchar.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
@ -55,12 +56,13 @@ static List *adjust_partition_tlist(List *tlist, TupleConversionMap *map);
|
|||||||
* see ExecInitPartitionInfo. However, if the function is invoked for update
|
* see ExecInitPartitionInfo. However, if the function is invoked for update
|
||||||
* tuple routing, caller would already have initialized ResultRelInfo's for
|
* tuple routing, caller would already have initialized ResultRelInfo's for
|
||||||
* some of the partitions, which are reused and assigned to their respective
|
* some of the partitions, which are reused and assigned to their respective
|
||||||
* slot in the aforementioned array.
|
* slot in the aforementioned array. For such partitions, we delay setting
|
||||||
|
* up objects such as TupleConversionMap until those are actually chosen as
|
||||||
|
* the partitions to route tuples to. See ExecPrepareTupleRouting.
|
||||||
*/
|
*/
|
||||||
PartitionTupleRouting *
|
PartitionTupleRouting *
|
||||||
ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
|
ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
|
||||||
{
|
{
|
||||||
TupleDesc tupDesc = RelationGetDescr(rel);
|
|
||||||
List *leaf_parts;
|
List *leaf_parts;
|
||||||
ListCell *cell;
|
ListCell *cell;
|
||||||
int i;
|
int i;
|
||||||
@ -141,11 +143,7 @@ ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
|
|||||||
if (update_rri_index < num_update_rri &&
|
if (update_rri_index < num_update_rri &&
|
||||||
RelationGetRelid(update_rri[update_rri_index].ri_RelationDesc) == leaf_oid)
|
RelationGetRelid(update_rri[update_rri_index].ri_RelationDesc) == leaf_oid)
|
||||||
{
|
{
|
||||||
Relation partrel;
|
|
||||||
TupleDesc part_tupdesc;
|
|
||||||
|
|
||||||
leaf_part_rri = &update_rri[update_rri_index];
|
leaf_part_rri = &update_rri[update_rri_index];
|
||||||
partrel = leaf_part_rri->ri_RelationDesc;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This is required in order to convert the partition's tuple to
|
* This is required in order to convert the partition's tuple to
|
||||||
@ -159,23 +157,6 @@ ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel)
|
|||||||
proute->subplan_partition_offsets[update_rri_index] = i;
|
proute->subplan_partition_offsets[update_rri_index] = i;
|
||||||
|
|
||||||
update_rri_index++;
|
update_rri_index++;
|
||||||
|
|
||||||
part_tupdesc = RelationGetDescr(partrel);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Save a tuple conversion map to convert a tuple routed to this
|
|
||||||
* partition from the parent's type to the partition's.
|
|
||||||
*/
|
|
||||||
proute->parent_child_tupconv_maps[i] =
|
|
||||||
convert_tuples_by_name(tupDesc, part_tupdesc,
|
|
||||||
gettext_noop("could not convert row type"));
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Verify result relation is a valid target for an INSERT. An
|
|
||||||
* UPDATE of a partition-key becomes a DELETE+INSERT operation, so
|
|
||||||
* this check is required even when the operation is CMD_UPDATE.
|
|
||||||
*/
|
|
||||||
CheckValidResultRel(leaf_part_rri, CMD_INSERT);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
proute->partitions[i] = leaf_part_rri;
|
proute->partitions[i] = leaf_part_rri;
|
||||||
@ -347,10 +328,10 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
|||||||
PartitionTupleRouting *proute,
|
PartitionTupleRouting *proute,
|
||||||
EState *estate, int partidx)
|
EState *estate, int partidx)
|
||||||
{
|
{
|
||||||
|
ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
|
||||||
Relation rootrel = resultRelInfo->ri_RelationDesc,
|
Relation rootrel = resultRelInfo->ri_RelationDesc,
|
||||||
partrel;
|
partrel;
|
||||||
ResultRelInfo *leaf_part_rri;
|
ResultRelInfo *leaf_part_rri;
|
||||||
ModifyTable *node = mtstate ? (ModifyTable *) mtstate->ps.plan : NULL;
|
|
||||||
MemoryContext oldContext;
|
MemoryContext oldContext;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -374,13 +355,6 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
|||||||
|
|
||||||
leaf_part_rri->ri_PartitionLeafIndex = partidx;
|
leaf_part_rri->ri_PartitionLeafIndex = partidx;
|
||||||
|
|
||||||
/*
|
|
||||||
* Verify result relation is a valid target for an INSERT. An UPDATE of a
|
|
||||||
* partition-key becomes a DELETE+INSERT operation, so this check is still
|
|
||||||
* required when the operation is CMD_UPDATE.
|
|
||||||
*/
|
|
||||||
CheckValidResultRel(leaf_part_rri, CMD_INSERT);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Since we've just initialized this ResultRelInfo, it's not in any list
|
* Since we've just initialized this ResultRelInfo, it's not in any list
|
||||||
* attached to the estate as yet. Add it, so that it can be found later.
|
* attached to the estate as yet. Add it, so that it can be found later.
|
||||||
@ -393,6 +367,9 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
|||||||
lappend(estate->es_tuple_routing_result_relations,
|
lappend(estate->es_tuple_routing_result_relations,
|
||||||
leaf_part_rri);
|
leaf_part_rri);
|
||||||
|
|
||||||
|
/* Set up information needed for routing tuples to this partition. */
|
||||||
|
ExecInitRoutingInfo(mtstate, estate, proute, leaf_part_rri, partidx);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Open partition indices. The user may have asked to check for conflicts
|
* Open partition indices. The user may have asked to check for conflicts
|
||||||
* within this leaf partition and do "nothing" instead of throwing an
|
* within this leaf partition and do "nothing" instead of throwing an
|
||||||
@ -498,6 +475,7 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
|||||||
returningList = map_partition_varattnos(returningList, firstVarno,
|
returningList = map_partition_varattnos(returningList, firstVarno,
|
||||||
partrel, firstResultRel,
|
partrel, firstResultRel,
|
||||||
NULL);
|
NULL);
|
||||||
|
leaf_part_rri->ri_returningList = returningList;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Initialize the projection itself.
|
* Initialize the projection itself.
|
||||||
@ -514,15 +492,6 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
|||||||
&mtstate->ps, RelationGetDescr(partrel));
|
&mtstate->ps, RelationGetDescr(partrel));
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Save a tuple conversion map to convert a tuple routed to this partition
|
|
||||||
* from the parent's type to the partition's.
|
|
||||||
*/
|
|
||||||
proute->parent_child_tupconv_maps[partidx] =
|
|
||||||
convert_tuples_by_name(RelationGetDescr(rootrel),
|
|
||||||
RelationGetDescr(partrel),
|
|
||||||
gettext_noop("could not convert row type"));
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If there is an ON CONFLICT clause, initialize state for it.
|
* If there is an ON CONFLICT clause, initialize state for it.
|
||||||
*/
|
*/
|
||||||
@ -751,6 +720,50 @@ ExecInitPartitionInfo(ModifyTableState *mtstate,
|
|||||||
return leaf_part_rri;
|
return leaf_part_rri;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExecInitRoutingInfo
|
||||||
|
* Set up information needed for routing tuples to a leaf partition if
|
||||||
|
* routable; else abort the operation
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecInitRoutingInfo(ModifyTableState *mtstate,
|
||||||
|
EState *estate,
|
||||||
|
PartitionTupleRouting *proute,
|
||||||
|
ResultRelInfo *partRelInfo,
|
||||||
|
int partidx)
|
||||||
|
{
|
||||||
|
MemoryContext oldContext;
|
||||||
|
|
||||||
|
/* Verify the partition is a valid target for INSERT */
|
||||||
|
CheckValidResultRel(partRelInfo, CMD_INSERT);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Switch into per-query memory context.
|
||||||
|
*/
|
||||||
|
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Set up a tuple conversion map to convert a tuple routed to the
|
||||||
|
* partition from the parent's type to the partition's.
|
||||||
|
*/
|
||||||
|
proute->parent_child_tupconv_maps[partidx] =
|
||||||
|
convert_tuples_by_name(RelationGetDescr(partRelInfo->ri_PartitionRoot),
|
||||||
|
RelationGetDescr(partRelInfo->ri_RelationDesc),
|
||||||
|
gettext_noop("could not convert row type"));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the partition is a foreign table, let the FDW init itself for
|
||||||
|
* routing tuples to the partition.
|
||||||
|
*/
|
||||||
|
if (partRelInfo->ri_FdwRoutine != NULL &&
|
||||||
|
partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
|
||||||
|
partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
|
partRelInfo->ri_PartitionReadyForRouting = true;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExecSetupChildParentMapForLeaf -- Initialize the per-leaf-partition
|
* ExecSetupChildParentMapForLeaf -- Initialize the per-leaf-partition
|
||||||
* child-to-root tuple conversion map array.
|
* child-to-root tuple conversion map array.
|
||||||
@ -853,7 +866,8 @@ ConvertPartitionTupleSlot(TupleConversionMap *map,
|
|||||||
* Close all the partitioned tables, leaf partitions, and their indices.
|
* Close all the partitioned tables, leaf partitions, and their indices.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
ExecCleanupTupleRouting(PartitionTupleRouting *proute)
|
ExecCleanupTupleRouting(ModifyTableState *mtstate,
|
||||||
|
PartitionTupleRouting *proute)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
int subplan_index = 0;
|
int subplan_index = 0;
|
||||||
@ -881,6 +895,13 @@ ExecCleanupTupleRouting(PartitionTupleRouting *proute)
|
|||||||
if (resultRelInfo == NULL)
|
if (resultRelInfo == NULL)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
|
/* Allow any FDWs to shut down if they've been exercised */
|
||||||
|
if (resultRelInfo->ri_PartitionReadyForRouting &&
|
||||||
|
resultRelInfo->ri_FdwRoutine != NULL &&
|
||||||
|
resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
|
||||||
|
resultRelInfo->ri_FdwRoutine->EndForeignInsert(mtstate->ps.state,
|
||||||
|
resultRelInfo);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If this result rel is one of the UPDATE subplan result rels, let
|
* If this result rel is one of the UPDATE subplan result rels, let
|
||||||
* ExecEndPlan() close it. For INSERT or COPY,
|
* ExecEndPlan() close it. For INSERT or COPY,
|
||||||
|
@ -1826,11 +1826,21 @@ ExecPrepareTupleRouting(ModifyTableState *mtstate,
|
|||||||
proute, estate,
|
proute, estate,
|
||||||
partidx);
|
partidx);
|
||||||
|
|
||||||
/* We do not yet have a way to insert into a foreign partition */
|
/*
|
||||||
if (partrel->ri_FdwRoutine)
|
* Set up information needed for routing tuples to the partition if we
|
||||||
ereport(ERROR,
|
* didn't yet (ExecInitRoutingInfo would abort the operation if the
|
||||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
* partition isn't routable).
|
||||||
errmsg("cannot route inserted tuples to a foreign table")));
|
*
|
||||||
|
* Note: an UPDATE of a partition key invokes an INSERT that moves the
|
||||||
|
* tuple to a new partition. This setup would be needed for a subplan
|
||||||
|
* partition of such an UPDATE that is chosen as the partition to route
|
||||||
|
* the tuple to. The reason we do this setup here rather than in
|
||||||
|
* ExecSetupPartitionTupleRouting is to avoid aborting such an UPDATE
|
||||||
|
* unnecessarily due to non-routable subplan partitions that may not be
|
||||||
|
* chosen for update tuple movement after all.
|
||||||
|
*/
|
||||||
|
if (!partrel->ri_PartitionReadyForRouting)
|
||||||
|
ExecInitRoutingInfo(mtstate, estate, proute, partrel, partidx);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Make it look like we are inserting into the partition.
|
* Make it look like we are inserting into the partition.
|
||||||
@ -2531,6 +2541,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
|
|||||||
{
|
{
|
||||||
List *rlist = (List *) lfirst(l);
|
List *rlist = (List *) lfirst(l);
|
||||||
|
|
||||||
|
resultRelInfo->ri_returningList = rlist;
|
||||||
resultRelInfo->ri_projectReturning =
|
resultRelInfo->ri_projectReturning =
|
||||||
ExecBuildProjectionInfo(rlist, econtext, slot, &mtstate->ps,
|
ExecBuildProjectionInfo(rlist, econtext, slot, &mtstate->ps,
|
||||||
resultRelInfo->ri_RelationDesc->rd_att);
|
resultRelInfo->ri_RelationDesc->rd_att);
|
||||||
@ -2830,7 +2841,7 @@ ExecEndModifyTable(ModifyTableState *node)
|
|||||||
|
|
||||||
/* Close all the partitioned tables, leaf partitions, and their indices */
|
/* Close all the partitioned tables, leaf partitions, and their indices */
|
||||||
if (node->mt_partition_tuple_routing)
|
if (node->mt_partition_tuple_routing)
|
||||||
ExecCleanupTupleRouting(node->mt_partition_tuple_routing);
|
ExecCleanupTupleRouting(node, node->mt_partition_tuple_routing);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Free the exprcontext
|
* Free the exprcontext
|
||||||
|
@ -119,6 +119,11 @@ extern ResultRelInfo *ExecInitPartitionInfo(ModifyTableState *mtstate,
|
|||||||
ResultRelInfo *resultRelInfo,
|
ResultRelInfo *resultRelInfo,
|
||||||
PartitionTupleRouting *proute,
|
PartitionTupleRouting *proute,
|
||||||
EState *estate, int partidx);
|
EState *estate, int partidx);
|
||||||
|
extern void ExecInitRoutingInfo(ModifyTableState *mtstate,
|
||||||
|
EState *estate,
|
||||||
|
PartitionTupleRouting *proute,
|
||||||
|
ResultRelInfo *partRelInfo,
|
||||||
|
int partidx);
|
||||||
extern void ExecSetupChildParentMapForLeaf(PartitionTupleRouting *proute);
|
extern void ExecSetupChildParentMapForLeaf(PartitionTupleRouting *proute);
|
||||||
extern TupleConversionMap *TupConvMapForLeaf(PartitionTupleRouting *proute,
|
extern TupleConversionMap *TupConvMapForLeaf(PartitionTupleRouting *proute,
|
||||||
ResultRelInfo *rootRelInfo, int leaf_index);
|
ResultRelInfo *rootRelInfo, int leaf_index);
|
||||||
@ -126,6 +131,7 @@ extern HeapTuple ConvertPartitionTupleSlot(TupleConversionMap *map,
|
|||||||
HeapTuple tuple,
|
HeapTuple tuple,
|
||||||
TupleTableSlot *new_slot,
|
TupleTableSlot *new_slot,
|
||||||
TupleTableSlot **p_my_slot);
|
TupleTableSlot **p_my_slot);
|
||||||
extern void ExecCleanupTupleRouting(PartitionTupleRouting *proute);
|
extern void ExecCleanupTupleRouting(ModifyTableState *mtstate,
|
||||||
|
PartitionTupleRouting *proute);
|
||||||
|
|
||||||
#endif /* EXECPARTITION_H */
|
#endif /* EXECPARTITION_H */
|
||||||
|
@ -98,6 +98,12 @@ typedef TupleTableSlot *(*ExecForeignDelete_function) (EState *estate,
|
|||||||
typedef void (*EndForeignModify_function) (EState *estate,
|
typedef void (*EndForeignModify_function) (EState *estate,
|
||||||
ResultRelInfo *rinfo);
|
ResultRelInfo *rinfo);
|
||||||
|
|
||||||
|
typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate,
|
||||||
|
ResultRelInfo *rinfo);
|
||||||
|
|
||||||
|
typedef void (*EndForeignInsert_function) (EState *estate,
|
||||||
|
ResultRelInfo *rinfo);
|
||||||
|
|
||||||
typedef int (*IsForeignRelUpdatable_function) (Relation rel);
|
typedef int (*IsForeignRelUpdatable_function) (Relation rel);
|
||||||
|
|
||||||
typedef bool (*PlanDirectModify_function) (PlannerInfo *root,
|
typedef bool (*PlanDirectModify_function) (PlannerInfo *root,
|
||||||
@ -205,6 +211,8 @@ typedef struct FdwRoutine
|
|||||||
ExecForeignUpdate_function ExecForeignUpdate;
|
ExecForeignUpdate_function ExecForeignUpdate;
|
||||||
ExecForeignDelete_function ExecForeignDelete;
|
ExecForeignDelete_function ExecForeignDelete;
|
||||||
EndForeignModify_function EndForeignModify;
|
EndForeignModify_function EndForeignModify;
|
||||||
|
BeginForeignInsert_function BeginForeignInsert;
|
||||||
|
EndForeignInsert_function EndForeignInsert;
|
||||||
IsForeignRelUpdatable_function IsForeignRelUpdatable;
|
IsForeignRelUpdatable_function IsForeignRelUpdatable;
|
||||||
PlanDirectModify_function PlanDirectModify;
|
PlanDirectModify_function PlanDirectModify;
|
||||||
BeginDirectModify_function BeginDirectModify;
|
BeginDirectModify_function BeginDirectModify;
|
||||||
|
@ -444,6 +444,9 @@ typedef struct ResultRelInfo
|
|||||||
/* for removing junk attributes from tuples */
|
/* for removing junk attributes from tuples */
|
||||||
JunkFilter *ri_junkFilter;
|
JunkFilter *ri_junkFilter;
|
||||||
|
|
||||||
|
/* list of RETURNING expressions */
|
||||||
|
List *ri_returningList;
|
||||||
|
|
||||||
/* for computing a RETURNING list */
|
/* for computing a RETURNING list */
|
||||||
ProjectionInfo *ri_projectReturning;
|
ProjectionInfo *ri_projectReturning;
|
||||||
|
|
||||||
@ -462,6 +465,9 @@ typedef struct ResultRelInfo
|
|||||||
/* relation descriptor for root partitioned table */
|
/* relation descriptor for root partitioned table */
|
||||||
Relation ri_PartitionRoot;
|
Relation ri_PartitionRoot;
|
||||||
|
|
||||||
|
/* true if ready for tuple routing */
|
||||||
|
bool ri_PartitionReadyForRouting;
|
||||||
|
|
||||||
int ri_PartitionLeafIndex;
|
int ri_PartitionLeafIndex;
|
||||||
/* for running MERGE on this result relation */
|
/* for running MERGE on this result relation */
|
||||||
MergeState *ri_mergeState;
|
MergeState *ri_mergeState;
|
||||||
|
Reference in New Issue
Block a user