Wait between tablesync worker restarts

Before restarting a tablesync worker for the same relation, wait
wal_retrieve_retry_interval (currently 5s by default).  This avoids
restarting failing workers in a tight loop.

We keep the last start times in a hash table last_start_times that is
separate from the table_states list, because that list is cleared out on
syscache invalidation, which happens whenever a table finishes syncing.
The hash table is kept until all tables have finished syncing.  A future
project might be to unify these two and keep everything in one data
structure, but for now this is a less invasive change to accomplish the
original purpose.

For the test suite, set wal_retrieve_retry_interval to its minimum value,
to not increase the test suite run time.

Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
This commit is contained in:
parent d981074c24
commit e3cf708016
@@ -245,7 +245,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  *
  * If there are tables that need synchronizing and are not being synchronized
  * yet, start sync workers for them (if there are free slots for sync
- * workers).
+ * workers). To prevent starting the sync worker for the same relation at a
+ * high frequency after a failure, we store its last start time with each sync
+ * state info. We start the sync worker for the same relation after waiting
+ * at least wal_retrieve_retry_interval.
  *
  * For tables that are being synchronized already, check if sync workers
  * either need action from the apply worker or have finished.
@@ -263,7 +266,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 static void
 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 {
+    struct tablesync_start_time_mapping
+    {
+        Oid         relid;
+        TimestampTz last_start_time;
+    };
     static List *table_states = NIL;
+    static HTAB *last_start_times = NULL;
     ListCell   *lc;
 
     Assert(!IsTransactionState());
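Declaring the struct inside the function keeps it private to this code path; only the static HTAB pointer persists across calls. Per dynahash convention, the key (relid) must be the first field of the entry, and hash_search() returns a pointer to the whole entry. For illustration only, not part of this commit, a read-only probe of the same table could use HASH_FIND (relid here stands for an arbitrary Oid):

    /* Hypothetical lookup without insertion; not in this commit. */
    struct tablesync_start_time_mapping *entry;
    bool        found;

    entry = hash_search(last_start_times, &relid, HASH_FIND, &found);
    if (found)
        elog(DEBUG1, "tablesync for relation %u last launched at %s",
             relid, timestamptz_to_str(entry->last_start_time));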
@@ -300,6 +309,31 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
         table_states_valid = true;
     }
 
+    /*
+     * Prepare hash table for tracking last start times of workers, to avoid
+     * immediate restarts.  We don't need it if there are no tables that need
+     * syncing.
+     */
+    if (table_states && !last_start_times)
+    {
+        HASHCTL     ctl;
+
+        memset(&ctl, 0, sizeof(ctl));
+        ctl.keysize = sizeof(Oid);
+        ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
+        last_start_times = hash_create("Logical replication table sync worker start times",
+                                       256, &ctl, HASH_ELEM | HASH_BLOBS);
+    }
+    /*
+     * Clean up the hash table when we're done with all tables (just to
+     * release the bit of memory).
+     */
+    else if (!table_states && last_start_times)
+    {
+        hash_destroy(last_start_times);
+        last_start_times = NULL;
+    }
+
     /* Process all tables that are being synchronized. */
     foreach(lc, table_states)
     {
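HASH_ELEM tells hash_create() that the keysize and entrysize fields of the HASHCTL are valid, and HASH_BLOBS selects byte-wise hashing of the key, which suits a fixed-width Oid. Because no HASH_CONTEXT flag is passed, dynahash allocates the table in its own child of TopMemoryContext, so nothing frees it automatically; that is why the else branch calls hash_destroy() explicitly once table_states is empty. A hypothetical variant tying the table's lifetime to a caller-chosen context instead (not appropriate here, since the table must survive across calls and invalidations) would look roughly like:

    HASHCTL     ctl;

    memset(&ctl, 0, sizeof(ctl));
    ctl.keysize = sizeof(Oid);
    ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
    ctl.hcxt = some_context;    /* hypothetical: table dies with this context */
    last_start_times = hash_create("tablesync start times (sketch)",
                                   256, &ctl,
                                   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);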
@@ -403,11 +437,23 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
              */
             else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
             {
-                logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-                                         MySubscription->oid,
-                                         MySubscription->name,
-                                         MyLogicalRepWorker->userid,
-                                         rstate->relid);
+                TimestampTz now = GetCurrentTimestamp();
+                struct tablesync_start_time_mapping *hentry;
+                bool        found;
+
+                hentry = hash_search(last_start_times, &rstate->relid, HASH_ENTER, &found);
+
+                if (!found ||
+                    TimestampDifferenceExceeds(hentry->last_start_time, now,
+                                               wal_retrieve_retry_interval))
+                {
+                    logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+                                             MySubscription->oid,
+                                             MySubscription->name,
+                                             MyLogicalRepWorker->userid,
+                                             rstate->relid);
+                    hentry->last_start_time = now;
+                }
             }
         }
     }
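Two details are worth noting in this hunk. First, HASH_ENTER initializes only the key of a newly created entry, so hentry->last_start_time holds garbage on the first visit; the !found test guards against reading it. Second, TimestampDifferenceExceeds() is the backend's stock helper for this kind of gate; with integer timestamps it reduces to microsecond arithmetic, roughly as follows (paraphrasing src/backend/utils/adt/timestamp.c; the actual source may differ slightly):

bool
TimestampDifferenceExceeds(TimestampTz start_time,
                           TimestampTz stop_time,
                           int msec)
{
    TimestampTz diff = stop_time - start_time;

    return (diff >= msec * INT64CONST(1000));
}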
@@ -13,6 +13,7 @@ $node_publisher->start;
 # Create subscriber node
 my $node_subscriber = get_new_node('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', "wal_retrieve_retry_interval = 1ms");
 $node_subscriber->start;
 
 # Create some preexisting content on publisher
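Setting wal_retrieve_retry_interval to 1ms works because that is the GUC's lower bound. Its declaration in guc.c looks approximately like the following sketch (paraphrased from memory; category, wording, and placement may differ by branch, but the default of 5000 ms and the minimum of 1 ms match the commit message and the test):

    {
        {"wal_retrieve_retry_interval", PGC_SIGHUP, REPLICATION_STANDBY,
            gettext_noop("Sets the time to wait before retrying to "
                         "retrieve WAL after a failed attempt."),
            NULL,
            GUC_UNIT_MS
        },
        &wal_retrieve_retry_interval,
        5000, 1, INT_MAX,
        NULL, NULL, NULL
    },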