You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-11-23 17:02:25 +03:00
Fix potential delay in sending out requests from the rust SDK (#3717)
* Emit a `UserTrustStatusChanged` when user identity is updated * Remove redundant `onCrossSigningKeysImport` callback This now happens as a side-effect of importing the keys. * bump to alpha release of matrix-rust-sdk-crypto-wasm * fixup! Remove redundant `onCrossSigningKeysImport` callback * Fix potential delay in sending out requests from the rust SDK There was a potential race which could cause us to be very slow to send out pending HTTP requests, particularly when handling a user verification. Add some resiliece to make sure we handle it correctly. * add comments * Add a unit test --------- Co-authored-by: Andy Balaam <andy.balaam@matrix.org>
This commit is contained in:
committed by
GitHub
parent
c9435af637
commit
5e542b3869
@@ -16,7 +16,7 @@ limitations under the License.
|
|||||||
|
|
||||||
import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm";
|
import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm";
|
||||||
import { KeysQueryRequest, OlmMachine } from "@matrix-org/matrix-sdk-crypto-wasm";
|
import { KeysQueryRequest, OlmMachine } from "@matrix-org/matrix-sdk-crypto-wasm";
|
||||||
import { Mocked } from "jest-mock";
|
import { mocked, Mocked } from "jest-mock";
|
||||||
import fetchMock from "fetch-mock-jest";
|
import fetchMock from "fetch-mock-jest";
|
||||||
|
|
||||||
import { RustCrypto } from "../../../src/rust-crypto/rust-crypto";
|
import { RustCrypto } from "../../../src/rust-crypto/rust-crypto";
|
||||||
@@ -40,6 +40,7 @@ import { OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingReque
|
|||||||
import { ServerSideSecretStorage } from "../../../src/secret-storage";
|
import { ServerSideSecretStorage } from "../../../src/secret-storage";
|
||||||
import { CryptoCallbacks, ImportRoomKeysOpts, VerificationRequest } from "../../../src/crypto-api";
|
import { CryptoCallbacks, ImportRoomKeysOpts, VerificationRequest } from "../../../src/crypto-api";
|
||||||
import * as testData from "../../test-utils/test-data";
|
import * as testData from "../../test-utils/test-data";
|
||||||
|
import { defer } from "../../../src/utils";
|
||||||
|
|
||||||
const TEST_USER = "@alice:example.com";
|
const TEST_USER = "@alice:example.com";
|
||||||
const TEST_DEVICE_ID = "TEST_DEVICE";
|
const TEST_DEVICE_ID = "TEST_DEVICE";
|
||||||
@@ -274,6 +275,31 @@ describe("RustCrypto", () => {
|
|||||||
expect(outgoingRequestProcessor.makeOutgoingRequest).toHaveBeenCalledWith(testReq);
|
expect(outgoingRequestProcessor.makeOutgoingRequest).toHaveBeenCalledWith(testReq);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("should go round the loop again if another sync completes while the first `outgoingRequests` is running", async () => {
|
||||||
|
// the first call to `outgoingMessages` will return a promise which blocks for a while
|
||||||
|
const firstOutgoingRequestsDefer = defer<Array<any>>();
|
||||||
|
mocked(olmMachine.outgoingRequests).mockReturnValueOnce(firstOutgoingRequestsDefer.promise);
|
||||||
|
|
||||||
|
// the second will return a KeysQueryRequest.
|
||||||
|
const testReq = new KeysQueryRequest("1234", "{}");
|
||||||
|
outgoingRequestQueue.push([testReq]);
|
||||||
|
|
||||||
|
// the first sync completes, triggering the first call to `outgoingMessages`
|
||||||
|
rustCrypto.onSyncCompleted({});
|
||||||
|
expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
// a second /sync completes before the first call to `outgoingRequests` completes. It shouldn't trigger
|
||||||
|
// a second call immediately, but should queue one up.
|
||||||
|
rustCrypto.onSyncCompleted({});
|
||||||
|
expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
// the first call now completes, *with an empty result*, which would normally cause us to exit the loop, but
|
||||||
|
// we should have a second call queued. It should trigger a call to `makeOutgoingRequest`.
|
||||||
|
firstOutgoingRequestsDefer.resolve([]);
|
||||||
|
await awaitCallToMakeOutgoingRequest();
|
||||||
|
expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(2);
|
||||||
|
});
|
||||||
|
|
||||||
it("stops looping when stop() is called", async () => {
|
it("stops looping when stop() is called", async () => {
|
||||||
for (let i = 0; i < 5; i++) {
|
for (let i = 0; i < 5; i++) {
|
||||||
outgoingRequestQueue.push([new KeysQueryRequest("1234", "{}")]);
|
outgoingRequestQueue.push([new KeysQueryRequest("1234", "{}")]);
|
||||||
|
|||||||
@@ -96,6 +96,13 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, RustCryptoEv
|
|||||||
/** whether {@link outgoingRequestLoop} is currently running */
|
/** whether {@link outgoingRequestLoop} is currently running */
|
||||||
private outgoingRequestLoopRunning = false;
|
private outgoingRequestLoopRunning = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* whether we check the outgoing requests queue again after the current check finishes.
|
||||||
|
*
|
||||||
|
* This should never be `true` unless `outgoingRequestLoopRunning` is also true.
|
||||||
|
*/
|
||||||
|
private outgoingRequestLoopOneMoreLoop = false;
|
||||||
|
|
||||||
/** mapping of roomId → encryptor class */
|
/** mapping of roomId → encryptor class */
|
||||||
private roomEncryptors: Record<string, RoomEncryptor> = {};
|
private roomEncryptors: Record<string, RoomEncryptor> = {};
|
||||||
|
|
||||||
@@ -1375,6 +1382,8 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, RustCryptoEv
|
|||||||
throw new Error("missing roomId in the event");
|
throw new Error("missing roomId in the event");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.debug(`Incoming verification event ${event.getId()} type ${event.getType()} from ${event.getSender()}`);
|
||||||
|
|
||||||
await this.olmMachine.receiveVerificationEvent(
|
await this.olmMachine.receiveVerificationEvent(
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
event_id: event.getId(),
|
event_id: event.getId(),
|
||||||
@@ -1386,6 +1395,9 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, RustCryptoEv
|
|||||||
}),
|
}),
|
||||||
new RustSdkCryptoJs.RoomId(roomId),
|
new RustSdkCryptoJs.RoomId(roomId),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// that may have caused us to queue up outgoing requests, so make sure we send them.
|
||||||
|
this.outgoingRequestLoop();
|
||||||
}
|
}
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
@@ -1394,24 +1406,56 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, RustCryptoEv
|
|||||||
//
|
//
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
private async outgoingRequestLoop(): Promise<void> {
|
/** start the outgoing request loop if it is not already running */
|
||||||
|
private outgoingRequestLoop(): void {
|
||||||
if (this.outgoingRequestLoopRunning) {
|
if (this.outgoingRequestLoopRunning) {
|
||||||
|
// The loop is already running, but we have reason to believe that there may be new items in the queue.
|
||||||
|
//
|
||||||
|
// There is potential for a race whereby the item is added *after* `OlmMachine.outgoingRequests` checks
|
||||||
|
// the queue, but *before* it returns. In such a case, the item could sit there unnoticed for some time.
|
||||||
|
//
|
||||||
|
// In order to circumvent the race, we set a flag which tells the loop to go round once again even if the
|
||||||
|
// queue appears to be empty.
|
||||||
|
this.outgoingRequestLoopOneMoreLoop = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// fire off the loop in the background
|
||||||
|
this.outgoingRequestLoopInner().catch((e) => {
|
||||||
|
logger.error("Error processing outgoing-message requests from rust crypto-sdk", e);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async outgoingRequestLoopInner(): Promise<void> {
|
||||||
|
/* istanbul ignore if */
|
||||||
|
if (this.outgoingRequestLoopRunning) {
|
||||||
|
throw new Error("Cannot run two outgoing request loops");
|
||||||
|
}
|
||||||
this.outgoingRequestLoopRunning = true;
|
this.outgoingRequestLoopRunning = true;
|
||||||
try {
|
try {
|
||||||
while (!this.stopped) {
|
while (!this.stopped) {
|
||||||
|
// we clear the "one more loop" flag just before calling `OlmMachine.outgoingRequests()`, so we can tell
|
||||||
|
// if `this.outgoingRequestLoop()` was called while `OlmMachine.outgoingRequests()` was running.
|
||||||
|
this.outgoingRequestLoopOneMoreLoop = false;
|
||||||
|
|
||||||
|
logger.debug("Calling OlmMachine.outgoingRequests()");
|
||||||
const outgoingRequests: Object[] = await this.olmMachine.outgoingRequests();
|
const outgoingRequests: Object[] = await this.olmMachine.outgoingRequests();
|
||||||
if (outgoingRequests.length == 0 || this.stopped) {
|
|
||||||
// no more messages to send (or we have been told to stop): exit the loop
|
if (this.stopped) {
|
||||||
|
// we've been told to stop while `outgoingRequests` was running: exit the loop without processing
|
||||||
|
// any of the returned requests (anything important will happen next time the client starts.)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (outgoingRequests.length === 0 && !this.outgoingRequestLoopOneMoreLoop) {
|
||||||
|
// `OlmMachine.outgoingRequests` returned no messages, and there was no call to
|
||||||
|
// `this.outgoingRequestLoop()` while it was running. We can stop the loop for a while.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
for (const msg of outgoingRequests) {
|
for (const msg of outgoingRequests) {
|
||||||
await this.outgoingRequestProcessor.makeOutgoingRequest(msg as OutgoingRequest);
|
await this.outgoingRequestProcessor.makeOutgoingRequest(msg as OutgoingRequest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (e) {
|
|
||||||
logger.error("Error processing outgoing-message requests from rust crypto-sdk", e);
|
|
||||||
} finally {
|
} finally {
|
||||||
this.outgoingRequestLoopRunning = false;
|
this.outgoingRequestLoopRunning = false;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user