You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-11-23 17:02:25 +03:00
Resume to-device message queue after resumed sync
This commit is contained in:
@@ -16,9 +16,10 @@ limitations under the License.
|
|||||||
|
|
||||||
import { ToDeviceMessageId } from "./@types/event";
|
import { ToDeviceMessageId } from "./@types/event";
|
||||||
import { logger } from "./logger";
|
import { logger } from "./logger";
|
||||||
import { MatrixError, MatrixClient } from "./matrix";
|
import { MatrixError, MatrixClient, ClientEvent } from "./matrix";
|
||||||
import { IndexedToDeviceBatch, ToDeviceBatch, ToDeviceBatchWithTxnId, ToDevicePayload } from "./models/ToDeviceMessage";
|
import { IndexedToDeviceBatch, ToDeviceBatch, ToDeviceBatchWithTxnId, ToDevicePayload } from "./models/ToDeviceMessage";
|
||||||
import { MatrixScheduler } from "./scheduler";
|
import { MatrixScheduler } from "./scheduler";
|
||||||
|
import { SyncState } from "./sync";
|
||||||
|
|
||||||
const MAX_BATCH_SIZE = 20;
|
const MAX_BATCH_SIZE = 20;
|
||||||
|
|
||||||
@@ -37,12 +38,14 @@ export class ToDeviceMessageQueue {
|
|||||||
public start(): void {
|
public start(): void {
|
||||||
this.running = true;
|
this.running = true;
|
||||||
this.sendQueue();
|
this.sendQueue();
|
||||||
|
this.client.on(ClientEvent.Sync, this.onResumedSync);
|
||||||
}
|
}
|
||||||
|
|
||||||
public stop(): void {
|
public stop(): void {
|
||||||
this.running = false;
|
this.running = false;
|
||||||
if (this.retryTimeout !== null) clearTimeout(this.retryTimeout);
|
if (this.retryTimeout !== null) clearTimeout(this.retryTimeout);
|
||||||
this.retryTimeout = null;
|
this.retryTimeout = null;
|
||||||
|
this.client.removeListener(ClientEvent.Sync, this.onResumedSync);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async queueBatch(batch: ToDeviceBatch): Promise<void> {
|
public async queueBatch(batch: ToDeviceBatch): Promise<void> {
|
||||||
@@ -132,4 +135,15 @@ export class ToDeviceMessageQueue {
|
|||||||
|
|
||||||
await this.client.sendToDevice(batch.eventType, contentMap, batch.txnId);
|
await this.client.sendToDevice(batch.eventType, contentMap, batch.txnId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listen to sync state changes and automatically resend any pending events
|
||||||
|
* once syncing is resumed
|
||||||
|
*/
|
||||||
|
private onResumedSync = (state: SyncState | null, oldState: SyncState | null): void => {
|
||||||
|
if (state === SyncState.Syncing && oldState !== SyncState.Syncing) {
|
||||||
|
logger.info(`Resuming queue after resumed sync`);
|
||||||
|
this.sendQueue();
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user