diff --git a/spec/unit/ToDeviceMessageQueue.spec.ts b/spec/unit/ToDeviceMessageQueue.spec.ts new file mode 100644 index 000000000..8752331c7 --- /dev/null +++ b/spec/unit/ToDeviceMessageQueue.spec.ts @@ -0,0 +1,106 @@ +import { ConnectionError } from "../../src/http-api/errors"; +import { ClientEvent, MatrixClient, Store } from "../../src/client"; +import { ToDeviceMessageQueue } from "../../src/ToDeviceMessageQueue"; +import { getMockClientWithEventEmitter } from "../test-utils/client"; +import { StubStore } from "../../src/store/stub"; +import { IndexedToDeviceBatch } from "../../src/models/ToDeviceMessage"; +import { SyncState } from "../../src/sync"; + +describe("onResumedSync", () => { + let batch: IndexedToDeviceBatch | null; + let shouldFailSendToDevice: Boolean; + let onSendToDeviceFailure: () => void; + let onSendToDeviceSuccess: () => void; + let resumeSync: (newState: SyncState, oldState: SyncState) => void; + + let store: Store; + let mockClient: MatrixClient; + let queue: ToDeviceMessageQueue; + + beforeEach(() => { + batch = { + id: 0, + txnId: "123", + eventType: "m.dummy", + batch: [], + }; + + shouldFailSendToDevice = true; + onSendToDeviceFailure = () => {}; + onSendToDeviceSuccess = () => {}; + resumeSync = (newState, oldState) => { + shouldFailSendToDevice = false; + mockClient.emit(ClientEvent.Sync, newState, oldState); + }; + + store = new StubStore(); + store.getOldestToDeviceBatch = jest.fn().mockImplementation(() => { + return batch; + }); + store.removeToDeviceBatch = jest.fn().mockImplementation(() => { + batch = null; + }); + + mockClient = getMockClientWithEventEmitter({}); + mockClient.store = store; + mockClient.sendToDevice = jest.fn().mockImplementation(async () => { + if (shouldFailSendToDevice) { + await Promise.reject(new ConnectionError("")).finally(() => { + setTimeout(onSendToDeviceFailure, 0); + }); + } else { + await Promise.resolve({}).finally(() => { + setTimeout(onSendToDeviceSuccess, 0); + }); + } + }); + + queue = new ToDeviceMessageQueue(mockClient); + }); + + it("resends queue after connectivity restored", (done) => { + onSendToDeviceFailure = () => { + expect(store.getOldestToDeviceBatch).toHaveBeenCalledTimes(1); + expect(store.removeToDeviceBatch).not.toHaveBeenCalled(); + + resumeSync(SyncState.Syncing, SyncState.Catchup); + expect(store.getOldestToDeviceBatch).toHaveBeenCalledTimes(2); + }; + + onSendToDeviceSuccess = () => { + expect(store.getOldestToDeviceBatch).toHaveBeenCalledTimes(3); + expect(store.removeToDeviceBatch).toHaveBeenCalled(); + done(); + }; + + queue.start(); + }); + + it("does not resend queue if client sync still catching up", (done) => { + onSendToDeviceFailure = () => { + expect(store.getOldestToDeviceBatch).toHaveBeenCalledTimes(1); + expect(store.removeToDeviceBatch).not.toHaveBeenCalled(); + + resumeSync(SyncState.Catchup, SyncState.Catchup); + expect(store.getOldestToDeviceBatch).toHaveBeenCalledTimes(1); + done(); + }; + + queue.start(); + }); + + it("does not resend queue if connectivity restored after queue stopped", (done) => { + onSendToDeviceFailure = () => { + expect(store.getOldestToDeviceBatch).toHaveBeenCalledTimes(1); + expect(store.removeToDeviceBatch).not.toHaveBeenCalled(); + + queue.stop(); + + resumeSync(SyncState.Syncing, SyncState.Catchup); + expect(store.getOldestToDeviceBatch).toHaveBeenCalledTimes(1); + done(); + }; + + queue.start(); + }); +}); diff --git a/src/ToDeviceMessageQueue.ts b/src/ToDeviceMessageQueue.ts index 986eb5a9f..ec5922bb6 100644 --- a/src/ToDeviceMessageQueue.ts +++ b/src/ToDeviceMessageQueue.ts @@ -16,7 +16,8 @@ limitations under the License. import { ToDeviceMessageId } from "./@types/event"; import { logger } from "./logger"; -import { MatrixError, MatrixClient, ClientEvent } from "./matrix"; +import { MatrixClient, ClientEvent } from "./client"; +import { MatrixError } from "./http-api"; import { IndexedToDeviceBatch, ToDeviceBatch, ToDeviceBatchWithTxnId, ToDevicePayload } from "./models/ToDeviceMessage"; import { MatrixScheduler } from "./scheduler"; import { SyncState } from "./sync";