1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-08-07 23:02:56 +03:00

Fix members being loaded from server on initial sync (defeating lazy loading) (#3830)

* fix members loaded on intitial sync

* Update test to use KeyResponder

* Use E2EKeyResponder

* code review

* better comment

* fix test

* post merge fix

* fix imports

* refactoring, better names

* code review

* clean tests

* Cleanups per review comments

* fix test

* Apply suggestions from code review

---------

Co-authored-by: Richard van der Hoff <richard@matrix.org>
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
This commit is contained in:
Valere
2023-11-03 15:55:48 +01:00
committed by GitHub
parent 5bc132a24c
commit 7921fee164
6 changed files with 478 additions and 150 deletions

View File

@@ -692,7 +692,13 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string,
});
it("prepareToEncrypt", async () => {
expectAliceKeyQuery({ device_keys: { "@alice:localhost": {} }, failures: {} });
const homeserverUrl = aliceClient.getHomeserverUrl();
keyResponder = new E2EKeyResponder(homeserverUrl);
keyResponder.addKeyReceiver("@alice:localhost", keyReceiver);
const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID");
keyResponder.addDeviceKeys(testDeviceKeys);
await startClientAndAwaitFirstSync();
aliceClient.setGlobalErrorOnUnknownDevices(false);
@@ -700,10 +706,7 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string,
syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"]));
await syncPromise(aliceClient);
// we expect alice first to query bob's keys...
expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz"));
// ... and then claim one of his OTKs
// Alice should claim one of Bob's OTKs
expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz"));
// fire off the prepare request
@@ -720,18 +723,20 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string,
it("Alice sends a megolm message with GlobalErrorOnUnknownDevices=false", async () => {
aliceClient.setGlobalErrorOnUnknownDevices(false);
expectAliceKeyQuery({ device_keys: { "@alice:localhost": {} }, failures: {} });
const homeserverUrl = aliceClient.getHomeserverUrl();
keyResponder = new E2EKeyResponder(homeserverUrl);
keyResponder.addKeyReceiver("@alice:localhost", keyReceiver);
const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID");
keyResponder.addDeviceKeys(testDeviceKeys);
await startClientAndAwaitFirstSync();
// Alice shares a room with Bob
syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"]));
await syncPromise(aliceClient);
// Once we send the message, Alice will check Bob's device list (twice, because reasons) ...
expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz"));
expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz"));
// ... and claim one of his OTKs ...
// ... and claim one of Bob's OTKs ...
expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz"));
// ... and send an m.room_key message
@@ -746,18 +751,20 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string,
it("We should start a new megolm session after forceDiscardSession", async () => {
aliceClient.setGlobalErrorOnUnknownDevices(false);
expectAliceKeyQuery({ device_keys: { "@alice:localhost": {} }, failures: {} });
const homeserverUrl = aliceClient.getHomeserverUrl();
keyResponder = new E2EKeyResponder(homeserverUrl);
keyResponder.addKeyReceiver("@alice:localhost", keyReceiver);
const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID");
keyResponder.addDeviceKeys(testDeviceKeys);
await startClientAndAwaitFirstSync();
// Alice shares a room with Bob
syncResponder.sendOrQueueSyncResponse(getSyncResponse(["@bob:xyz"]));
await syncPromise(aliceClient);
// Once we send the message, Alice will check Bob's device list (twice, because reasons) ...
expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz"));
expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz"));
// ... and claim one of his OTKs ...
// ... and claim one of Bob's OTKs ...
expectAliceKeyClaim(getTestKeysClaimResponse("@bob:xyz"));
// ... and send an m.room_key message
@@ -2052,13 +2059,17 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string,
});
}
oldBackendOnly("Sending an event initiates a member list sync", async () => {
it("Sending an event initiates a member list sync", async () => {
const homeserverUrl = aliceClient.getHomeserverUrl();
keyResponder = new E2EKeyResponder(homeserverUrl);
keyResponder.addKeyReceiver("@alice:localhost", keyReceiver);
const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID");
keyResponder.addDeviceKeys(testDeviceKeys);
// we expect a call to the /members list...
const memberListPromise = expectMembershipRequest(ROOM_ID, ["@bob:xyz"]);
// then a request for bob's devices...
expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz"));
// then a to-device with the room_key
const inboundGroupSessionPromise = expectSendRoomKey("@bob:xyz", testOlmAccount, p2pSession);
@@ -2071,13 +2082,17 @@ describe.each(Object.entries(CRYPTO_BACKENDS))("crypto (%s)", (backend: string,
await Promise.all([sendPromise, megolmMessagePromise, memberListPromise]);
});
oldBackendOnly("loading the membership list inhibits a later load", async () => {
it("loading the membership list inhibits a later load", async () => {
const homeserverUrl = aliceClient.getHomeserverUrl();
keyResponder = new E2EKeyResponder(homeserverUrl);
keyResponder.addKeyReceiver("@alice:localhost", keyReceiver);
const testDeviceKeys = getTestOlmAccountKeys(testOlmAccount, "@bob:xyz", "DEVICE_ID");
keyResponder.addDeviceKeys(testDeviceKeys);
const room = aliceClient.getRoom(ROOM_ID)!;
await Promise.all([room.loadMembersIfNeeded(), expectMembershipRequest(ROOM_ID, ["@bob:xyz"])]);
// expect a request for bob's devices...
expectAliceKeyQuery(getTestKeysQueryResponse("@bob:xyz"));
// then a to-device with the room_key
const inboundGroupSessionPromise = expectSendRoomKey("@bob:xyz", testOlmAccount, p2pSession);

View File

@@ -0,0 +1,237 @@
/*
Copyright 2023 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { Mocked } from "jest-mock";
import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm";
import { OutgoingRequest, OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingRequestProcessor";
import { OutgoingRequestsManager } from "../../../src/rust-crypto/OutgoingRequestsManager";
import { defer, IDeferred } from "../../../src/utils";
import { logger } from "../../../src/logger";
describe("OutgoingRequestsManager", () => {
/** the OutgoingRequestsManager implementation under test */
let manager: OutgoingRequestsManager;
/** a mock OutgoingRequestProcessor */
let processor: Mocked<OutgoingRequestProcessor>;
/** a mocked-up OlmMachine which manager is connected to */
let olmMachine: Mocked<RustSdkCryptoJs.OlmMachine>;
beforeEach(async () => {
olmMachine = {
outgoingRequests: jest.fn(),
} as unknown as Mocked<RustSdkCryptoJs.OlmMachine>;
processor = {
makeOutgoingRequest: jest.fn(),
} as unknown as Mocked<OutgoingRequestProcessor>;
manager = new OutgoingRequestsManager(logger, olmMachine, processor);
});
describe("Call doProcessOutgoingRequests", () => {
it("The call triggers handling of the machine outgoing requests", async () => {
const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}");
const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}");
olmMachine.outgoingRequests.mockImplementationOnce(async () => {
return [request1, request2];
});
processor.makeOutgoingRequest.mockImplementationOnce(async () => {
return;
});
await manager.doProcessOutgoingRequests();
expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1);
expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(2);
expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request1);
expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request2);
});
it("Stack and batch calls to doProcessOutgoingRequests while one is already running", async () => {
const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}");
const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}");
const request3 = new RustSdkCryptoJs.KeysBackupRequest("foo3", "{}", "1");
const firstOutgoingRequestDefer = defer<OutgoingRequest[]>();
olmMachine.outgoingRequests
.mockImplementationOnce(async (): Promise<OutgoingRequest[]> => {
return firstOutgoingRequestDefer.promise;
})
.mockImplementationOnce(async () => {
return [request3];
});
const firstRequest = manager.doProcessOutgoingRequests();
// stack 2 additional requests while the first one is still running
const secondRequest = manager.doProcessOutgoingRequests();
const thirdRequest = manager.doProcessOutgoingRequests();
// let the first request complete
firstOutgoingRequestDefer.resolve([request1, request2]);
await firstRequest;
await secondRequest;
await thirdRequest;
// outgoingRequests should be called twice in total, as the second and third requests are
// processed in the same loop.
expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(2);
expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(3);
expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request1);
expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request2);
expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request3);
});
it("Process 3 consecutive calls to doProcessOutgoingRequests while not blocking previous ones", async () => {
const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}");
const request2 = new RustSdkCryptoJs.KeysUploadRequest("foo2", "{}");
const request3 = new RustSdkCryptoJs.KeysBackupRequest("foo3", "{}", "1");
// promises which will resolve when OlmMachine.outgoingRequests is called
const outgoingRequestCalledPromises: Promise<void>[] = [];
// deferreds which will provide the results of OlmMachine.outgoingRequests
const outgoingRequestResultDeferreds: IDeferred<OutgoingRequest[]>[] = [];
for (let i = 0; i < 3; i++) {
const resultDeferred = defer<OutgoingRequest[]>();
const calledPromise = new Promise<void>((resolve) => {
olmMachine.outgoingRequests.mockImplementationOnce(() => {
resolve();
return resultDeferred.promise;
});
});
outgoingRequestCalledPromises.push(calledPromise);
outgoingRequestResultDeferreds.push(resultDeferred);
}
const call1 = manager.doProcessOutgoingRequests();
// First call will start an iteration and for now is awaiting on outgoingRequests
expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1);
// Make a new call now: this will request a new iteration
const call2 = manager.doProcessOutgoingRequests();
// let the first iteration complete
outgoingRequestResultDeferreds[0].resolve([request1]);
// The first call should now complete
await call1;
expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(1);
expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request1);
// Wait for the second iteration to fire and be waiting on `outgoingRequests`
await outgoingRequestCalledPromises[1];
expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(2);
// Stack a new call that should be processed in an additional iteration.
const call3 = manager.doProcessOutgoingRequests();
outgoingRequestResultDeferreds[1].resolve([request2]);
await call2;
expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(2);
expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request2);
// Wait for the third iteration to fire and be waiting on `outgoingRequests`
await outgoingRequestCalledPromises[2];
expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(3);
outgoingRequestResultDeferreds[2].resolve([request3]);
await call3;
expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(3);
expect(processor.makeOutgoingRequest).toHaveBeenCalledWith(request3);
// ensure that no other iteration is going on
expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(3);
});
it("Should not bubble exceptions if server request is rejected", async () => {
const request = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}");
olmMachine.outgoingRequests.mockImplementationOnce(async () => {
return [request];
});
processor.makeOutgoingRequest.mockImplementationOnce(async () => {
throw new Error("Some network error");
});
await manager.doProcessOutgoingRequests();
expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(1);
});
});
describe("Calling stop on the manager should stop ongoing work", () => {
it("When the manager is stopped after outgoingRequests() call, do not make sever requests", async () => {
const request1 = new RustSdkCryptoJs.KeysQueryRequest("foo", "{}");
const firstOutgoingRequestDefer = defer<OutgoingRequest[]>();
olmMachine.outgoingRequests.mockImplementationOnce(async (): Promise<OutgoingRequest[]> => {
return firstOutgoingRequestDefer.promise;
});
const firstRequest = manager.doProcessOutgoingRequests();
// stop
manager.stop();
// let the first request complete
firstOutgoingRequestDefer.resolve([request1]);
await firstRequest;
expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(0);
});
it("When the manager is stopped while doing server calls, it should stop before the next sever call", async () => {
const request1 = new RustSdkCryptoJs.KeysQueryRequest("11", "{}");
const request2 = new RustSdkCryptoJs.KeysUploadRequest("12", "{}");
const firstRequestDefer = defer<void>();
olmMachine.outgoingRequests.mockImplementationOnce(async (): Promise<OutgoingRequest[]> => {
return [request1, request2];
});
processor.makeOutgoingRequest
.mockImplementationOnce(async () => {
manager.stop();
return firstRequestDefer.promise;
})
.mockImplementationOnce(async () => {
return;
});
const firstRequest = manager.doProcessOutgoingRequests();
firstRequestDefer.resolve();
await firstRequest;
// should have been called once but not twice
expect(processor.makeOutgoingRequest).toHaveBeenCalledTimes(1);
});
});
});

View File

@@ -50,6 +50,7 @@ import {
import * as testData from "../../test-utils/test-data";
import { defer } from "../../../src/utils";
import { logger } from "../../../src/logger";
import { OutgoingRequestsManager } from "../../../src/rust-crypto/OutgoingRequestsManager";
const TEST_USER = "@alice:example.com";
const TEST_DEVICE_ID = "TEST_DEVICE";
@@ -347,6 +348,8 @@ describe("RustCrypto", () => {
makeOutgoingRequest: jest.fn(),
} as unknown as Mocked<OutgoingRequestProcessor>;
const outgoingRequestsManager = new OutgoingRequestsManager(logger, olmMachine, outgoingRequestProcessor);
rustCrypto = new RustCrypto(
logger,
olmMachine,
@@ -357,6 +360,7 @@ describe("RustCrypto", () => {
{} as CryptoCallbacks,
);
rustCrypto["outgoingRequestProcessor"] = outgoingRequestProcessor;
rustCrypto["outgoingRequestsManager"] = outgoingRequestsManager;
});
it("should poll for outgoing messages and send them", async () => {
@@ -395,50 +399,6 @@ describe("RustCrypto", () => {
await awaitCallToMakeOutgoingRequest();
expect(olmMachine.outgoingRequests).toHaveBeenCalledTimes(2);
});
it("stops looping when stop() is called", async () => {
for (let i = 0; i < 5; i++) {
outgoingRequestQueue.push([new KeysQueryRequest("1234", "{}")]);
}
let makeRequestPromise = awaitCallToMakeOutgoingRequest();
rustCrypto.onSyncCompleted({});
expect(rustCrypto["outgoingRequestLoopRunning"]).toBeTruthy();
// go a couple of times round the loop
let resolveMakeRequest = await makeRequestPromise;
makeRequestPromise = awaitCallToMakeOutgoingRequest();
resolveMakeRequest();
resolveMakeRequest = await makeRequestPromise;
makeRequestPromise = awaitCallToMakeOutgoingRequest();
resolveMakeRequest();
// a second sync while this is going on shouldn't make any difference
rustCrypto.onSyncCompleted({});
resolveMakeRequest = await makeRequestPromise;
outgoingRequestProcessor.makeOutgoingRequest.mockReset();
resolveMakeRequest();
// now stop...
rustCrypto.stop();
// which should (eventually) cause the loop to stop with no further calls to outgoingRequests
olmMachine.outgoingRequests.mockReset();
await new Promise((resolve) => {
setTimeout(resolve, 100);
});
expect(rustCrypto["outgoingRequestLoopRunning"]).toBeFalsy();
expect(outgoingRequestProcessor.makeOutgoingRequest).not.toHaveBeenCalled();
expect(olmMachine.outgoingRequests).not.toHaveBeenCalled();
// we sent three, so there should be 2 left
expect(outgoingRequestQueue.length).toEqual(2);
});
});
describe(".getEventEncryptionInfo", () => {

View File

@@ -0,0 +1,141 @@
/*
Copyright 2023 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { OlmMachine } from "@matrix-org/matrix-sdk-crypto-wasm";
import { OutgoingRequest, OutgoingRequestProcessor } from "./OutgoingRequestProcessor";
import { Logger } from "../logger";
import { defer, IDeferred } from "../utils";
/**
* OutgoingRequestsManager: responsible for processing outgoing requests from the OlmMachine.
* Ensure that only one loop is going on at once, and that the requests are processed in order.
*/
export class OutgoingRequestsManager {
/** whether {@link stop} has been called */
private stopped = false;
/** whether {@link outgoingRequestLoop} is currently running */
private outgoingRequestLoopRunning = false;
/**
* If there are additional calls to doProcessOutgoingRequests() while there is a current call running
* we need to remember in order to call `doProcessOutgoingRequests` again (as there could be new requests).
*
* If this is defined, it is an indication that we need to do another iteration; in this case the deferred
* will resolve once that next iteration completes. If it is undefined, there have been no new calls
* to `doProcessOutgoingRequests` since the current iteration started.
*/
private nextLoopDeferred?: IDeferred<void>;
public constructor(
private readonly logger: Logger,
private readonly olmMachine: OlmMachine,
public readonly outgoingRequestProcessor: OutgoingRequestProcessor,
) {}
/**
* Shut down as soon as possible the current loop of outgoing requests processing.
*/
public stop(): void {
this.stopped = true;
}
/**
* Process the OutgoingRequests from the OlmMachine.
*
* This should be called at the end of each sync, to process any OlmMachine OutgoingRequests created by the rust sdk.
* In some cases if OutgoingRequests need to be sent immediately, this can be called directly.
*
* Calls to doProcessOutgoingRequests() are processed synchronously, one after the other, in order.
* If doProcessOutgoingRequests() is called while another call is still being processed, it will be queued.
* Multiple calls to doProcessOutgoingRequests() when a call is already processing will be batched together.
*/
public doProcessOutgoingRequests(): Promise<void> {
// Flag that we need at least one more iteration of the loop.
//
// It is important that we do this even if the loop is currently running. There is potential for a race whereby
// a request is added to the queue *after* `OlmMachine.outgoingRequests` checks the queue, but *before* it
// returns. In such a case, the item could sit there unnoticed for some time.
//
// In order to circumvent the race, we set a flag which tells the loop to go round once again even if the
// queue appears to be empty.
if (!this.nextLoopDeferred) {
this.nextLoopDeferred = defer();
}
// ... and wait for it to complete.
const result = this.nextLoopDeferred.promise;
// set the loop going if it is not already.
if (!this.outgoingRequestLoopRunning) {
this.outgoingRequestLoop().catch((e) => {
// this should not happen; outgoingRequestLoop should return any errors via `nextLoopDeferred`.
/* istanbul ignore next */
this.logger.error("Uncaught error in outgoing request loop", e);
});
}
return result;
}
private async outgoingRequestLoop(): Promise<void> {
/* istanbul ignore if */
if (this.outgoingRequestLoopRunning) {
throw new Error("Cannot run two outgoing request loops");
}
this.outgoingRequestLoopRunning = true;
try {
while (!this.stopped && this.nextLoopDeferred) {
const deferred = this.nextLoopDeferred;
// reset `nextLoopDeferred` so that any future calls to `doProcessOutgoingRequests` are queued
// for another additional iteration.
this.nextLoopDeferred = undefined;
// make the requests and feed the results back to the `nextLoopDeferred`
await this.processOutgoingRequests().then(deferred.resolve, deferred.reject);
}
} finally {
this.outgoingRequestLoopRunning = false;
}
if (this.nextLoopDeferred) {
// the loop was stopped, but there was a call to `doProcessOutgoingRequests`. Make sure that
// we reject the promise in case anything is waiting for it.
this.nextLoopDeferred.reject(new Error("OutgoingRequestsManager was stopped"));
}
}
/**
* Make a single request to `olmMachine.outgoingRequests` and do the corresponding requests.
*/
private async processOutgoingRequests(): Promise<void> {
if (this.stopped) return;
const outgoingRequests: OutgoingRequest[] = await this.olmMachine.outgoingRequests();
for (const request of outgoingRequests) {
if (this.stopped) return;
try {
await this.outgoingRequestProcessor.makeOutgoingRequest(request);
} catch (e) {
// as part of the loop we silently ignore errors, but log them.
// The rust sdk will retry the request later as it won't have been marked as sent.
this.logger.error(`Failed to process outgoing request ${request.type}: ${e}`);
}
}
}
}

View File

@@ -23,6 +23,7 @@ import {
HistoryVisibility as RustHistoryVisibility,
ToDeviceRequest,
} from "@matrix-org/matrix-sdk-crypto-wasm";
import * as RustSdkCryptoJs from "@matrix-org/matrix-sdk-crypto-wasm";
import { EventType } from "../@types/event";
import { IContent, MatrixEvent } from "../models/event";
@@ -30,8 +31,8 @@ import { Room } from "../models/room";
import { Logger, logger } from "../logger";
import { KeyClaimManager } from "./KeyClaimManager";
import { RoomMember } from "../models/room-member";
import { OutgoingRequestProcessor } from "./OutgoingRequestProcessor";
import { HistoryVisibility } from "../@types/partials";
import { OutgoingRequestsManager } from "./OutgoingRequestsManager";
/**
* RoomEncryptor: responsible for encrypting messages to a given room
@@ -41,21 +42,34 @@ import { HistoryVisibility } from "../@types/partials";
export class RoomEncryptor {
private readonly prefixedLogger: Logger;
/** whether the room members have been loaded and tracked for the first time */
private lazyLoadedMembersResolved = false;
/**
* @param olmMachine - The rust-sdk's OlmMachine
* @param keyClaimManager - Our KeyClaimManager, which manages the queue of one-time-key claim requests
* @param outgoingRequestProcessor - The OutgoingRequestProcessor, which sends outgoing requests
* @param outgoingRequestManager - The OutgoingRequestManager, which manages the queue of outgoing requests.
* @param room - The room we want to encrypt for
* @param encryptionSettings - body of the m.room.encryption event currently in force in this room
*/
public constructor(
private readonly olmMachine: OlmMachine,
private readonly keyClaimManager: KeyClaimManager,
private readonly outgoingRequestProcessor: OutgoingRequestProcessor,
private readonly outgoingRequestManager: OutgoingRequestsManager,
private readonly room: Room,
private encryptionSettings: IContent,
) {
this.prefixedLogger = logger.getChild(`[${room.roomId} encryption]`);
// start tracking devices for any users already known to be in this room.
// Do not load members here, would defeat lazy loading.
const members = room.getJoinedMembers();
// At this point just mark the known members as tracked, it might not be the full list of members
// because of lazy loading. This is fine, because we will get a member list update when sending a message for
// the first time, see `RoomEncryptor#ensureEncryptionSession`
this.olmMachine.updateTrackedUsers(members.map((u) => new RustSdkCryptoJs.UserId(u.userId))).then(() => {
this.prefixedLogger.debug(`Updated tracked users for room ${room.roomId}`);
});
}
/**
@@ -104,6 +118,28 @@ export class RoomEncryptor {
}
const members = await this.room.getEncryptionTargetMembers();
// If this is the first time we are sending a message to the room, we may not yet have seen all the members
// (so the Crypto SDK might not have a device list for them). So, if this is the first time we are encrypting
// for this room, give the SDK the full list of members, to be on the safe side.
//
// This could end up being racy (if two calls to ensureEncryptionSession happen at the same time), but that's
// not a particular problem, since `OlmMachine.updateTrackedUsers` just adds any users that weren't already tracked.
if (!this.lazyLoadedMembersResolved) {
await this.olmMachine.updateTrackedUsers(members.map((u) => new RustSdkCryptoJs.UserId(u.userId)));
this.lazyLoadedMembersResolved = true;
this.prefixedLogger.debug(`Updated tracked users for room ${this.room.roomId}`);
}
// Query keys in case we don't have them for newly tracked members.
// This must be done before ensuring sessions. If not the devices of these users are not
// known yet and will not get the room key.
// We don't have API to only get the keys queries related to this member list, so we just
// process the pending requests from the olmMachine. (usually these are processed
// at the end of the sync, but we can't wait for that).
// XXX future improvement process only KeysQueryRequests for the tracked users.
await this.outgoingRequestManager.doProcessOutgoingRequests();
this.prefixedLogger.debug(
`Encrypting for users (shouldEncryptForInvitedMembers: ${this.room.shouldEncryptForInvitedMembers()}):`,
members.map((u) => `${u.userId} (${u.membership})`),
@@ -143,7 +179,7 @@ export class RoomEncryptor {
);
if (shareMessages) {
for (const m of shareMessages) {
await this.outgoingRequestProcessor.makeOutgoingRequest(m);
await this.outgoingRequestManager.outgoingRequestProcessor.makeOutgoingRequest(m);
}
}
}

View File

@@ -27,7 +27,7 @@ import { BackupDecryptor, CryptoBackend, OnSyncCompletedData } from "../common-c
import { Logger } from "../logger";
import { ClientPrefix, IHttpOpts, MatrixHttpApi, Method } from "../http-api";
import { RoomEncryptor } from "./RoomEncryptor";
import { OutgoingRequest, OutgoingRequestProcessor } from "./OutgoingRequestProcessor";
import { OutgoingRequestProcessor } from "./OutgoingRequestProcessor";
import { KeyClaimManager } from "./KeyClaimManager";
import { encodeUri, MapWithDefault } from "../utils";
import {
@@ -72,6 +72,7 @@ import { ClientStoppedError } from "../errors";
import { ISignatures } from "../@types/signed";
import { encodeBase64 } from "../base64";
import { DecryptionError } from "../crypto/algorithms";
import { OutgoingRequestsManager } from "./OutgoingRequestsManager";
const ALL_VERIFICATION_METHODS = ["m.sas.v1", "m.qr_code.scan.v1", "m.qr_code.show.v1", "m.reciprocate.v1"];
@@ -93,16 +94,6 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, RustCryptoEv
/** whether {@link stop} has been called */
private stopped = false;
/** whether {@link outgoingRequestLoop} is currently running */
private outgoingRequestLoopRunning = false;
/**
* whether we check the outgoing requests queue again after the current check finishes.
*
* This should never be `true` unless `outgoingRequestLoopRunning` is also true.
*/
private outgoingRequestLoopOneMoreLoop = false;
/** mapping of roomId → encryptor class */
private roomEncryptors: Record<string, RoomEncryptor> = {};
@@ -111,6 +102,7 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, RustCryptoEv
private outgoingRequestProcessor: OutgoingRequestProcessor;
private crossSigningIdentity: CrossSigningIdentity;
private readonly backupManager: RustBackupManager;
private outgoingRequestsManager: OutgoingRequestsManager;
private sessionLastCheckAttemptedTime: Record<string, number> = {}; // When did we last try to check the server for a given session id?
@@ -143,6 +135,12 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, RustCryptoEv
) {
super();
this.outgoingRequestProcessor = new OutgoingRequestProcessor(olmMachine, http);
this.outgoingRequestsManager = new OutgoingRequestsManager(
this.logger,
olmMachine,
this.outgoingRequestProcessor,
);
this.keyClaimManager = new KeyClaimManager(olmMachine, this.outgoingRequestProcessor);
this.eventDecryptor = new EventDecryptor(this.logger, olmMachine, this);
@@ -267,6 +265,7 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, RustCryptoEv
this.keyClaimManager.stop();
this.backupManager.stop();
this.outgoingRequestsManager.stop();
// make sure we close() the OlmMachine; doing so means that all the Rust objects will be
// cleaned up; in particular, the indexeddb connections will be closed, which means they
@@ -1270,15 +1269,11 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, RustCryptoEv
this.roomEncryptors[room.roomId] = new RoomEncryptor(
this.olmMachine,
this.keyClaimManager,
this.outgoingRequestProcessor,
this.outgoingRequestsManager,
room,
config,
);
}
// start tracking devices for any users already known to be in this room.
const members = await room.getEncryptionTargetMembers();
await this.olmMachine.updateTrackedUsers(members.map((u) => new RustSdkCryptoJs.UserId(u.userId)));
}
/** called by the sync loop after processing each sync.
@@ -1290,7 +1285,9 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, RustCryptoEv
public onSyncCompleted(syncState: OnSyncCompletedData): void {
// Processing the /sync may have produced new outgoing requests which need sending, so kick off the outgoing
// request loop, if it's not already running.
this.outgoingRequestLoop();
this.outgoingRequestsManager.doProcessOutgoingRequests().catch((e) => {
this.logger.warn("onSyncCompleted: Error processing outgoing requests", e);
});
}
/**
@@ -1540,68 +1537,10 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, RustCryptoEv
}
// that may have caused us to queue up outgoing requests, so make sure we send them.
this.outgoingRequestLoop();
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// Outgoing requests
//
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/** start the outgoing request loop if it is not already running */
private outgoingRequestLoop(): void {
if (this.outgoingRequestLoopRunning) {
// The loop is already running, but we have reason to believe that there may be new items in the queue.
//
// There is potential for a race whereby the item is added *after* `OlmMachine.outgoingRequests` checks
// the queue, but *before* it returns. In such a case, the item could sit there unnoticed for some time.
//
// In order to circumvent the race, we set a flag which tells the loop to go round once again even if the
// queue appears to be empty.
this.outgoingRequestLoopOneMoreLoop = true;
return;
}
// fire off the loop in the background
this.outgoingRequestLoopInner().catch((e) => {
this.logger.error("Error processing outgoing-message requests from rust crypto-sdk", e);
this.outgoingRequestsManager.doProcessOutgoingRequests().catch((e) => {
this.logger.warn("onKeyVerificationRequest: Error processing outgoing requests", e);
});
}
private async outgoingRequestLoopInner(): Promise<void> {
/* istanbul ignore if */
if (this.outgoingRequestLoopRunning) {
throw new Error("Cannot run two outgoing request loops");
}
this.outgoingRequestLoopRunning = true;
try {
while (!this.stopped) {
// we clear the "one more loop" flag just before calling `OlmMachine.outgoingRequests()`, so we can tell
// if `this.outgoingRequestLoop()` was called while `OlmMachine.outgoingRequests()` was running.
this.outgoingRequestLoopOneMoreLoop = false;
const outgoingRequests: Object[] = await this.olmMachine.outgoingRequests();
if (this.stopped) {
// we've been told to stop while `outgoingRequests` was running: exit the loop without processing
// any of the returned requests (anything important will happen next time the client starts.)
return;
}
if (outgoingRequests.length === 0 && !this.outgoingRequestLoopOneMoreLoop) {
// `OlmMachine.outgoingRequests` returned no messages, and there was no call to
// `this.outgoingRequestLoop()` while it was running. We can stop the loop for a while.
return;
}
for (const msg of outgoingRequests) {
await this.outgoingRequestProcessor.makeOutgoingRequest(msg as OutgoingRequest);
}
}
} finally {
this.outgoingRequestLoopRunning = false;
}
}
}
class EventDecryptor {