mirror of
https://github.com/postgres/postgres.git
synced 2025-10-28 11:55:03 +03:00
Fix an oversight in 3f28b2fcac.
Commit3f28b2fcactried to ensure that the replication origin shouldn't be advanced in case of an ERROR in the apply worker, so that it can request the same data again after restart. However, it is possible that an ERROR was caught and handled by a (say PL/pgSQL) function, and the apply worker continues to apply further changes, in which case, we shouldn't reset the replication origin. Ensure to reset the origin only when the apply worker exits after an ERROR. Commit3f28b2fcacadded new function geterrlevel, which we removed in HEAD as part of this commit, but kept it in backbranches to avoid breaking any applications. A separate case can be made to have such a function even for HEAD. Reported-by: Shawn McCoy <shawn.the.mccoy@gmail.com> Author: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: vignesh C <vignesh21@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Backpatch-through: 16, where it was introduced Discussion: https://postgr.es/m/CALsgZNCGARa2mcYNVTSj9uoPcJo-tPuWUGECReKpNgTpo31_Pw@mail.gmail.com
This commit is contained in:
@@ -414,6 +414,8 @@ static inline void reset_apply_error_context_info(void);
|
||||
static TransApplyAction get_transaction_apply_action(TransactionId xid,
|
||||
ParallelApplyWorkerInfo **winfo);
|
||||
|
||||
static void replorigin_reset(int code, Datum arg);
|
||||
|
||||
/*
|
||||
* Form the origin name for the subscription.
|
||||
*
|
||||
@@ -4516,6 +4518,14 @@ start_apply(XLogRecPtr origin_startpos)
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
/*
|
||||
* Reset the origin state to prevent the advancement of origin
|
||||
* progress if we fail to apply. Otherwise, this will result in
|
||||
* transaction loss as that transaction won't be sent again by the
|
||||
* server.
|
||||
*/
|
||||
replorigin_reset(0, (Datum) 0);
|
||||
|
||||
if (MySubscription->disableonerr)
|
||||
DisableSubscriptionAndExit();
|
||||
else
|
||||
@@ -5004,23 +5014,12 @@ void
|
||||
apply_error_callback(void *arg)
|
||||
{
|
||||
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
|
||||
int elevel;
|
||||
|
||||
if (apply_error_callback_arg.command == 0)
|
||||
return;
|
||||
|
||||
Assert(errarg->origin_name);
|
||||
|
||||
elevel = geterrlevel();
|
||||
|
||||
/*
|
||||
* Reset the origin state to prevent the advancement of origin progress if
|
||||
* we fail to apply. Otherwise, this will result in transaction loss as
|
||||
* that transaction won't be sent again by the server.
|
||||
*/
|
||||
if (elevel >= ERROR)
|
||||
replorigin_reset(0, (Datum) 0);
|
||||
|
||||
if (errarg->rel == NULL)
|
||||
{
|
||||
if (!TransactionIdIsValid(errarg->remote_xid))
|
||||
|
||||
@@ -1590,23 +1590,6 @@ geterrcode(void)
|
||||
return edata->sqlerrcode;
|
||||
}
|
||||
|
||||
/*
|
||||
* geterrlevel --- return the currently set error level
|
||||
*
|
||||
* This is only intended for use in error callback subroutines, since there
|
||||
* is no other place outside elog.c where the concept is meaningful.
|
||||
*/
|
||||
int
|
||||
geterrlevel(void)
|
||||
{
|
||||
ErrorData *edata = &errordata[errordata_stack_depth];
|
||||
|
||||
/* we don't bother incrementing recursion_depth */
|
||||
CHECK_STACK_DEPTH();
|
||||
|
||||
return edata->elevel;
|
||||
}
|
||||
|
||||
/*
|
||||
* geterrposition --- return the currently set error position (0 if none)
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user