mirror of
https://github.com/postgres/postgres.git
synced 2025-11-07 19:06:32 +03:00
Raise error when affecting tuple moved into different partition.
When an update moves a row between partitions (supported since
2f17844104), our normal logic for following update chains in READ
COMMITTED mode doesn't work anymore. Cross partition updates are
modeled as a delete from the old and an insert into the new
partition. No ctid chain exists across partitions, and there's no
convenient space to introduce that link.
Not throwing an error in a partitioned context when one would have
been thrown without partitioning is obviously problematic. This commit
introduces infrastructure to detect when a tuple has been moved, not
just plainly deleted. That allows throwing an error when encountering
a deletion that's actually a move, while attempting to follow a
ctid chain.
The row deleted as part of a cross partition update is marked by
pointing its t_ctid to an invalid block, instead of self as a normal
update would. That was deemed to be the least invasive and most
future proof way to represent the knowledge, given how few infomask
bits there are to be recycled (there are also some locking issues with
using infomask bits).
External code following ctid chains should be updated to check for
moved tuples. The most likely consequence of not doing so is a missed
error.
Author: Amul Sul, editorialized by me
Reviewed-By: Amit Kapila, Pavan Deolasee, Andres Freund, Robert Haas
Discussion: http://postgr.es/m/CAAJ_b95PkwojoYfz0bzXU8OokcTVGzN6vYGCNVUukeUDrnF3dw@mail.gmail.com
This commit is contained in:
@@ -2308,6 +2308,7 @@ heap_get_latest_tid(Relation relation,
|
||||
*/
|
||||
if ((tp.t_data->t_infomask & HEAP_XMAX_INVALID) ||
|
||||
HeapTupleHeaderIsOnlyLocked(tp.t_data) ||
|
||||
HeapTupleHeaderIndicatesMovedPartitions(tp.t_data) ||
|
||||
ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid))
|
||||
{
|
||||
UnlockReleaseBuffer(buffer);
|
||||
@@ -3041,6 +3042,8 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
|
||||
* crosscheck - if not InvalidSnapshot, also check tuple against this
|
||||
* wait - true if should wait for any conflicting update to commit/abort
|
||||
* hufd - output parameter, filled in failure cases (see below)
|
||||
* changingPart - true iff the tuple is being moved to another partition
|
||||
* table due to an update of the partition key. Otherwise, false.
|
||||
*
|
||||
* Normal, successful return value is HeapTupleMayBeUpdated, which
|
||||
* actually means we did delete it. Failure return codes are
|
||||
@@ -3056,7 +3059,7 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
|
||||
HTSU_Result
|
||||
heap_delete(Relation relation, ItemPointer tid,
|
||||
CommandId cid, Snapshot crosscheck, bool wait,
|
||||
HeapUpdateFailureData *hufd)
|
||||
HeapUpdateFailureData *hufd, bool changingPart)
|
||||
{
|
||||
HTSU_Result result;
|
||||
TransactionId xid = GetCurrentTransactionId();
|
||||
@@ -3325,6 +3328,10 @@ l1:
|
||||
/* Make sure there is no forward chain link in t_ctid */
|
||||
tp.t_data->t_ctid = tp.t_self;
|
||||
|
||||
/* Signal that this is actually a move into another partition */
|
||||
if (changingPart)
|
||||
HeapTupleHeaderSetMovedPartitions(tp.t_data);
|
||||
|
||||
MarkBufferDirty(buffer);
|
||||
|
||||
/*
|
||||
@@ -3342,7 +3349,11 @@ l1:
|
||||
if (RelationIsAccessibleInLogicalDecoding(relation))
|
||||
log_heap_new_cid(relation, &tp);
|
||||
|
||||
xlrec.flags = all_visible_cleared ? XLH_DELETE_ALL_VISIBLE_CLEARED : 0;
|
||||
xlrec.flags = 0;
|
||||
if (all_visible_cleared)
|
||||
xlrec.flags |= XLH_DELETE_ALL_VISIBLE_CLEARED;
|
||||
if (changingPart)
|
||||
xlrec.flags |= XLH_DELETE_IS_PARTITION_MOVE;
|
||||
xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
|
||||
tp.t_data->t_infomask2);
|
||||
xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
|
||||
@@ -3450,7 +3461,7 @@ simple_heap_delete(Relation relation, ItemPointer tid)
|
||||
result = heap_delete(relation, tid,
|
||||
GetCurrentCommandId(true), InvalidSnapshot,
|
||||
true /* wait for commit */ ,
|
||||
&hufd);
|
||||
&hufd, false /* changingPart */);
|
||||
switch (result)
|
||||
{
|
||||
case HeapTupleSelfUpdated:
|
||||
@@ -6051,6 +6062,7 @@ l4:
|
||||
next:
|
||||
/* if we find the end of update chain, we're done. */
|
||||
if (mytup.t_data->t_infomask & HEAP_XMAX_INVALID ||
|
||||
HeapTupleHeaderIndicatesMovedPartitions(mytup.t_data) ||
|
||||
ItemPointerEquals(&mytup.t_self, &mytup.t_data->t_ctid) ||
|
||||
HeapTupleHeaderIsOnlyLocked(mytup.t_data))
|
||||
{
|
||||
@@ -6102,7 +6114,12 @@ static HTSU_Result
|
||||
heap_lock_updated_tuple(Relation rel, HeapTuple tuple, ItemPointer ctid,
|
||||
TransactionId xid, LockTupleMode mode)
|
||||
{
|
||||
if (!ItemPointerEquals(&tuple->t_self, ctid))
|
||||
/*
|
||||
* If the tuple has not been updated, or has moved into another partition
|
||||
* (effectively a delete) stop here.
|
||||
*/
|
||||
if (!HeapTupleHeaderIndicatesMovedPartitions(tuple->t_data) &&
|
||||
!ItemPointerEquals(&tuple->t_self, ctid))
|
||||
{
|
||||
/*
|
||||
* If this is the first possibly-multixact-able operation in the
|
||||
@@ -8493,8 +8510,11 @@ heap_xlog_delete(XLogReaderState *record)
|
||||
if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
|
||||
PageClearAllVisible(page);
|
||||
|
||||
/* Make sure there is no forward chain link in t_ctid */
|
||||
htup->t_ctid = target_tid;
|
||||
/* Make sure t_ctid is set correctly */
|
||||
if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE)
|
||||
HeapTupleHeaderSetMovedPartitions(htup);
|
||||
else
|
||||
htup->t_ctid = target_tid;
|
||||
PageSetLSN(page, lsn);
|
||||
MarkBufferDirty(buffer);
|
||||
}
|
||||
@@ -9422,6 +9442,13 @@ heap_mask(char *pagedata, BlockNumber blkno)
|
||||
*/
|
||||
if (HeapTupleHeaderIsSpeculative(page_htup))
|
||||
ItemPointerSet(&page_htup->t_ctid, blkno, off);
|
||||
|
||||
/*
|
||||
* NB: Not ignoring ctid changes due to the tuple having moved
|
||||
* (i.e. HeapTupleHeaderIndicatesMovedPartitions), because that's
|
||||
* important information that needs to be in-sync between primary
|
||||
* and standby, and thus is WAL logged.
|
||||
*/
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@@ -552,6 +552,9 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
|
||||
if (!HeapTupleHeaderIsHotUpdated(htup))
|
||||
break;
|
||||
|
||||
/* HOT implies it can't have moved to different partition */
|
||||
Assert(!HeapTupleHeaderIndicatesMovedPartitions(htup));
|
||||
|
||||
/*
|
||||
* Advance to next chain member.
|
||||
*/
|
||||
@@ -823,6 +826,9 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
|
||||
if (!HeapTupleHeaderIsHotUpdated(htup))
|
||||
break;
|
||||
|
||||
/* HOT implies it can't have moved to different partition */
|
||||
Assert(!HeapTupleHeaderIndicatesMovedPartitions(htup));
|
||||
|
||||
nextoffnum = ItemPointerGetOffsetNumber(&htup->t_ctid);
|
||||
priorXmax = HeapTupleHeaderGetUpdateXid(htup);
|
||||
}
|
||||
|
||||
@@ -424,6 +424,7 @@ rewrite_heap_tuple(RewriteState state,
|
||||
*/
|
||||
if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
|
||||
HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
|
||||
!HeapTupleHeaderIndicatesMovedPartitions(old_tuple->t_data) &&
|
||||
!(ItemPointerEquals(&(old_tuple->t_self),
|
||||
&(old_tuple->t_data->t_ctid))))
|
||||
{
|
||||
|
||||
@@ -3315,6 +3315,11 @@ ltrmark:;
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("could not serialize access due to concurrent update")));
|
||||
if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("tuple to be locked was already moved to another partition due to concurrent update")));
|
||||
|
||||
if (!ItemPointerEquals(&hufd.ctid, &tuple.t_self))
|
||||
{
|
||||
/* it was updated, so look at the updated version */
|
||||
|
||||
@@ -2733,6 +2733,10 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("could not serialize access due to concurrent update")));
|
||||
if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("tuple to be locked was already moved to another partition due to concurrent update")));
|
||||
|
||||
/* Should not encounter speculative tuple on recheck */
|
||||
Assert(!HeapTupleHeaderIsSpeculative(tuple.t_data));
|
||||
@@ -2801,6 +2805,14 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
|
||||
* As above, it should be safe to examine xmax and t_ctid without the
|
||||
* buffer content lock, because they can't be changing.
|
||||
*/
|
||||
|
||||
/* check whether next version would be in a different partition */
|
||||
if (HeapTupleHeaderIndicatesMovedPartitions(tuple.t_data))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("tuple to be locked was already moved to another partition due to concurrent update")));
|
||||
|
||||
/* check whether tuple has been deleted */
|
||||
if (ItemPointerEquals(&tuple.t_self, &tuple.t_data->t_ctid))
|
||||
{
|
||||
/* deleted, so forget about it */
|
||||
|
||||
@@ -324,7 +324,8 @@ lmerge_matched:;
|
||||
slot = ExecDelete(mtstate, tupleid, NULL,
|
||||
slot, epqstate, estate,
|
||||
&tuple_deleted, false, &hufd, action,
|
||||
mtstate->canSetTag);
|
||||
mtstate->canSetTag,
|
||||
false /* changingPart */);
|
||||
|
||||
break;
|
||||
|
||||
|
||||
@@ -191,9 +191,14 @@ retry:
|
||||
break;
|
||||
case HeapTupleUpdated:
|
||||
/* XXX: Improve handling here */
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("concurrent update, retrying")));
|
||||
if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
|
||||
else
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("concurrent update, retrying")));
|
||||
goto retry;
|
||||
case HeapTupleInvisible:
|
||||
elog(ERROR, "attempted to lock invisible tuple");
|
||||
@@ -349,9 +354,14 @@ retry:
|
||||
break;
|
||||
case HeapTupleUpdated:
|
||||
/* XXX: Improve handling here */
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("concurrent update, retrying")));
|
||||
if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
|
||||
else
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("concurrent update, retrying")));
|
||||
goto retry;
|
||||
case HeapTupleInvisible:
|
||||
elog(ERROR, "attempted to lock invisible tuple");
|
||||
|
||||
@@ -218,6 +218,11 @@ lnext:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("could not serialize access due to concurrent update")));
|
||||
if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("tuple to be locked was already moved to another partition due to concurrent update")));
|
||||
|
||||
if (ItemPointerEquals(&hufd.ctid, &tuple.t_self))
|
||||
{
|
||||
/* Tuple was deleted, so don't return it */
|
||||
|
||||
@@ -645,7 +645,8 @@ ExecDelete(ModifyTableState *mtstate,
|
||||
bool processReturning,
|
||||
HeapUpdateFailureData *hufdp,
|
||||
MergeActionState *actionState,
|
||||
bool canSetTag)
|
||||
bool canSetTag,
|
||||
bool changingPart)
|
||||
{
|
||||
ResultRelInfo *resultRelInfo;
|
||||
Relation resultRelationDesc;
|
||||
@@ -744,7 +745,8 @@ ldelete:;
|
||||
estate->es_output_cid,
|
||||
estate->es_crosscheck_snapshot,
|
||||
true /* wait for commit */ ,
|
||||
&hufd);
|
||||
&hufd,
|
||||
changingPart);
|
||||
|
||||
/*
|
||||
* Copy the necessary information, if the caller has asked for it. We
|
||||
@@ -803,6 +805,10 @@ ldelete:;
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("could not serialize access due to concurrent update")));
|
||||
if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("tuple to be deleted was already moved to another partition due to concurrent update")));
|
||||
|
||||
if (!ItemPointerEquals(tupleid, &hufd.ctid))
|
||||
{
|
||||
@@ -1157,7 +1163,7 @@ lreplace:;
|
||||
*/
|
||||
ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate,
|
||||
estate, &tuple_deleted, false, hufdp, NULL,
|
||||
false);
|
||||
false /* canSetTag */, true /* changingPart */);
|
||||
|
||||
/*
|
||||
* For some reason if DELETE didn't happen (e.g. trigger prevented
|
||||
@@ -1333,6 +1339,10 @@ lreplace:;
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("could not serialize access due to concurrent update")));
|
||||
if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("tuple to be updated was already moved to another partition due to concurrent update")));
|
||||
|
||||
if (!ItemPointerEquals(tupleid, &hufd.ctid))
|
||||
{
|
||||
@@ -1522,6 +1532,14 @@ ExecOnConflictUpdate(ModifyTableState *mtstate,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("could not serialize access due to concurrent update")));
|
||||
|
||||
/*
|
||||
* As long as we don't support an UPDATE of INSERT ON CONFLICT for
|
||||
* a partitioned table we shouldn't reach a case where the tuple to
|
||||
* be locked is moved to another partition due to a concurrent update
|
||||
* of the partition key.
|
||||
*/
|
||||
Assert(!ItemPointerIndicatesMovedPartitions(&hufd.ctid));
|
||||
|
||||
/*
|
||||
* Tell caller to try again from the very start.
|
||||
*
|
||||
@@ -2274,7 +2292,8 @@ ExecModifyTable(PlanState *pstate)
|
||||
case CMD_DELETE:
|
||||
slot = ExecDelete(node, tupleid, oldtuple, planSlot,
|
||||
&node->mt_epqstate, estate,
|
||||
NULL, true, NULL, NULL, node->canSetTag);
|
||||
NULL, true, NULL, NULL, node->canSetTag,
|
||||
false /* changingPart */);
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unknown operation");
|
||||
|
||||
Reference in New Issue
Block a user