diff --git a/spec/unit/rust-crypto/rust-crypto.spec.ts b/spec/unit/rust-crypto/rust-crypto.spec.ts index d0d2b67cf..b786a0864 100644 --- a/spec/unit/rust-crypto/rust-crypto.spec.ts +++ b/spec/unit/rust-crypto/rust-crypto.spec.ts @@ -16,7 +16,7 @@ limitations under the License. import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm"; import { KeysQueryRequest, OlmMachine } from "@matrix-org/matrix-sdk-crypto-wasm"; -import { Mocked } from "jest-mock"; +import { mocked, Mocked } from "jest-mock"; import fetchMock from "fetch-mock-jest"; import { RustCrypto } from "../../../src/rust-crypto/rust-crypto"; @@ -40,6 +40,7 @@ import { OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingReque import { ServerSideSecretStorage } from "../../../src/secret-storage"; import { CryptoCallbacks, ImportRoomKeysOpts, VerificationRequest } from "../../../src/crypto-api"; import * as testData from "../../test-utils/test-data"; +import { defer } from "../../../src/utils"; const TEST_USER = "@alice:example.com"; const TEST_DEVICE_ID = "TEST_DEVICE"; @@ -274,6 +275,31 @@ describe("RustCrypto", () => { expect(outgoingRequestProcessor.makeOutgoingRequest).toHaveBeenCalledWith(testReq); }); + it("should go round the loop again if another sync completes while the first `outgoingRequests` is running", async () => { + // the first call to `outgoingMessages` will return a promise which blocks for a while + const firstOutgoingRequestsDefer = defer>(); + mocked(olmMachine.outgoingRequests).mockReturnValueOnce(firstOutgoingRequestsDefer.promise); + + // the second will return a KeysQueryRequest. + const testReq = new KeysQueryRequest("1234", "{}"); + outgoingRequestQueue.push([testReq]); + + // the first sync completes, triggering the first call to `outgoingMessages` + rustCrypto.onSyncCompleted({}); + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1); + + // a second /sync completes before the first call to `outgoingRequests` completes. It shouldn't trigger + // a second call immediately, but should queue one up. + rustCrypto.onSyncCompleted({}); + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1); + + // the first call now completes, *with an empty result*, which would normally cause us to exit the loop, but + // we should have a second call queued. It should trigger a call to `makeOutgoingRequest`. + firstOutgoingRequestsDefer.resolve([]); + await awaitCallToMakeOutgoingRequest(); + expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(2); + }); + it("stops looping when stop() is called", async () => { for (let i = 0; i < 5; i++) { outgoingRequestQueue.push([new KeysQueryRequest("1234", "{}")]); diff --git a/src/rust-crypto/rust-crypto.ts b/src/rust-crypto/rust-crypto.ts index 7bf266148..c6ff61e78 100644 --- a/src/rust-crypto/rust-crypto.ts +++ b/src/rust-crypto/rust-crypto.ts @@ -96,6 +96,13 @@ export class RustCrypto extends TypedEventEmitter = {}; @@ -1375,6 +1382,8 @@ export class RustCrypto extends TypedEventEmitter { + /** start the outgoing request loop if it is not already running */ + private outgoingRequestLoop(): void { if (this.outgoingRequestLoopRunning) { + // The loop is already running, but we have reason to believe that there may be new items in the queue. + // + // There is potential for a race whereby the item is added *after* `OlmMachine.outgoingRequests` checks + // the queue, but *before* it returns. In such a case, the item could sit there unnoticed for some time. + // + // In order to circumvent the race, we set a flag which tells the loop to go round once again even if the + // queue appears to be empty. + this.outgoingRequestLoopOneMoreLoop = true; return; } + // fire off the loop in the background + this.outgoingRequestLoopInner().catch((e) => { + logger.error("Error processing outgoing-message requests from rust crypto-sdk", e); + }); + } + + private async outgoingRequestLoopInner(): Promise { + /* istanbul ignore if */ + if (this.outgoingRequestLoopRunning) { + throw new Error("Cannot run two outgoing request loops"); + } this.outgoingRequestLoopRunning = true; try { while (!this.stopped) { + // we clear the "one more loop" flag just before calling `OlmMachine.outgoingRequests()`, so we can tell + // if `this.outgoingRequestLoop()` was called while `OlmMachine.outgoingRequests()` was running. + this.outgoingRequestLoopOneMoreLoop = false; + + logger.debug("Calling OlmMachine.outgoingRequests()"); const outgoingRequests: Object[] = await this.olmMachine.outgoingRequests(); - if (outgoingRequests.length == 0 || this.stopped) { - // no more messages to send (or we have been told to stop): exit the loop + + if (this.stopped) { + // we've been told to stop while `outgoingRequests` was running: exit the loop without processing + // any of the returned requests (anything important will happen next time the client starts.) return; } + + if (outgoingRequests.length === 0 && !this.outgoingRequestLoopOneMoreLoop) { + // `OlmMachine.outgoingRequests` returned no messages, and there was no call to + // `this.outgoingRequestLoop()` while it was running. We can stop the loop for a while. + return; + } + for (const msg of outgoingRequests) { await this.outgoingRequestProcessor.makeOutgoingRequest(msg as OutgoingRequest); } } - } catch (e) { - logger.error("Error processing outgoing-message requests from rust crypto-sdk", e); } finally { this.outgoingRequestLoopRunning = false; }