From 071d5e71e49ee0eb87f10461fbe3a9eb83087054 Mon Sep 17 00:00:00 2001 From: Andy Uhnak Date: Wed, 30 Nov 2022 10:24:28 +0000 Subject: [PATCH] Resume to-device message queue after resumed sync --- src/ToDeviceMessageQueue.ts | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/ToDeviceMessageQueue.ts b/src/ToDeviceMessageQueue.ts index e78c46ba2..986eb5a9f 100644 --- a/src/ToDeviceMessageQueue.ts +++ b/src/ToDeviceMessageQueue.ts @@ -16,9 +16,10 @@ limitations under the License. import { ToDeviceMessageId } from "./@types/event"; import { logger } from "./logger"; -import { MatrixError, MatrixClient } from "./matrix"; +import { MatrixError, MatrixClient, ClientEvent } from "./matrix"; import { IndexedToDeviceBatch, ToDeviceBatch, ToDeviceBatchWithTxnId, ToDevicePayload } from "./models/ToDeviceMessage"; import { MatrixScheduler } from "./scheduler"; +import { SyncState } from "./sync"; const MAX_BATCH_SIZE = 20; @@ -37,12 +38,14 @@ export class ToDeviceMessageQueue { public start(): void { this.running = true; this.sendQueue(); + this.client.on(ClientEvent.Sync, this.onResumedSync); } public stop(): void { this.running = false; if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); this.retryTimeout = null; + this.client.removeListener(ClientEvent.Sync, this.onResumedSync); } public async queueBatch(batch: ToDeviceBatch): Promise { @@ -132,4 +135,15 @@ export class ToDeviceMessageQueue { await this.client.sendToDevice(batch.eventType, contentMap, batch.txnId); } + + /** + * Listen to sync state changes and automatically resend any pending events + * once syncing is resumed + */ + private onResumedSync = (state: SyncState | null, oldState: SyncState | null): void => { + if (state === SyncState.Syncing && oldState !== SyncState.Syncing) { + logger.info(`Resuming queue after resumed sync`); + this.sendQueue(); + } + }; }