1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-27 23:21:58 +03:00

Fix EvalPlanQual rechecking during MERGE.

Under some circumstances, concurrent MERGE operations could lead to
inconsistent results, that varied according the plan chosen. This was
caused by a lack of rowmarks on the source relation, which meant that
EvalPlanQual rechecking was not guaranteed to return the same source
tuples when re-running the join query.

Fix by ensuring that preprocess_rowmarks() sets up PlanRowMarks for
all non-target relations used in MERGE, in the same way that it does
for UPDATE and DELETE.

Per bug #18103. Back-patch to v15, where MERGE was introduced.

Dean Rasheed, reviewed by Richard Guo.

Discussion: https://postgr.es/m/18103-c4386baab8e355e3%40postgresql.org
This commit is contained in:
Dean Rasheed
2023-09-30 10:52:21 +01:00
parent f02154652d
commit 1d5caec221
10 changed files with 237 additions and 38 deletions

View File

@ -26,7 +26,7 @@ unnecessarily (for example, Sort does not rescan its input if no parameters
of the input have changed, since it can just reread its stored sorted data). of the input have changed, since it can just reread its stored sorted data).
For a SELECT, it is only necessary to deliver the top-level result tuples For a SELECT, it is only necessary to deliver the top-level result tuples
to the client. For INSERT/UPDATE/DELETE, the actual table modification to the client. For INSERT/UPDATE/DELETE/MERGE, the actual table modification
operations happen in a top-level ModifyTable plan node. If the query operations happen in a top-level ModifyTable plan node. If the query
includes a RETURNING clause, the ModifyTable node delivers the computed includes a RETURNING clause, the ModifyTable node delivers the computed
RETURNING rows as output, otherwise it returns nothing. Handling INSERT RETURNING rows as output, otherwise it returns nothing. Handling INSERT
@ -353,8 +353,8 @@ EvalPlanQual (READ COMMITTED Update Checking)
For simple SELECTs, the executor need only pay attention to tuples that are For simple SELECTs, the executor need only pay attention to tuples that are
valid according to the snapshot seen by the current transaction (ie, they valid according to the snapshot seen by the current transaction (ie, they
were inserted by a previously committed transaction, and not deleted by any were inserted by a previously committed transaction, and not deleted by any
previously committed transaction). However, for UPDATE and DELETE it is not previously committed transaction). However, for UPDATE, DELETE, and MERGE it
cool to modify or delete a tuple that's been modified by an open or is not cool to modify or delete a tuple that's been modified by an open or
concurrently-committed transaction. If we are running in SERIALIZABLE concurrently-committed transaction. If we are running in SERIALIZABLE
isolation level then we just raise an error when this condition is seen to isolation level then we just raise an error when this condition is seen to
occur. In READ COMMITTED isolation level, we must work a lot harder. occur. In READ COMMITTED isolation level, we must work a lot harder.
@ -378,14 +378,14 @@ we're doing UPDATE). If no tuple is returned, then the modified tuple(s)
fail the quals, so we ignore the current result tuple and continue the fail the quals, so we ignore the current result tuple and continue the
original query. original query.
In UPDATE/DELETE, only the target relation needs to be handled this way. In UPDATE/DELETE/MERGE, only the target relation needs to be handled this way.
In SELECT FOR UPDATE, there may be multiple relations flagged FOR UPDATE, In SELECT FOR UPDATE, there may be multiple relations flagged FOR UPDATE,
so we obtain lock on the current tuple version in each such relation before so we obtain lock on the current tuple version in each such relation before
executing the recheck. executing the recheck.
It is also possible that there are relations in the query that are not It is also possible that there are relations in the query that are not
to be locked (they are neither the UPDATE/DELETE target nor specified to to be locked (they are neither the UPDATE/DELETE/MERGE target nor specified
be locked in SELECT FOR UPDATE/SHARE). When re-running the test query to be locked in SELECT FOR UPDATE/SHARE). When re-running the test query
we want to use the same rows from these relations that were joined to we want to use the same rows from these relations that were joined to
the locked rows. For ordinary relations this can be implemented relatively the locked rows. For ordinary relations this can be implemented relatively
cheaply by including the row TID in the join outputs and re-fetching that cheaply by including the row TID in the join outputs and re-fetching that

View File

@ -4284,9 +4284,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
/* /*
* If we have any secondary relations in an UPDATE or DELETE, they need to * If we have any secondary relations in an UPDATE or DELETE, they need to
* be treated like non-locked relations in SELECT FOR UPDATE, ie, the * be treated like non-locked relations in SELECT FOR UPDATE, i.e., the
* EvalPlanQual mechanism needs to be told about them. Locate the * EvalPlanQual mechanism needs to be told about them. This also goes for
* relevant ExecRowMarks. * the source relations in a MERGE. Locate the relevant ExecRowMarks.
*/ */
arowmarks = NIL; arowmarks = NIL;
foreach(l, node->rowMarks) foreach(l, node->rowMarks)

View File

@ -2262,11 +2262,12 @@ preprocess_rowmarks(PlannerInfo *root)
else else
{ {
/* /*
* We only need rowmarks for UPDATE, DELETE, or FOR [KEY] * We only need rowmarks for UPDATE, DELETE, MERGE, or FOR [KEY]
* UPDATE/SHARE. * UPDATE/SHARE.
*/ */
if (parse->commandType != CMD_UPDATE && if (parse->commandType != CMD_UPDATE &&
parse->commandType != CMD_DELETE) parse->commandType != CMD_DELETE &&
parse->commandType != CMD_MERGE)
return; return;
} }

View File

@ -721,8 +721,8 @@ typedef struct EState
* ExecRowMark - * ExecRowMark -
* runtime representation of FOR [KEY] UPDATE/SHARE clauses * runtime representation of FOR [KEY] UPDATE/SHARE clauses
* *
* When doing UPDATE, DELETE, or SELECT FOR [KEY] UPDATE/SHARE, we will have an * When doing UPDATE/DELETE/MERGE/SELECT FOR [KEY] UPDATE/SHARE, we will have
* ExecRowMark for each non-target relation in the query (except inheritance * an ExecRowMark for each non-target relation in the query (except inheritance
* parent RTEs, which can be ignored at runtime). Virtual relations such as * parent RTEs, which can be ignored at runtime). Virtual relations such as
* subqueries-in-FROM will have an ExecRowMark with relation == NULL. See * subqueries-in-FROM will have an ExecRowMark with relation == NULL. See
* PlanRowMark for details about most of the fields. In addition to fields * PlanRowMark for details about most of the fields. In addition to fields

View File

@ -1309,7 +1309,7 @@ typedef struct Limit
* doing a separate remote query to lock each selected row is usually pretty * doing a separate remote query to lock each selected row is usually pretty
* unappealing, so early locking remains a credible design choice for FDWs. * unappealing, so early locking remains a credible design choice for FDWs.
* *
* When doing UPDATE, DELETE, or SELECT FOR UPDATE/SHARE, we have to uniquely * When doing UPDATE/DELETE/MERGE/SELECT FOR UPDATE/SHARE, we have to uniquely
* identify all the source rows, not only those from the target relations, so * identify all the source rows, not only those from the target relations, so
* that we can perform EvalPlanQual rechecking at need. For plain tables we * that we can perform EvalPlanQual rechecking at need. For plain tables we
* can just fetch the TID, much as for a target relation; this case is * can just fetch the TID, much as for a target relation; this case is
@ -1338,7 +1338,7 @@ typedef enum RowMarkType
* PlanRowMark - * PlanRowMark -
* plan-time representation of FOR [KEY] UPDATE/SHARE clauses * plan-time representation of FOR [KEY] UPDATE/SHARE clauses
* *
* When doing UPDATE, DELETE, or SELECT FOR UPDATE/SHARE, we create a separate * When doing UPDATE/DELETE/MERGE/SELECT FOR UPDATE/SHARE, we create a separate
* PlanRowMark node for each non-target relation in the query. Relations that * PlanRowMark node for each non-target relation in the query. Relations that
* are not specified as FOR UPDATE/SHARE are marked ROW_MARK_REFERENCE (if * are not specified as FOR UPDATE/SHARE are marked ROW_MARK_REFERENCE (if
* regular tables or supported foreign tables) or ROW_MARK_COPY (if not). * regular tables or supported foreign tables) or ROW_MARK_COPY (if not).

View File

@ -0,0 +1,148 @@
Parsed test spec with 2 sessions
starting permutation: b1 m1 s1 c1 b2 m2 s2 c2
step b1: BEGIN ISOLATION LEVEL READ COMMITTED;
step m1: MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
step s1: SELECT * FROM tgt;
id|val
--+---
1| 10
2| 20
3| 30
(3 rows)
step c1: COMMIT;
step b2: BEGIN ISOLATION LEVEL READ COMMITTED;
step m2: MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
step s2: SELECT * FROM tgt;
id|val
--+---
1| 10
2| 20
3| 30
(3 rows)
step c2: COMMIT;
starting permutation: b1 b2 m1 hj ex m2 c1 c2 s1
step b1: BEGIN ISOLATION LEVEL READ COMMITTED;
step b2: BEGIN ISOLATION LEVEL READ COMMITTED;
step m1: MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
step hj: SET LOCAL enable_mergejoin = off; SET LOCAL enable_nestloop = off;
step ex: EXPLAIN (verbose, costs off)
MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
QUERY PLAN
---------------------------------------------------
Merge on public.tgt
-> Hash Left Join
Output: tgt.ctid, src.val, src.id, src.ctid
Inner Unique: true
Hash Cond: (src.id = tgt.id)
-> Seq Scan on public.src
Output: src.val, src.id, src.ctid
-> Hash
Output: tgt.ctid, tgt.id
-> Seq Scan on public.tgt
Output: tgt.ctid, tgt.id
(11 rows)
step m2: MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); <waiting ...>
step c1: COMMIT;
step m2: <... completed>
step c2: COMMIT;
step s1: SELECT * FROM tgt;
id|val
--+---
1| 10
2| 20
3| 30
(3 rows)
starting permutation: b1 b2 m1 mj ex m2 c1 c2 s1
step b1: BEGIN ISOLATION LEVEL READ COMMITTED;
step b2: BEGIN ISOLATION LEVEL READ COMMITTED;
step m1: MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
step mj: SET LOCAL enable_hashjoin = off; SET LOCAL enable_nestloop = off;
step ex: EXPLAIN (verbose, costs off)
MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
QUERY PLAN
---------------------------------------------------
Merge on public.tgt
-> Merge Left Join
Output: tgt.ctid, src.val, src.id, src.ctid
Inner Unique: true
Merge Cond: (src.id = tgt.id)
-> Index Scan using src_pkey on public.src
Output: src.val, src.id, src.ctid
-> Index Scan using tgt_pkey on public.tgt
Output: tgt.ctid, tgt.id
(9 rows)
step m2: MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); <waiting ...>
step c1: COMMIT;
step m2: <... completed>
step c2: COMMIT;
step s1: SELECT * FROM tgt;
id|val
--+---
1| 10
2| 20
3| 30
(3 rows)
starting permutation: b1 b2 m1 nl ex m2 c1 c2 s1
step b1: BEGIN ISOLATION LEVEL READ COMMITTED;
step b2: BEGIN ISOLATION LEVEL READ COMMITTED;
step m1: MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
step nl: SET LOCAL enable_hashjoin = off; SET LOCAL enable_mergejoin = off;
step ex: EXPLAIN (verbose, costs off)
MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val);
QUERY PLAN
---------------------------------------------------
Merge on public.tgt
-> Nested Loop Left Join
Output: tgt.ctid, src.val, src.id, src.ctid
Inner Unique: true
-> Seq Scan on public.src
Output: src.val, src.id, src.ctid
-> Index Scan using tgt_pkey on public.tgt
Output: tgt.ctid, tgt.id
Index Cond: (tgt.id = src.id)
(9 rows)
step m2: MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); <waiting ...>
step c1: COMMIT;
step m2: <... completed>
step c2: COMMIT;
step s1: SELECT * FROM tgt;
id|val
--+---
1| 10
2| 20
3| 30
(3 rows)

View File

@ -51,6 +51,7 @@ test: merge-insert-update
test: merge-delete test: merge-delete
test: merge-update test: merge-update
test: merge-match-recheck test: merge-match-recheck
test: merge-join
test: delete-abort-savept test: delete-abort-savept
test: delete-abort-savept-2 test: delete-abort-savept-2
test: aborted-keyrevoke test: aborted-keyrevoke

View File

@ -0,0 +1,45 @@
# MERGE JOIN
#
# This test checks the EPQ recheck mechanism during MERGE when joining to a
# source table using different join methods, per bug #18103
setup
{
CREATE TABLE src (id int PRIMARY KEY, val int);
CREATE TABLE tgt (id int PRIMARY KEY, val int);
INSERT INTO src SELECT x, x*10 FROM generate_series(1,3) g(x);
INSERT INTO tgt SELECT x, x FROM generate_series(1,3) g(x);
}
teardown
{
DROP TABLE src, tgt;
}
session s1
step b1 { BEGIN ISOLATION LEVEL READ COMMITTED; }
step m1 { MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); }
step s1 { SELECT * FROM tgt; }
step c1 { COMMIT; }
session s2
step b2 { BEGIN ISOLATION LEVEL READ COMMITTED; }
step hj { SET LOCAL enable_mergejoin = off; SET LOCAL enable_nestloop = off; }
step mj { SET LOCAL enable_hashjoin = off; SET LOCAL enable_nestloop = off; }
step nl { SET LOCAL enable_hashjoin = off; SET LOCAL enable_mergejoin = off; }
step ex { EXPLAIN (verbose, costs off)
MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); }
step m2 { MERGE INTO tgt USING src ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET val = src.val
WHEN NOT MATCHED THEN INSERT VALUES (src.id, src.val); }
step s2 { SELECT * FROM tgt; }
step c2 { COMMIT; }
permutation b1 m1 s1 c1 b2 m2 s2 c2
permutation b1 b2 m1 hj ex m2 c1 c2 s1
permutation b1 b2 m1 mj ex m2 c1 c2 s1
permutation b1 b2 m1 nl ex m2 c1 c2 s1

View File

@ -1829,11 +1829,11 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid
Merge on public.pa_target t Merge on public.pa_target t
Merge on public.pa_targetp t_1 Merge on public.pa_targetp t_1
-> Hash Left Join -> Hash Left Join
Output: s.sid, t_1.tableoid, t_1.ctid Output: s.sid, s.ctid, t_1.tableoid, t_1.ctid
Inner Unique: true Inner Unique: true
Hash Cond: (s.sid = t_1.tid) Hash Cond: (s.sid = t_1.tid)
-> Seq Scan on public.pa_source s -> Seq Scan on public.pa_source s
Output: s.sid Output: s.sid, s.ctid
-> Hash -> Hash
Output: t_1.tid, t_1.tableoid, t_1.ctid Output: t_1.tid, t_1.tableoid, t_1.ctid
-> Seq Scan on public.pa_targetp t_1 -> Seq Scan on public.pa_targetp t_1
@ -1859,11 +1859,11 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid
-------------------------------------------- --------------------------------------------
Merge on public.pa_target t Merge on public.pa_target t
-> Hash Left Join -> Hash Left Join
Output: s.sid, t.ctid Output: s.sid, s.ctid, t.ctid
Inner Unique: true Inner Unique: true
Hash Cond: (s.sid = t.tid) Hash Cond: (s.sid = t.tid)
-> Seq Scan on public.pa_source s -> Seq Scan on public.pa_source s
Output: s.sid Output: s.sid, s.ctid
-> Hash -> Hash
Output: t.tid, t.ctid Output: t.tid, t.ctid
-> Result -> Result

View File

@ -3015,28 +3015,30 @@ WITH cte_basic AS MATERIALIZED (SELECT 1 a, 'cte_basic val' b)
MERGE INTO m USING (select 0 k, 'merge source SubPlan' v offset 0) o ON m.k=o.k MERGE INTO m USING (select 0 k, 'merge source SubPlan' v offset 0) o ON m.k=o.k
WHEN MATCHED THEN UPDATE SET v = (SELECT b || ' merge update' FROM cte_basic WHERE cte_basic.a = m.k LIMIT 1) WHEN MATCHED THEN UPDATE SET v = (SELECT b || ' merge update' FROM cte_basic WHERE cte_basic.a = m.k LIMIT 1)
WHEN NOT MATCHED THEN INSERT VALUES(o.k, o.v); WHEN NOT MATCHED THEN INSERT VALUES(o.k, o.v);
QUERY PLAN QUERY PLAN
---------------------------------------------------------------- -------------------------------------------------------------------
Merge on public.m Merge on public.m
CTE cte_basic CTE cte_basic
-> Result -> Result
Output: 1, 'cte_basic val'::text Output: 1, 'cte_basic val'::text
-> Hash Right Join -> Hash Right Join
Output: m.ctid, (0), ('merge source SubPlan'::text) Output: m.ctid, o.k, o.v, o.*
Hash Cond: (m.k = (0)) Hash Cond: (m.k = o.k)
-> Seq Scan on public.m -> Seq Scan on public.m
Output: m.ctid, m.k Output: m.ctid, m.k
-> Hash -> Hash
Output: (0), ('merge source SubPlan'::text) Output: o.k, o.v, o.*
-> Result -> Subquery Scan on o
Output: 0, 'merge source SubPlan'::text Output: o.k, o.v, o.*
-> Result
Output: 0, 'merge source SubPlan'::text
SubPlan 2 SubPlan 2
-> Limit -> Limit
Output: ((cte_basic.b || ' merge update'::text)) Output: ((cte_basic.b || ' merge update'::text))
-> CTE Scan on cte_basic -> CTE Scan on cte_basic
Output: (cte_basic.b || ' merge update'::text) Output: (cte_basic.b || ' merge update'::text)
Filter: (cte_basic.a = m.k) Filter: (cte_basic.a = m.k)
(19 rows) (21 rows)
-- InitPlan -- InitPlan
WITH cte_init AS MATERIALIZED (SELECT 1 a, 'cte_init val' b) WITH cte_init AS MATERIALIZED (SELECT 1 a, 'cte_init val' b)
@ -3056,8 +3058,8 @@ WITH cte_init AS MATERIALIZED (SELECT 1 a, 'cte_init val' b)
MERGE INTO m USING (select 1 k, 'merge source InitPlan' v offset 0) o ON m.k=o.k MERGE INTO m USING (select 1 k, 'merge source InitPlan' v offset 0) o ON m.k=o.k
WHEN MATCHED THEN UPDATE SET v = (SELECT b || ' merge update' FROM cte_init WHERE a = 1 LIMIT 1) WHEN MATCHED THEN UPDATE SET v = (SELECT b || ' merge update' FROM cte_init WHERE a = 1 LIMIT 1)
WHEN NOT MATCHED THEN INSERT VALUES(o.k, o.v); WHEN NOT MATCHED THEN INSERT VALUES(o.k, o.v);
QUERY PLAN QUERY PLAN
--------------------------------------------------------------- --------------------------------------------------------------------
Merge on public.m Merge on public.m
CTE cte_init CTE cte_init
-> Result -> Result
@ -3069,15 +3071,17 @@ WHEN NOT MATCHED THEN INSERT VALUES(o.k, o.v);
Output: (cte_init.b || ' merge update'::text) Output: (cte_init.b || ' merge update'::text)
Filter: (cte_init.a = 1) Filter: (cte_init.a = 1)
-> Hash Right Join -> Hash Right Join
Output: m.ctid, (1), ('merge source InitPlan'::text) Output: m.ctid, o.k, o.v, o.*
Hash Cond: (m.k = (1)) Hash Cond: (m.k = o.k)
-> Seq Scan on public.m -> Seq Scan on public.m
Output: m.ctid, m.k Output: m.ctid, m.k
-> Hash -> Hash
Output: (1), ('merge source InitPlan'::text) Output: o.k, o.v, o.*
-> Result -> Subquery Scan on o
Output: 1, 'merge source InitPlan'::text Output: o.k, o.v, o.*
(19 rows) -> Result
Output: 1, 'merge source InitPlan'::text
(21 rows)
-- MERGE source comes from CTE: -- MERGE source comes from CTE:
WITH merge_source_cte AS MATERIALIZED (SELECT 15 a, 'merge_source_cte val' b) WITH merge_source_cte AS MATERIALIZED (SELECT 15 a, 'merge_source_cte val' b)
@ -3111,14 +3115,14 @@ WHEN NOT MATCHED THEN INSERT VALUES(o.a, o.b || (SELECT merge_source_cte.*::text
-> CTE Scan on merge_source_cte merge_source_cte_2 -> CTE Scan on merge_source_cte merge_source_cte_2
Output: ((merge_source_cte_2.*)::text || ' merge insert'::text) Output: ((merge_source_cte_2.*)::text || ' merge insert'::text)
-> Hash Right Join -> Hash Right Join
Output: m.ctid, merge_source_cte.a, merge_source_cte.b Output: m.ctid, merge_source_cte.a, merge_source_cte.b, merge_source_cte.*
Hash Cond: (m.k = merge_source_cte.a) Hash Cond: (m.k = merge_source_cte.a)
-> Seq Scan on public.m -> Seq Scan on public.m
Output: m.ctid, m.k Output: m.ctid, m.k
-> Hash -> Hash
Output: merge_source_cte.a, merge_source_cte.b Output: merge_source_cte.a, merge_source_cte.b, merge_source_cte.*
-> CTE Scan on merge_source_cte -> CTE Scan on merge_source_cte
Output: merge_source_cte.a, merge_source_cte.b Output: merge_source_cte.a, merge_source_cte.b, merge_source_cte.*
(20 rows) (20 rows)
DROP TABLE m; DROP TABLE m;