diff --git a/CHANGELOG.md b/CHANGELOG.md index 065109f51..12ebda0ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,18 @@ +Changes in [19.2.0](https://github.com/matrix-org/matrix-js-sdk/releases/tag/v19.2.0) (2022-08-02) +================================================================================================== + +## 🦖 Deprecations + * Remove unstable support for `m.room_key.withheld` ([\#2512](https://github.com/matrix-org/matrix-js-sdk/pull/2512)). Fixes #2233. + +## ✨ Features + * Sliding sync: add missing filters from latest MSC ([\#2555](https://github.com/matrix-org/matrix-js-sdk/pull/2555)). + * Use stable prefixes for MSC3827 ([\#2537](https://github.com/matrix-org/matrix-js-sdk/pull/2537)). + * Add support for MSC3575: Sliding Sync ([\#2242](https://github.com/matrix-org/matrix-js-sdk/pull/2242)). + +## 🐛 Bug Fixes + * Correct the units in TURN servers expiry documentation ([\#2520](https://github.com/matrix-org/matrix-js-sdk/pull/2520)). + * Re-insert room IDs when decrypting bundled redaction events returned by `/sync` ([\#2531](https://github.com/matrix-org/matrix-js-sdk/pull/2531)). Contributed by @duxovni. + Changes in [19.1.0](https://github.com/matrix-org/matrix-js-sdk/releases/tag/v19.1.0) (2022-07-26) ================================================================================================== diff --git a/package.json b/package.json index a8ddbc092..787e776bd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "matrix-js-sdk", - "version": "19.1.0", + "version": "19.2.0", "description": "Matrix Client-Server SDK for Javascript", "engines": { "node": ">=12.9.0" @@ -104,7 +104,7 @@ "jest-localstorage-mock": "^2.4.6", "jest-sonar-reporter": "^2.0.0", "jsdoc": "^3.6.6", - "matrix-mock-request": "^2.1.0", + "matrix-mock-request": "^2.1.2", "rimraf": "^3.0.2", "terser": "^5.5.1", "tsify": "^5.0.2", diff --git a/spec/unit/crypto.spec.ts b/spec/unit/crypto.spec.ts index b579b7f38..19217cdda 100644 --- a/spec/unit/crypto.spec.ts +++ b/spec/unit/crypto.spec.ts @@ -2,6 +2,7 @@ import '../olm-loader'; // eslint-disable-next-line no-restricted-imports import { EventEmitter } from "events"; +import { MatrixClient } from "../../src/client"; import { Crypto } from "../../src/crypto"; import { MemoryCryptoStore } from "../../src/crypto/store/memory-crypto-store"; import { MockStorageApi } from "../MockStorageApi"; @@ -64,6 +65,10 @@ describe("Crypto", function() { return Olm.init(); }); + afterEach(() => { + jest.useRealTimers(); + }); + it("Crypto exposes the correct olm library version", function() { expect(Crypto.getOlmVersion()[0]).toEqual(3); }); @@ -225,8 +230,8 @@ describe("Crypto", function() { }); describe('Key requests', function() { - let aliceClient; - let bobClient; + let aliceClient: MatrixClient; + let bobClient: MatrixClient; beforeEach(async function() { aliceClient = (new TestClient( @@ -313,7 +318,7 @@ describe("Crypto", function() { expect(events[0].getContent().msgtype).toBe("m.bad.encrypted"); expect(events[1].getContent().msgtype).not.toBe("m.bad.encrypted"); - const cryptoStore = bobClient.cryptoStore; + const cryptoStore = bobClient.crypto.cryptoStore; const eventContent = events[0].getWireContent(); const senderKey = eventContent.sender_key; const sessionId = eventContent.session_id; @@ -383,9 +388,9 @@ describe("Crypto", function() { const ksEvent = await keyshareEventForEvent(aliceClient, event, 1); ksEvent.getContent().sender_key = undefined; // test - bobClient.crypto.addInboundGroupSession = jest.fn(); + bobClient.crypto.olmDevice.addInboundGroupSession = jest.fn(); await bobDecryptor.onRoomKeyEvent(ksEvent); - expect(bobClient.crypto.addInboundGroupSession).not.toHaveBeenCalled(); + expect(bobClient.crypto.olmDevice.addInboundGroupSession).not.toHaveBeenCalled(); }); it("creates a new keyshare request if we request a keyshare", async function() { @@ -401,7 +406,7 @@ describe("Crypto", function() { }, }); await aliceClient.cancelAndResendEventRoomKeyRequest(event); - const cryptoStore = aliceClient.cryptoStore; + const cryptoStore = aliceClient.crypto.cryptoStore; const roomKeyRequestBody = { algorithm: olmlib.MEGOLM_ALGORITHM, room_id: "!someroom", @@ -425,7 +430,8 @@ describe("Crypto", function() { }, }); // replace Alice's sendToDevice function with a mock - aliceClient.sendToDevice = jest.fn().mockResolvedValue(undefined); + const aliceSendToDevice = jest.fn().mockResolvedValue(undefined); + aliceClient.sendToDevice = aliceSendToDevice; aliceClient.startClient(); // make a room key request, and record the transaction ID for the @@ -434,11 +440,12 @@ describe("Crypto", function() { // key requests get queued until the sync has finished, but we don't // let the client set up enough for that to happen, so gut-wrench a bit // to force it to send now. + // @ts-ignore aliceClient.crypto.outgoingRoomKeyRequestManager.sendQueuedRequests(); jest.runAllTimers(); await Promise.resolve(); - expect(aliceClient.sendToDevice).toBeCalledTimes(1); - const txnId = aliceClient.sendToDevice.mock.calls[0][2]; + expect(aliceSendToDevice).toBeCalledTimes(1); + const txnId = aliceSendToDevice.mock.calls[0][2]; // give the room key request manager time to update the state // of the request @@ -451,8 +458,8 @@ describe("Crypto", function() { // cancelAndResend will call sendToDevice twice: // the first call to sendToDevice will be the cancellation // the second call to sendToDevice will be the key request - expect(aliceClient.sendToDevice).toBeCalledTimes(3); - expect(aliceClient.sendToDevice.mock.calls[2][2]).not.toBe(txnId); + expect(aliceSendToDevice).toBeCalledTimes(3); + expect(aliceSendToDevice.mock.calls[2][2]).not.toBe(txnId); }); }); @@ -480,4 +487,105 @@ describe("Crypto", function() { client.stopClient(); }); }); + + describe("encryptAndSendToDevices", () => { + let client: TestClient; + let ensureOlmSessionsForDevices: jest.SpiedFunction; + let encryptMessageForDevice: jest.SpiedFunction; + const payload = { hello: "world" }; + let encryptedPayload: object; + + beforeEach(async () => { + ensureOlmSessionsForDevices = jest.spyOn(olmlib, "ensureOlmSessionsForDevices"); + ensureOlmSessionsForDevices.mockResolvedValue({}); + encryptMessageForDevice = jest.spyOn(olmlib, "encryptMessageForDevice"); + encryptMessageForDevice.mockImplementation(async (...[result,,,,,, payload]) => { + result.plaintext = JSON.stringify(payload); + }); + + client = new TestClient("@alice:example.org", "aliceweb"); + await client.client.initCrypto(); + + encryptedPayload = { + algorithm: "m.olm.v1.curve25519-aes-sha2", + sender_key: client.client.crypto.olmDevice.deviceCurve25519Key, + ciphertext: { plaintext: JSON.stringify(payload) }, + }; + }); + + afterEach(async () => { + ensureOlmSessionsForDevices.mockRestore(); + encryptMessageForDevice.mockRestore(); + await client.stop(); + }); + + it("encrypts and sends to devices", async () => { + client.httpBackend + .when("PUT", "/sendToDevice/m.room.encrypted", { + messages: { + "@bob:example.org": { + bobweb: encryptedPayload, + bobmobile: encryptedPayload, + }, + "@carol:example.org": { + caroldesktop: encryptedPayload, + }, + }, + }) + .respond(200, {}); + + await Promise.all([ + client.client.encryptAndSendToDevices( + [ + { userId: "@bob:example.org", deviceInfo: new DeviceInfo("bobweb") }, + { userId: "@bob:example.org", deviceInfo: new DeviceInfo("bobmobile") }, + { userId: "@carol:example.org", deviceInfo: new DeviceInfo("caroldesktop") }, + ], + payload, + ), + client.httpBackend.flushAllExpected(), + ]); + }); + + it("sends nothing to devices that couldn't be encrypted to", async () => { + encryptMessageForDevice.mockImplementation(async (...[result,,,, userId, device, payload]) => { + // Refuse to encrypt to Carol's desktop device + if (userId === "@carol:example.org" && device.deviceId === "caroldesktop") return; + result.plaintext = JSON.stringify(payload); + }); + + client.httpBackend + .when("PUT", "/sendToDevice/m.room.encrypted", { + // Carol is nowhere to be seen + messages: { "@bob:example.org": { bobweb: encryptedPayload } }, + }) + .respond(200, {}); + + await Promise.all([ + client.client.encryptAndSendToDevices( + [ + { userId: "@bob:example.org", deviceInfo: new DeviceInfo("bobweb") }, + { userId: "@carol:example.org", deviceInfo: new DeviceInfo("caroldesktop") }, + ], + payload, + ), + client.httpBackend.flushAllExpected(), + ]); + }); + + it("no-ops if no devices can be encrypted to", async () => { + // Refuse to encrypt to anybody + encryptMessageForDevice.mockResolvedValue(undefined); + + // Get the room keys version request out of the way + client.httpBackend.when("GET", "/room_keys/version").respond(404, {}); + await client.httpBackend.flush("/room_keys/version", 1); + + await client.client.encryptAndSendToDevices( + [{ userId: "@bob:example.org", deviceInfo: new DeviceInfo("bobweb") }], + payload, + ); + client.httpBackend.verifyNoOutstandingRequests(); + }); + }); }); diff --git a/spec/unit/crypto/algorithms/megolm.spec.ts b/spec/unit/crypto/algorithms/megolm.spec.ts index 8e8250c44..9aa3c5c78 100644 --- a/spec/unit/crypto/algorithms/megolm.spec.ts +++ b/spec/unit/crypto/algorithms/megolm.spec.ts @@ -59,6 +59,7 @@ describe("MegolmDecryption", function() { mockBaseApis = { claimOneTimeKeys: jest.fn(), sendToDevice: jest.fn(), + queueToDevice: jest.fn(), } as unknown as MockedObject; const cryptoStore = new MemoryCryptoStore(); @@ -179,6 +180,7 @@ describe("MegolmDecryption", function() { }); mockBaseApis.sendToDevice.mockReset(); + mockBaseApis.queueToDevice.mockReset(); // do the share megolmDecryption.shareKeysWithDevice(keyRequest); @@ -324,6 +326,7 @@ describe("MegolmDecryption", function() { }, }); mockBaseApis.sendToDevice.mockResolvedValue(undefined); + mockBaseApis.queueToDevice.mockResolvedValue(undefined); aliceDeviceInfo = { deviceId: 'aliceDevice', @@ -413,7 +416,7 @@ describe("MegolmDecryption", function() { expect(mockCrypto.downloadKeys).toHaveBeenCalledWith( ['@alice:home.server'], false, ); - expect(mockBaseApis.sendToDevice).toHaveBeenCalled(); + expect(mockBaseApis.queueToDevice).toHaveBeenCalled(); expect(mockBaseApis.claimOneTimeKeys).toHaveBeenCalledWith( [['@alice:home.server', 'aliceDevice']], 'signed_curve25519', 2000, ); @@ -456,7 +459,7 @@ describe("MegolmDecryption", function() { 'YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWI', ); - mockBaseApis.sendToDevice.mockClear(); + mockBaseApis.queueToDevice.mockClear(); await megolmEncryption.reshareKeyWithDevice( olmDevice.deviceCurve25519Key, ct1.session_id, @@ -464,7 +467,7 @@ describe("MegolmDecryption", function() { aliceDeviceInfo, ); - expect(mockBaseApis.sendToDevice).not.toHaveBeenCalled(); + expect(mockBaseApis.queueToDevice).not.toHaveBeenCalled(); }); }); }); diff --git a/spec/unit/matrix-client.spec.ts b/spec/unit/matrix-client.spec.ts index fbe8c67d7..db3377501 100644 --- a/spec/unit/matrix-client.spec.ts +++ b/spec/unit/matrix-client.spec.ts @@ -27,6 +27,7 @@ import { UNSTABLE_MSC3089_TREE_SUBTYPE, } from "../../src/@types/event"; import { MEGOLM_ALGORITHM } from "../../src/crypto/olmlib"; +import { Crypto } from "../../src/crypto"; import { EventStatus, MatrixEvent } from "../../src/models/event"; import { Preset } from "../../src/@types/partials"; import { ReceiptType } from "../../src/@types/read_receipts"; @@ -1297,4 +1298,19 @@ describe("MatrixClient", function() { expect(result!.aliases).toEqual(response.aliases); }); }); + + describe("encryptAndSendToDevices", () => { + it("throws an error if crypto is unavailable", () => { + client.crypto = undefined; + expect(() => client.encryptAndSendToDevices([], {})).toThrow(); + }); + + it("is an alias for the crypto method", async () => { + client.crypto = testUtils.mock(Crypto, "Crypto"); + const deviceInfos = []; + const payload = {}; + await client.encryptAndSendToDevices(deviceInfos, payload); + expect(client.crypto.encryptAndSendToDevices).toHaveBeenLastCalledWith(deviceInfos, payload); + }); + }); }); diff --git a/spec/unit/queueToDevice.spec.ts b/spec/unit/queueToDevice.spec.ts new file mode 100644 index 000000000..ff22d29d4 --- /dev/null +++ b/spec/unit/queueToDevice.spec.ts @@ -0,0 +1,338 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import MockHttpBackend from 'matrix-mock-request'; +import { indexedDB as fakeIndexedDB } from 'fake-indexeddb'; + +import { IHttpOpts, IndexedDBStore, MatrixEvent, MemoryStore, Room } from "../../src"; +import { MatrixClient } from "../../src/client"; +import { ToDeviceBatch } from '../../src/models/ToDeviceMessage'; +import { logger } from '../../src/logger'; +import { IStore } from '../../src/store'; + +const FAKE_USER = "@alice:example.org"; +const FAKE_DEVICE_ID = "AAAAAAAA"; +const FAKE_PAYLOAD = { + "foo": 42, +}; +const EXPECTED_BODY = { + messages: { + [FAKE_USER]: { + [FAKE_DEVICE_ID]: FAKE_PAYLOAD, + }, + }, +}; + +const FAKE_MSG = { + userId: FAKE_USER, + deviceId: FAKE_DEVICE_ID, + payload: FAKE_PAYLOAD, +}; + +enum StoreType { + Memory = 'Memory', + IndexedDB = 'IndexedDB', +} + +// Jest now uses @sinonjs/fake-timers which exposes tickAsync() and a number of +// other async methods which break the event loop, letting scheduled promise +// callbacks run. Unfortunately, Jest doesn't expose these, so we have to do +// it manually (this is what sinon does under the hood). We do both in a loop +// until the thing we expect happens: hopefully this is the least flakey way +// and avoids assuming anything about the app's behaviour. +const realSetTimeout = setTimeout; +function flushPromises() { + return new Promise(r => { + realSetTimeout(r, 1); + }); +} + +async function flushAndRunTimersUntil(cond: () => boolean) { + while (!cond()) { + await flushPromises(); + if (cond()) break; + jest.advanceTimersToNextTimer(); + } +} + +describe.each([ + [StoreType.Memory], [StoreType.IndexedDB], +])("queueToDevice (%s store)", function(storeType) { + let httpBackend: MockHttpBackend; + let client: MatrixClient; + + beforeEach(async function() { + httpBackend = new MockHttpBackend(); + + let store: IStore; + if (storeType === StoreType.IndexedDB) { + const idbStore = new IndexedDBStore({ indexedDB: fakeIndexedDB }); + await idbStore.startup(); + store = idbStore; + } else { + store = new MemoryStore(); + } + + client = new MatrixClient({ + baseUrl: "https://my.home.server", + accessToken: "my.access.token", + request: httpBackend.requestFn as IHttpOpts["request"], + store, + }); + }); + + afterEach(function() { + jest.useRealTimers(); + client.stopClient(); + }); + + it("sends a to-device message", async function() { + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(request.data).toEqual(EXPECTED_BODY); + }).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + + await httpBackend.flushAllExpected(); + }); + + it("retries on error", async function() { + jest.useFakeTimers(); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(request.data).toEqual(EXPECTED_BODY); + }).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); + expect(httpBackend.flushSync(null, 1)).toEqual(1); + + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); + + expect(httpBackend.flushSync(null, 1)).toEqual(1); + }); + + it("stops retrying on 4xx errors", async function() { + jest.useFakeTimers(); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(400); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); + expect(httpBackend.flushSync(null, 1)).toEqual(1); + + // Asserting that another request is never made is obviously + // a bit tricky - we just flush the queue what should hopefully + // be plenty of times and assert that nothing comes through. + let tries = 0; + await flushAndRunTimersUntil(() => ++tries === 10); + + expect(httpBackend.requests.length).toEqual(0); + }); + + it("honours ratelimiting", async function() { + jest.useFakeTimers(); + + // pick something obscure enough it's unlikley to clash with a + // retry delay the algorithm uses anyway + const retryDelay = 279 * 1000; + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(429, { + errcode: "M_LIMIT_EXCEEDED", + retry_after_ms: retryDelay, + }); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); + expect(httpBackend.flushSync(null, 1)).toEqual(1); + await flushPromises(); + + logger.info("Advancing clock to just before expected retry time..."); + + jest.advanceTimersByTime(retryDelay - 1000); + await flushPromises(); + + expect(httpBackend.requests.length).toEqual(0); + + logger.info("Advancing clock past expected retry time..."); + + jest.advanceTimersByTime(2000); + await flushPromises(); + + expect(httpBackend.flushSync(null, 1)).toEqual(1); + }); + + it("retries on retryImmediately()", async function() { + httpBackend.when("GET", "/_matrix/client/versions").respond(200, { + versions: ["r0.0.1"], + }); + + await Promise.all([client.startClient(), httpBackend.flush(null, 1, 20)]); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); + + client.retryImmediately(); + + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); + }); + + it("retries on when client is started", async function() { + httpBackend.when("GET", "/_matrix/client/versions").respond(200, { + versions: ["r0.0.1"], + }); + + await Promise.all([client.startClient(), httpBackend.flush("/_matrix/client/versions", 1, 20)]); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); + + client.stopClient(); + await Promise.all([client.startClient(), httpBackend.flush("/_matrix/client/versions", 1, 20)]); + + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); + }); + + it("retries when a message is retried", async function() { + httpBackend.when("GET", "/_matrix/client/versions").respond(200, { + versions: ["r0.0.1"], + }); + + await Promise.all([client.startClient(), httpBackend.flush(null, 1, 20)]); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); + + const dummyEvent = new MatrixEvent({ + event_id: "!fake:example.org", + }); + const mockRoom = { + updatePendingEvent: jest.fn(), + } as unknown as Room; + client.resendEvent(dummyEvent, mockRoom); + + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); + }); + + it("splits many messages into multiple HTTP requests", async function() { + const batch: ToDeviceBatch = { + eventType: "org.example.foo", + batch: [], + }; + + for (let i = 0; i <= 20; ++i) { + batch.batch.push({ + userId: `@user${i}:example.org`, + deviceId: FAKE_DEVICE_ID, + payload: FAKE_PAYLOAD, + }); + } + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(Object.keys(request.data.messages).length).toEqual(20); + }).respond(200, {}); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(Object.keys(request.data.messages).length).toEqual(1); + }).respond(200, {}); + + await client.queueToDevice(batch); + await httpBackend.flushAllExpected(); + }); +}); diff --git a/spec/unit/webrtc/call.spec.ts b/spec/unit/webrtc/call.spec.ts index 6fc8a5a5d..9fa33e3ec 100644 --- a/spec/unit/webrtc/call.spec.ts +++ b/spec/unit/webrtc/call.spec.ts @@ -415,71 +415,6 @@ describe('Call', function() { }).track.id).toBe("video_track"); }); - describe("should handle stream replacement", () => { - it("with both purpose and id", async () => { - await startVoiceCall(client, call); - - call.updateRemoteSDPStreamMetadata({ - "remote_stream1": { - purpose: SDPStreamMetadataPurpose.Usermedia, - }, - }); - call.pushRemoteFeed(new MockMediaStream("remote_stream1", [])); - const feed = call.getFeeds().find((feed) => feed.stream.id === "remote_stream1"); - - call.updateRemoteSDPStreamMetadata({ - "remote_stream2": { - purpose: SDPStreamMetadataPurpose.Usermedia, - }, - }); - call.pushRemoteFeed(new MockMediaStream("remote_stream2", [])); - - expect(feed?.stream?.id).toBe("remote_stream2"); - }); - - it("with just purpose", async () => { - await startVoiceCall(client, call); - - call.updateRemoteSDPStreamMetadata({ - "remote_stream1": { - purpose: SDPStreamMetadataPurpose.Usermedia, - }, - }); - call.pushRemoteFeed(new MockMediaStream("remote_stream1", [])); - const feed = call.getFeeds().find((feed) => feed.stream.id === "remote_stream1"); - - call.updateRemoteSDPStreamMetadata({ - "remote_stream2": { - purpose: SDPStreamMetadataPurpose.Usermedia, - }, - }); - call.pushRemoteFeed(new MockMediaStream("remote_stream2", [])); - - expect(feed?.stream?.id).toBe("remote_stream2"); - }); - - it("should not replace purpose is different", async () => { - await startVoiceCall(client, call); - - call.updateRemoteSDPStreamMetadata({ - "remote_stream1": { - purpose: SDPStreamMetadataPurpose.Usermedia, - }, - }); - call.pushRemoteFeed(new MockMediaStream("remote_stream1", [])); - const feed = call.getFeeds().find((feed) => feed.stream.id === "remote_stream1"); - - call.updateRemoteSDPStreamMetadata({ - "remote_stream2": { - purpose: SDPStreamMetadataPurpose.Screenshare, - }, - }); - call.pushRemoteFeed(new MockMediaStream("remote_stream2", [])); - - expect(feed?.stream?.id).toBe("remote_stream1"); - }); - }); - it("should handle SDPStreamMetadata changes", async () => { await startVoiceCall(client, call); @@ -794,4 +729,64 @@ describe('Call', function() { expect(supportsMatrixCall()).toBe(false); }); }); + + describe("ignoring streams with ids for which we already have a feed", () => { + const STREAM_ID = "stream_id"; + const FEEDS_CHANGED_CALLBACK = jest.fn(); + + beforeEach(async () => { + await startVoiceCall(client, call); + call.on(CallEvent.FeedsChanged, FEEDS_CHANGED_CALLBACK); + jest.spyOn(call, "pushLocalFeed"); + }); + + afterEach(() => { + FEEDS_CHANGED_CALLBACK.mockReset(); + }); + + it("should ignore stream passed to pushRemoteFeed()", async () => { + await call.onAnswerReceived({ + getContent: () => { + return { + version: 1, + call_id: call.callId, + party_id: 'party_id', + answer: { + sdp: DUMMY_SDP, + }, + [SDPStreamMetadataKey]: { + [STREAM_ID]: { + purpose: SDPStreamMetadataPurpose.Usermedia, + }, + }, + }; + }, + getSender: () => "@test:foo", + }); + + call.pushRemoteFeed(new MockMediaStream(STREAM_ID)); + call.pushRemoteFeed(new MockMediaStream(STREAM_ID)); + + expect(call.getRemoteFeeds().length).toBe(1); + expect(FEEDS_CHANGED_CALLBACK).toHaveBeenCalledTimes(1); + }); + + it("should ignore stream passed to pushRemoteFeedWithoutMetadata()", async () => { + call.pushRemoteFeedWithoutMetadata(new MockMediaStream(STREAM_ID)); + call.pushRemoteFeedWithoutMetadata(new MockMediaStream(STREAM_ID)); + + expect(call.getRemoteFeeds().length).toBe(1); + expect(FEEDS_CHANGED_CALLBACK).toHaveBeenCalledTimes(1); + }); + + it("should ignore stream passed to pushNewLocalFeed()", async () => { + call.pushNewLocalFeed(new MockMediaStream(STREAM_ID), SDPStreamMetadataPurpose.Screenshare); + call.pushNewLocalFeed(new MockMediaStream(STREAM_ID), SDPStreamMetadataPurpose.Screenshare); + + // We already have one local feed from placeVoiceCall() + expect(call.getLocalFeeds().length).toBe(2); + expect(FEEDS_CHANGED_CALLBACK).toHaveBeenCalledTimes(1); + expect(call.pushLocalFeed).toHaveBeenCalled(); + }); + }); }); diff --git a/spec/unit/webrtc/callFeed.spec.ts b/spec/unit/webrtc/callFeed.spec.ts index e8881781d..635fa14fd 100644 --- a/spec/unit/webrtc/callFeed.spec.ts +++ b/spec/unit/webrtc/callFeed.spec.ts @@ -15,13 +15,11 @@ limitations under the License. */ import { SDPStreamMetadataPurpose } from "../../../src/webrtc/callEventTypes"; -import { CallFeed, CallFeedEvent } from "../../../src/webrtc/callFeed"; -import { MockMediaStream, MockMediaStreamTrack } from "../../test-utils/webrtc"; +import { CallFeed } from "../../../src/webrtc/callFeed"; import { TestClient } from "../../TestClient"; +import { MockMediaStream, MockMediaStreamTrack } from "../../test-utils/webrtc"; describe("CallFeed", () => { - const roomId = "room_id"; - let client; beforeEach(() => { @@ -32,30 +30,60 @@ describe("CallFeed", () => { client.stop(); }); - it("should handle stream replacement", () => { - const feedNewStreamCallback = jest.fn(); - const feed = new CallFeed({ - client, - roomId, - userId: "user1", - // @ts-ignore Mock - stream: new MockMediaStream("stream1"), - id: "id", - purpose: SDPStreamMetadataPurpose.Usermedia, - audioMuted: false, - videoMuted: false, + describe("muting", () => { + let feed: CallFeed; + + beforeEach(() => { + feed = new CallFeed({ + client, + roomId: "room1", + userId: "user1", + // @ts-ignore Mock + stream: new MockMediaStream("stream1"), + purpose: SDPStreamMetadataPurpose.Usermedia, + audioMuted: false, + videoMuted: false, + }); }); - feed.on(CallFeedEvent.NewStream, feedNewStreamCallback); - const replacementStream = new MockMediaStream("stream2"); - // @ts-ignore Mock - feed.setNewStream(replacementStream); - expect(feedNewStreamCallback).toHaveBeenCalledWith(replacementStream); - expect(feed.stream).toBe(replacementStream); + describe("muting by default", () => { + it("should mute audio by default", () => { + expect(feed.isAudioMuted()).toBeTruthy(); + }); - feedNewStreamCallback.mockReset(); + it("should mute video by default", () => { + expect(feed.isVideoMuted()).toBeTruthy(); + }); + }); - replacementStream.addTrack(new MockMediaStreamTrack("track_id", "audio")); - expect(feedNewStreamCallback).toHaveBeenCalledWith(replacementStream); + describe("muting after adding a track", () => { + it("should un-mute audio", () => { + // @ts-ignore Mock + feed.stream.addTrack(new MockMediaStreamTrack("track", "audio", true)); + expect(feed.isAudioMuted()).toBeFalsy(); + }); + + it("should un-mute video", () => { + // @ts-ignore Mock + feed.stream.addTrack(new MockMediaStreamTrack("track", "video", true)); + expect(feed.isVideoMuted()).toBeFalsy(); + }); + }); + + describe("muting after calling setAudioVideoMuted()", () => { + it("should mute audio by default ", () => { + // @ts-ignore Mock + feed.stream.addTrack(new MockMediaStreamTrack("track", "audio", true)); + feed.setAudioVideoMuted(true, false); + expect(feed.isAudioMuted()).toBeTruthy(); + }); + + it("should mute video by default", () => { + // @ts-ignore Mock + feed.stream.addTrack(new MockMediaStreamTrack("track", "video", true)); + feed.setAudioVideoMuted(false, true); + expect(feed.isVideoMuted()).toBeTruthy(); + }); + }); }); }); diff --git a/src/ToDeviceMessageQueue.ts b/src/ToDeviceMessageQueue.ts new file mode 100644 index 000000000..12827d8bb --- /dev/null +++ b/src/ToDeviceMessageQueue.ts @@ -0,0 +1,125 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { logger } from "./logger"; +import { MatrixClient } from "./matrix"; +import { IndexedToDeviceBatch, ToDeviceBatch, ToDeviceBatchWithTxnId, ToDevicePayload } from "./models/ToDeviceMessage"; +import { MatrixScheduler } from "./scheduler"; + +const MAX_BATCH_SIZE = 20; + +/** + * Maintains a queue of outgoing to-device messages, sending them + * as soon as the homeserver is reachable. + */ +export class ToDeviceMessageQueue { + private sending = false; + private running = true; + private retryTimeout: number = null; + private retryAttempts = 0; + + constructor(private client: MatrixClient) { + } + + public start(): void { + this.running = true; + this.sendQueue(); + } + + public stop(): void { + this.running = false; + if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); + this.retryTimeout = null; + } + + public async queueBatch(batch: ToDeviceBatch): Promise { + const batches: ToDeviceBatchWithTxnId[] = []; + for (let i = 0; i < batch.batch.length; i += MAX_BATCH_SIZE) { + batches.push({ + eventType: batch.eventType, + batch: batch.batch.slice(i, i + MAX_BATCH_SIZE), + txnId: this.client.makeTxnId(), + }); + } + + await this.client.store.saveToDeviceBatches(batches); + this.sendQueue(); + } + + public sendQueue = async (): Promise => { + if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); + this.retryTimeout = null; + + if (this.sending || !this.running) return; + + logger.debug("Attempting to send queued to-device messages"); + + this.sending = true; + let headBatch; + try { + while (this.running) { + headBatch = await this.client.store.getOldestToDeviceBatch(); + if (headBatch === null) break; + await this.sendBatch(headBatch); + await this.client.store.removeToDeviceBatch(headBatch.id); + this.retryAttempts = 0; + } + + // Make sure we're still running after the async tasks: if not, stop. + if (!this.running) return; + + logger.debug("All queued to-device messages sent"); + } catch (e) { + ++this.retryAttempts; + // eslint-disable-next-line @typescript-eslint/naming-convention + // eslint-disable-next-line new-cap + const retryDelay = MatrixScheduler.RETRY_BACKOFF_RATELIMIT(null, this.retryAttempts, e); + if (retryDelay === -1) { + // the scheduler function doesn't differentiate between fatal errors and just getting + // bored and giving up for now + if (Math.floor(e.httpStatus / 100) === 4) { + logger.error("Fatal error when sending to-device message - dropping to-device batch!", e); + await this.client.store.removeToDeviceBatch(headBatch.id); + } else { + logger.info("Automatic retry limit reached for to-device messages."); + } + return; + } + + logger.info(`Failed to send batch of to-device messages. Will retry in ${retryDelay}ms`, e); + this.retryTimeout = setTimeout(this.sendQueue, retryDelay); + } finally { + this.sending = false; + } + }; + + /** + * Attempts to send a batch of to-device messages. + */ + private async sendBatch(batch: IndexedToDeviceBatch): Promise { + const contentMap: Record> = {}; + for (const item of batch.batch) { + if (!contentMap[item.userId]) { + contentMap[item.userId] = {}; + } + contentMap[item.userId][item.deviceId] = item.payload; + } + + logger.info(`Sending batch of ${batch.batch.length} to-device messages with ID ${batch.id}`); + + await this.client.sendToDevice(batch.eventType, contentMap, batch.txnId); + } +} diff --git a/src/client.ts b/src/client.ts index 03f573249..589914687 100644 --- a/src/client.ts +++ b/src/client.ts @@ -41,9 +41,11 @@ import { sleep } from './utils'; import { Direction, EventTimeline } from "./models/event-timeline"; import { IActionsObject, PushProcessor } from "./pushprocessor"; import { AutoDiscovery, AutoDiscoveryAction } from "./autodiscovery"; +import { IEncryptAndSendToDevicesResult } from "./crypto"; import * as olmlib from "./crypto/olmlib"; import { decodeBase64, encodeBase64 } from "./crypto/olmlib"; -import { IExportedDevice as IOlmDevice } from "./crypto/OlmDevice"; +import { IExportedDevice as IExportedOlmDevice } from "./crypto/OlmDevice"; +import { IOlmDevice } from "./crypto/algorithms/megolm"; import { TypedReEmitter } from './ReEmitter'; import { IRoomEncryption, RoomList } from './crypto/RoomList'; import { logger } from './logger'; @@ -202,6 +204,8 @@ import { MSC3575SlidingSyncRequest, MSC3575SlidingSyncResponse, SlidingSync } fr import { SlidingSyncSdk } from "./sliding-sync-sdk"; import { Thread, THREAD_RELATION_TYPE } from "./models/thread"; import { MBeaconInfoEventContent, M_BEACON_INFO } from "./@types/beacon"; +import { ToDeviceMessageQueue } from "./ToDeviceMessageQueue"; +import { ToDeviceBatch } from "./models/ToDeviceMessage"; export type Store = IStore; @@ -214,7 +218,7 @@ const CAPABILITIES_CACHE_MS = 21600000; // 6 hours - an arbitrary value const TURN_CHECK_INTERVAL = 10 * 60 * 1000; // poll for turn credentials every 10 minutes interface IExportedDevice { - olmDevice: IOlmDevice; + olmDevice: IExportedOlmDevice; userId: string; deviceId: string; } @@ -955,13 +959,14 @@ export class MatrixClient extends TypedEventEmitter; - protected exportedOlmDeviceToImport: IOlmDevice; + protected exportedOlmDeviceToImport: IExportedOlmDevice; protected txnCtr = 0; protected mediaHandler = new MediaHandler(this); protected sessionId: string; protected pendingEventEncryption = new Map>(); private useE2eForGroupCall = true; + private toDeviceMessageQueue: ToDeviceMessageQueue; constructor(opts: IMatrixClientCreateOpts) { super(); @@ -1061,6 +1066,8 @@ export class MatrixClient extends TypedEventEmitterexplicitly attempts to retry their lost connection. + * Will also retry any outbound to-device messages currently in the queue to be sent + * (retries of regular outgoing events are handled separately, per-event). * @return {boolean} True if this resulted in a request being retried. */ public retryImmediately(): boolean { + // don't await for this promise: we just want to kick it off + this.toDeviceMessageQueue.sendQueue(); return this.syncApi.retryImmediately(); } @@ -2635,6 +2650,30 @@ export class MatrixClient extends TypedEventEmitter} Promise which + * resolves once the message has been encrypted and sent to the given + * userDeviceMap, and returns the { contentMap, deviceInfoByDeviceId } + * of the successfully sent messages. + */ + public encryptAndSendToDevices( + userDeviceInfoArr: IOlmDevice[], + payload: object, + ): Promise { + if (!this.crypto) { + throw new Error("End-to-End encryption disabled"); + } + return this.crypto.encryptAndSendToDevices(userDeviceInfoArr, payload); + } + /** * Forces the current outbound group session to be discarded such * that another one will be created next time an event is sent. @@ -3591,7 +3630,7 @@ export class MatrixClient extends TypedEventEmitter { + // also kick the to-device queue to retry + this.toDeviceMessageQueue.sendQueue(); + this.updatePendingEventStatus(room, event, EventStatus.SENDING); return this.encryptAndSendEvent(room, event); } @@ -8786,7 +8828,10 @@ export class MatrixClient extends TypedEventEmitter>} contentMap @@ -8818,6 +8863,17 @@ export class MatrixClient extends TypedEventEmitter { + return this.toDeviceMessageQueue.queueBatch(batch); + } + /** * Get the third party protocols that can be reached using * this HS diff --git a/src/crypto/algorithms/megolm.ts b/src/crypto/algorithms/megolm.ts index d47938335..0df456f0a 100644 --- a/src/crypto/algorithms/megolm.ts +++ b/src/crypto/algorithms/megolm.ts @@ -606,17 +606,15 @@ class MegolmEncryption extends EncryptionAlgorithm { return this.crypto.encryptAndSendToDevices( userDeviceMap, payload, - ).then(({ contentMap, deviceInfoByUserIdAndDeviceId }) => { + ).then(({ toDeviceBatch, deviceInfoByUserIdAndDeviceId }) => { // store that we successfully uploaded the keys of the current slice - for (const userId of Object.keys(contentMap)) { - for (const deviceId of Object.keys(contentMap[userId])) { - session.markSharedWithDevice( - userId, - deviceId, - deviceInfoByUserIdAndDeviceId.get(userId).get(deviceId).getIdentityKey(), - chainIndex, - ); - } + for (const msg of toDeviceBatch.batch) { + session.markSharedWithDevice( + msg.userId, + msg.deviceId, + deviceInfoByUserIdAndDeviceId.get(msg.userId).get(msg.deviceId).getIdentityKey(), + chainIndex, + ); } }).catch((error) => { logger.error("failed to encryptAndSendToDevices", error); diff --git a/src/crypto/index.ts b/src/crypto/index.ts index 126fc89e9..ff5ab43fe 100644 --- a/src/crypto/index.ts +++ b/src/crypto/index.ts @@ -23,6 +23,7 @@ limitations under the License. import anotherjson from "another-json"; +import { EventType } from "../@types/event"; import { TypedReEmitter } from '../ReEmitter'; import { logger } from '../logger'; import { IExportedDevice, OlmDevice } from "./OlmDevice"; @@ -69,6 +70,7 @@ import { IStore } from "../store"; import { Room, RoomEvent } from "../models/room"; import { RoomMember, RoomMemberEvent } from "../models/room-member"; import { EventStatus, IClearEvent, IEvent, MatrixEvent, MatrixEventEvent } from "../models/event"; +import { ToDeviceBatch } from "../models/ToDeviceMessage"; import { ClientEvent, ICrossSigningKey, @@ -210,8 +212,8 @@ export interface IEncryptedContent { } /* eslint-enable camelcase */ -interface IEncryptAndSendToDevicesResult { - contentMap: Record>; +export interface IEncryptAndSendToDevicesResult { + toDeviceBatch: ToDeviceBatch; deviceInfoByUserIdAndDeviceId: Map>; } @@ -3115,106 +3117,91 @@ export class Crypto extends TypedEventEmitter} Promise which * resolves once the message has been encrypted and sent to the given * userDeviceMap, and returns the { contentMap, deviceInfoByDeviceId } * of the successfully sent messages. */ - public encryptAndSendToDevices( + public async encryptAndSendToDevices( userDeviceInfoArr: IOlmDevice[], payload: object, ): Promise { - const contentMap: Record> = {}; + const toDeviceBatch: ToDeviceBatch = { + eventType: EventType.RoomMessageEncrypted, + batch: [], + }; const deviceInfoByUserIdAndDeviceId = new Map>(); - const promises: Promise[] = []; - for (const { userId, deviceInfo } of userDeviceInfoArr) { - const deviceId = deviceInfo.deviceId; - const encryptedContent: IEncryptedContent = { - algorithm: olmlib.OLM_ALGORITHM, - sender_key: this.olmDevice.deviceCurve25519Key, - ciphertext: {}, - }; + try { + await Promise.all(userDeviceInfoArr.map(async ({ userId, deviceInfo }) => { + const deviceId = deviceInfo.deviceId; + const encryptedContent: IEncryptedContent = { + algorithm: olmlib.OLM_ALGORITHM, + sender_key: this.olmDevice.deviceCurve25519Key, + ciphertext: {}, + }; - // Assign to temp value to make type-checking happy - let userIdDeviceInfo = deviceInfoByUserIdAndDeviceId.get(userId); + // Assign to temp value to make type-checking happy + let userIdDeviceInfo = deviceInfoByUserIdAndDeviceId.get(userId); - if (userIdDeviceInfo === undefined) { - userIdDeviceInfo = new Map(); - deviceInfoByUserIdAndDeviceId.set(userId, userIdDeviceInfo); - } + if (userIdDeviceInfo === undefined) { + userIdDeviceInfo = new Map(); + deviceInfoByUserIdAndDeviceId.set(userId, userIdDeviceInfo); + } - // We hold by reference, this updates deviceInfoByUserIdAndDeviceId[userId] - userIdDeviceInfo.set(deviceId, deviceInfo); + // We hold by reference, this updates deviceInfoByUserIdAndDeviceId[userId] + userIdDeviceInfo.set(deviceId, deviceInfo); - if (!contentMap[userId]) { - contentMap[userId] = {}; - } - contentMap[userId][deviceId] = encryptedContent; + toDeviceBatch.batch.push({ + userId, + deviceId, + payload: encryptedContent, + }); - promises.push( - olmlib.ensureOlmSessionsForDevices( + await olmlib.ensureOlmSessionsForDevices( this.olmDevice, this.baseApis, { [userId]: [deviceInfo] }, - ).then(() => - olmlib.encryptMessageForDevice( - encryptedContent.ciphertext, - this.userId, - this.deviceId, - this.olmDevice, - userId, - deviceInfo, - payload, - ), - ), - ); - } + ); + await olmlib.encryptMessageForDevice( + encryptedContent.ciphertext, + this.userId, + this.deviceId, + this.olmDevice, + userId, + deviceInfo, + payload, + ); + })); - return Promise.all(promises).then(() => { // prune out any devices that encryptMessageForDevice could not encrypt for, // in which case it will have just not added anything to the ciphertext object. // There's no point sending messages to devices if we couldn't encrypt to them, // since that's effectively a blank message. - for (const userId of Object.keys(contentMap)) { - for (const deviceId of Object.keys(contentMap[userId])) { - if (Object.keys(contentMap[userId][deviceId].ciphertext).length === 0) { - logger.log(`No ciphertext for device ${userId}:${deviceId}: pruning`); - delete contentMap[userId][deviceId]; - } + toDeviceBatch.batch = toDeviceBatch.batch.filter(msg => { + if (Object.keys(msg.payload.ciphertext).length > 0) { + return true; + } else { + logger.log(`No ciphertext for device ${msg.userId}:${msg.deviceId}: pruning`); + return false; } - // No devices left for that user? Strip that too. - if (Object.keys(contentMap[userId]).length === 0) { - logger.log(`Pruned all devices for user ${userId}`); - delete contentMap[userId]; - } - } - - // Is there anything left? - if (Object.keys(contentMap).length === 0) { - logger.log("No users left to send to: aborting"); - return; - } - - return this.baseApis.sendToDevice("m.room.encrypted", contentMap).then( - (response) => ({ contentMap, deviceInfoByUserIdAndDeviceId }), - ).catch(error => { - logger.error("sendToDevice failed", error); - throw error; }); - }).catch(error => { - logger.error("encryptAndSendToDevices promises failed", error); - throw error; - }); + + try { + await this.baseApis.queueToDevice(toDeviceBatch); + return { toDeviceBatch, deviceInfoByUserIdAndDeviceId }; + } catch (e) { + logger.error("sendToDevice failed", e); + throw e; + } + } catch (e) { + logger.error("encryptAndSendToDevices promises failed", e); + throw e; + } } private onMembership = (event: MatrixEvent, member: RoomMember, oldMembership?: string) => { diff --git a/src/models/ToDeviceMessage.ts b/src/models/ToDeviceMessage.ts new file mode 100644 index 000000000..8efc3ed4e --- /dev/null +++ b/src/models/ToDeviceMessage.ts @@ -0,0 +1,38 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +export type ToDevicePayload = Record; + +export interface ToDeviceMessage { + userId: string; + deviceId: string; + payload: ToDevicePayload; +} + +export interface ToDeviceBatch { + eventType: string; + batch: ToDeviceMessage[]; +} + +// Only used internally +export interface ToDeviceBatchWithTxnId extends ToDeviceBatch { + txnId: string; +} + +// Only used internally +export interface IndexedToDeviceBatch extends ToDeviceBatchWithTxnId { + id: number; +} diff --git a/src/models/room.ts b/src/models/room.ts index eaaa8f9ef..2992db587 100644 --- a/src/models/room.ts +++ b/src/models/room.ts @@ -72,7 +72,7 @@ function synthesizeReceipt(userId: string, event: MatrixEvent, receiptType: Rece }, }, }, - type: "m.receipt", + type: EventType.Receipt, room_id: event.getRoomId(), }); } @@ -2423,9 +2423,9 @@ export class Room extends TypedEventEmitter */ public addEphemeralEvents(events: MatrixEvent[]): void { for (const event of events) { - if (event.getType() === 'm.typing') { + if (event.getType() === EventType.Typing) { this.currentState.setTypingEvent(event); - } else if (event.getType() === 'm.receipt') { + } else if (event.getType() === EventType.Receipt) { this.addReceipt(event); } // else ignore - life is too short for us to care about these events } diff --git a/src/scheduler.ts b/src/scheduler.ts index d0249b6cc..271982b74 100644 --- a/src/scheduler.ts +++ b/src/scheduler.ts @@ -57,7 +57,7 @@ export class MatrixScheduler { * failure was due to a rate limited request, the time specified in the error is * waited before being retried. * @param {MatrixEvent} event - * @param {Number} attempts + * @param {Number} attempts Number of attempts that have been made, including the one that just failed (ie. starting at 1) * @param {MatrixError} err * @return {Number} * @see module:scheduler~retryAlgorithm diff --git a/src/sliding-sync.ts b/src/sliding-sync.ts index 9d19ddfed..5254a077d 100644 --- a/src/sliding-sync.ts +++ b/src/sliding-sync.ts @@ -44,6 +44,9 @@ export interface MSC3575Filter { is_invite?: boolean; is_tombstoned?: boolean; room_name_like?: string; + room_types?: string[]; + not_room_types?: string[]; + spaces?: string[]; } /** @@ -602,7 +605,7 @@ export class SlidingSync extends TypedEventEmitter; // type : content @@ -57,21 +57,21 @@ export interface IStore { setSyncToken(token: string): void; /** - * No-op. - * @param {Room} room + * Store the given room. + * @param {Room} room The room to be stored. All properties must be stored. */ storeRoom(room: Room): void; /** - * No-op. - * @param {string} roomId - * @return {null} + * Retrieve a room by its' room ID. + * @param {string} roomId The room ID. + * @return {Room} The room or null. */ getRoom(roomId: string): Room | null; /** - * No-op. - * @return {Array} An empty array. + * Retrieve all known rooms. + * @return {Room[]} A list of rooms, which may be empty. */ getRooms(): Room[]; @@ -82,35 +82,36 @@ export interface IStore { removeRoom(roomId: string): void; /** - * No-op. - * @return {Array} An empty array. + * Retrieve a summary of all the rooms. + * @return {RoomSummary[]} A summary of each room. */ getRoomSummaries(): RoomSummary[]; /** - * No-op. - * @param {User} user + * Store a User. + * @param {User} user The user to store. */ storeUser(user: User): void; /** - * No-op. - * @param {string} userId - * @return {null} + * Retrieve a User by its' user ID. + * @param {string} userId The user ID. + * @return {User} The user or null. */ getUser(userId: string): User | null; /** - * No-op. - * @return {User[]} + * Retrieve all known users. + * @return {User[]} A list of users, which may be empty. */ getUsers(): User[]; /** - * No-op. - * @param {Room} room - * @param {number} limit - * @return {Array} + * Retrieve scrollback for this room. + * @param {Room} room The matrix room + * @param {number} limit The max number of old events to retrieve. + * @return {Array} An array of objects which will be at most 'limit' + * length and at least 0. The objects are the raw event JSON. */ scrollback(room: Room, limit: number): MatrixEvent[]; @@ -209,8 +210,23 @@ export interface IStore { */ deleteAllData(): Promise; + /** + * Returns the out-of-band membership events for this room that + * were previously loaded. + * @param {string} roomId + * @returns {event[]} the events, potentially an empty array if OOB loading didn't yield any new members + * @returns {null} in case the members for this room haven't been stored yet + */ getOutOfBandMembers(roomId: string): Promise; + /** + * Stores the out-of-band membership events for this room. Note that + * it still makes sense to store an empty array as the OOB status for the room is + * marked as fetched, and getOutOfBandMembers will return an empty array instead of null + * @param {string} roomId + * @param {event[]} membershipEvents the membership events to store + * @returns {Promise} when all members have been stored + */ setOutOfBandMembers(roomId: string, membershipEvents: IStateEventWithRoomId[]): Promise; clearOutOfBandMembers(roomId: string): Promise; @@ -222,4 +238,19 @@ export interface IStore { getPendingEvents(roomId: string): Promise[]>; setPendingEvents(roomId: string, events: Partial[]): Promise; + + /** + * Stores batches of outgoing to-device messages + */ + saveToDeviceBatches(batch: ToDeviceBatchWithTxnId[]): Promise; + + /** + * Fetches the oldest batch of to-device messages in the queue + */ + getOldestToDeviceBatch(): Promise; + + /** + * Removes a specific batch of to-device messages from the queue + */ + removeToDeviceBatch(id: number): Promise; } diff --git a/src/store/indexeddb-backend.ts b/src/store/indexeddb-backend.ts index 83470d72a..93d1cb3ab 100644 --- a/src/store/indexeddb-backend.ts +++ b/src/store/indexeddb-backend.ts @@ -16,6 +16,7 @@ limitations under the License. import { ISavedSync } from "./index"; import { IEvent, IStartClientOpts, IStateEventWithRoomId, ISyncResponse } from ".."; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; export interface IIndexedDBBackend { connect(): Promise; @@ -31,6 +32,9 @@ export interface IIndexedDBBackend { getUserPresenceEvents(): Promise; getClientOptions(): Promise; storeClientOptions(options: IStartClientOpts): Promise; + saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise; + getOldestToDeviceBatch(): Promise; + removeToDeviceBatch(id: number): Promise; } export type UserTuple = [userId: string, presenceEvent: Partial]; diff --git a/src/store/indexeddb-local-backend.ts b/src/store/indexeddb-local-backend.ts index 178931ff8..bd646c372 100644 --- a/src/store/indexeddb-local-backend.ts +++ b/src/store/indexeddb-local-backend.ts @@ -21,8 +21,9 @@ import { logger } from '../logger'; import { IStartClientOpts, IStateEventWithRoomId } from ".."; import { ISavedSync } from "./index"; import { IIndexedDBBackend, UserTuple } from "./indexeddb-backend"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; -const VERSION = 3; +const VERSION = 4; function createDatabase(db: IDBDatabase): void { // Make user store, clobber based on user ID. (userId property of User objects) @@ -49,6 +50,10 @@ function upgradeSchemaV3(db: IDBDatabase): void { { keyPath: ["clobber"] }); } +function upgradeSchemaV4(db: IDBDatabase): void { + db.createObjectStore("to_device_queue", { autoIncrement: true }); +} + /** * Helper method to collect results from a Cursor and promiseify it. * @param {ObjectStore|Index} store The store to perform openCursor on. @@ -112,7 +117,7 @@ function reqAsPromise(req: IDBRequest): Promise { }); } -function reqAsCursorPromise(req: IDBRequest): Promise { +function reqAsCursorPromise(req: IDBRequest): Promise { return reqAsEventPromise(req).then((event) => req.result); } @@ -177,6 +182,9 @@ export class LocalIndexedDBStoreBackend implements IIndexedDBBackend { if (oldVersion < 3) { upgradeSchemaV3(db); } + if (oldVersion < 4) { + upgradeSchemaV4(db); + } // Expand as needed. }; @@ -561,4 +569,36 @@ export class LocalIndexedDBStoreBackend implements IIndexedDBBackend { }); // put == UPSERT await txnAsPromise(txn); } + + public async saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + const txn = this.db.transaction(["to_device_queue"], "readwrite"); + const store = txn.objectStore("to_device_queue"); + for (const batch of batches) { + store.add(batch); + } + await txnAsPromise(txn); + } + + public async getOldestToDeviceBatch(): Promise { + const txn = this.db.transaction(["to_device_queue"], "readonly"); + const store = txn.objectStore("to_device_queue"); + const cursor = await reqAsCursorPromise(store.openCursor()); + if (!cursor) return null; + + const resultBatch = cursor.value as ToDeviceBatchWithTxnId; + + return { + id: cursor.key as number, + txnId: resultBatch.txnId, + eventType: resultBatch.eventType, + batch: resultBatch.batch, + }; + } + + public async removeToDeviceBatch(id: number): Promise { + const txn = this.db.transaction(["to_device_queue"], "readwrite"); + const store = txn.objectStore("to_device_queue"); + store.delete(id); + await txnAsPromise(txn); + } } diff --git a/src/store/indexeddb-remote-backend.ts b/src/store/indexeddb-remote-backend.ts index 9c06105a1..67ab2ccd2 100644 --- a/src/store/indexeddb-remote-backend.ts +++ b/src/store/indexeddb-remote-backend.ts @@ -20,6 +20,7 @@ import { ISavedSync } from "./index"; import { IStartClientOpts } from "../client"; import { IStateEventWithRoomId, ISyncResponse } from ".."; import { IIndexedDBBackend, UserTuple } from "./indexeddb-backend"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; export class RemoteIndexedDBStoreBackend implements IIndexedDBBackend { private worker: Worker; @@ -133,6 +134,18 @@ export class RemoteIndexedDBStoreBackend implements IIndexedDBBackend { return this.doCmd('getUserPresenceEvents'); } + public async saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + return this.doCmd('saveToDeviceBatches', [batches]); + } + + public async getOldestToDeviceBatch(): Promise { + return this.doCmd('getOldestToDeviceBatch'); + } + + public async removeToDeviceBatch(id: number): Promise { + return this.doCmd('removeToDeviceBatch', [id]); + } + private ensureStarted(): Promise { if (this.startPromise === null) { this.worker = this.workerFactory(); diff --git a/src/store/indexeddb-store-worker.ts b/src/store/indexeddb-store-worker.ts index 0d37dbce9..ced776961 100644 --- a/src/store/indexeddb-store-worker.ts +++ b/src/store/indexeddb-store-worker.ts @@ -103,6 +103,15 @@ export class IndexedDBStoreWorker { case 'storeClientOptions': prom = this.backend.storeClientOptions(msg.args[0]); break; + case 'saveToDeviceBatches': + prom = this.backend.saveToDeviceBatches(msg.args[0]); + break; + case 'getOldestToDeviceBatch': + prom = this.backend.getOldestToDeviceBatch(); + break; + case 'removeToDeviceBatch': + prom = this.backend.removeToDeviceBatch(msg.args[0]); + break; } if (prom === undefined) { diff --git a/src/store/indexeddb.ts b/src/store/indexeddb.ts index 09a85fd1b..44f684bdf 100644 --- a/src/store/indexeddb.ts +++ b/src/store/indexeddb.ts @@ -27,6 +27,7 @@ import { IIndexedDBBackend } from "./indexeddb-backend"; import { ISyncResponse } from "../sync-accumulator"; import { TypedEventEmitter } from "../models/typed-event-emitter"; import { IStateEventWithRoomId } from "../@types/search"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; /** * This is an internal module. See {@link IndexedDBStore} for the public class. @@ -351,6 +352,18 @@ export class IndexedDBStore extends MemoryStore { this.localStorage.removeItem(pendingEventsKey(roomId)); } } + + public saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + return this.backend.saveToDeviceBatches(batches); + } + + public getOldestToDeviceBatch(): Promise { + return this.backend.getOldestToDeviceBatch(); + } + + public removeToDeviceBatch(id: number): Promise { + return this.backend.removeToDeviceBatch(id); + } } /** diff --git a/src/store/memory.ts b/src/store/memory.ts index cb49e425f..0ed43a5b5 100644 --- a/src/store/memory.ts +++ b/src/store/memory.ts @@ -30,6 +30,7 @@ import { ISavedSync, IStore } from "./index"; import { RoomSummary } from "../models/room-summary"; import { ISyncResponse } from "../sync-accumulator"; import { IStateEventWithRoomId } from "../@types/search"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; function isValidFilterId(filterId: string): boolean { const isValidStr = typeof filterId === "string" && @@ -64,6 +65,8 @@ export class MemoryStore implements IStore { private oobMembers: Record = {}; // roomId: [member events] private pendingEvents: { [roomId: string]: Partial[] } = {}; private clientOptions = {}; + private pendingToDeviceBatches: IndexedToDeviceBatch[] = []; + private nextToDeviceBatchId = 0; constructor(opts: IOpts = {}) { this.localStorage = opts.localStorage; @@ -429,4 +432,26 @@ export class MemoryStore implements IStore { public async setPendingEvents(roomId: string, events: Partial[]): Promise { this.pendingEvents[roomId] = events; } + + public saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + for (const batch of batches) { + this.pendingToDeviceBatches.push({ + id: this.nextToDeviceBatchId++, + eventType: batch.eventType, + txnId: batch.txnId, + batch: batch.batch, + }); + } + return Promise.resolve(); + } + + public async getOldestToDeviceBatch(): Promise { + if (this.pendingToDeviceBatches.length === 0) return null; + return this.pendingToDeviceBatches[0]; + } + + public removeToDeviceBatch(id: number): Promise { + this.pendingToDeviceBatches = this.pendingToDeviceBatches.filter(batch => batch.id !== id); + return Promise.resolve(); + } } diff --git a/src/store/stub.ts b/src/store/stub.ts index c9fc57055..eb988a973 100644 --- a/src/store/stub.ts +++ b/src/store/stub.ts @@ -28,6 +28,7 @@ import { ISavedSync, IStore } from "./index"; import { RoomSummary } from "../models/room-summary"; import { ISyncResponse } from "../sync-accumulator"; import { IStateEventWithRoomId } from "../@types/search"; +import { IndexedToDeviceBatch, ToDeviceBatch } from "../models/ToDeviceMessage"; /** * Construct a stub store. This does no-ops on most store methods. @@ -270,4 +271,16 @@ export class StubStore implements IStore { public setPendingEvents(roomId: string, events: Partial[]): Promise { return Promise.resolve(); } + + public async saveToDeviceBatches(batch: ToDeviceBatch[]): Promise { + return Promise.resolve(); + } + + public getOldestToDeviceBatch(): Promise { + return Promise.resolve(null); + } + + public async removeToDeviceBatch(id: number): Promise { + return Promise.resolve(); + } } diff --git a/src/webrtc/call.ts b/src/webrtc/call.ts index 526eaf767..9ea6a6711 100644 --- a/src/webrtc/call.ts +++ b/src/webrtc/call.ts @@ -614,26 +614,26 @@ export class MatrixCall extends TypedEventEmitter feed.purpose === purpose); - if (existingFeed) { - existingFeed.setNewStream(stream); - } else { - this.feeds.push(new CallFeed({ - client: this.client, - roomId: this.roomId, - userId, - stream, - purpose, - audioMuted, - videoMuted, - })); - this.emit(CallEvent.FeedsChanged, this.feeds); + if (this.getFeedByStreamId(stream.id)) { + logger.warn(`Ignoring stream with id ${stream.id} because we already have a feed for it`); + return; } - logger.info(`Call ${this.callId} Pushed remote stream (id="${ - stream.id}", active="${stream.active}", purpose=${purpose})`); + this.feeds.push(new CallFeed({ + client: this.client, + roomId: this.roomId, + userId, + stream, + purpose, + audioMuted, + videoMuted, + })); + this.emit(CallEvent.FeedsChanged, this.feeds); + + logger.info( + `Call ${this.callId} pushed remote stream (id="${stream.id}", ` + + `active="${stream.active}", purpose=${purpose})`, + ); } /** @@ -655,25 +655,23 @@ export class MatrixCall extends TypedEventEmitter feed.purpose === purpose); - if (existingFeed) { - existingFeed.setNewStream(stream); - } else { - this.pushLocalFeed( - new CallFeed({ - client: this.client, - roomId: this.roomId, - audioMuted: false, - videoMuted: false, - userId, - stream, - purpose, - }), - addToPeerConnection, - ); - this.emit(CallEvent.FeedsChanged, this.feeds); + if (this.getFeedByStreamId(stream.id)) { + logger.warn(`Ignoring stream with id ${stream.id} because we already have a feed for it`); + return; } + + this.pushLocalFeed( + new CallFeed({ + client: this.client, + roomId: this.roomId, + audioMuted: false, + videoMuted: false, + userId, + stream, + purpose, + }), + addToPeerConnection, + ); } /** diff --git a/src/webrtc/callFeed.ts b/src/webrtc/callFeed.ts index c91338952..44a568cea 100644 --- a/src/webrtc/callFeed.ts +++ b/src/webrtc/callFeed.ts @@ -181,7 +181,9 @@ export class CallFeed extends TypedEventEmitter /** * Replaces the current MediaStream with a new one. - * This method should be only used by MatrixCall. + * The stream will be different and new stream as remore parties are + * concerned, but this can be used for convenience locally to set up + * volume listeners automatically on the new stream etc. * @param newStream new stream with which to replace the current one */ public setNewStream(newStream: MediaStream): void { diff --git a/yarn.lock b/yarn.lock index 437517387..9476b4d34 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1307,7 +1307,6 @@ "@matrix-org/olm@https://gitlab.matrix.org/api/v4/projects/27/packages/npm/@matrix-org/olm/-/@matrix-org/olm-3.2.12.tgz": version "3.2.12" - uid "0bce3c86f9d36a4984d3c3e07df1c3fb4c679bd9" resolved "https://gitlab.matrix.org/api/v4/projects/27/packages/npm/@matrix-org/olm/-/@matrix-org/olm-3.2.12.tgz#0bce3c86f9d36a4984d3c3e07df1c3fb4c679bd9" "@nicolo-ribaudo/chokidar-2@2.1.8-no-fsevents.3": @@ -1438,9 +1437,9 @@ "@octokit/openapi-types" "^12.10.0" "@sinclair/typebox@^0.24.1": - version "0.24.20" - resolved "https://registry.yarnpkg.com/@sinclair/typebox/-/typebox-0.24.20.tgz#11a657875de6008622d53f56e063a6347c51a6dd" - integrity sha512-kVaO5aEFZb33nPMTZBxiPEkY+slxiPtqC7QX8f9B3eGOMBvEfuMfxp9DSTTCsRJPumPKjrge4yagyssO4q6qzQ== + version "0.24.26" + resolved "https://registry.yarnpkg.com/@sinclair/typebox/-/typebox-0.24.26.tgz#84f9e8c1d93154e734a7947609a1dc7c7a81cc22" + integrity sha512-1ZVIyyS1NXDRVT8GjWD5jULjhDyM3IsIHef2VGUMdnWOlX2tkPjyEX/7K0TGSH2S8EaPhp1ylFdjSjUGQ+gecg== "@sinonjs/commons@^1.7.0": version "1.8.3" @@ -1581,9 +1580,9 @@ integrity sha512-eC4U9MlIcu2q0KQmXszyn5Akca/0jrQmwDRgpAMJai7qBWq4amIQhZyNau4VYGtCeALvW1/NtjzJJ567aZxfKA== "@types/node@*": - version "18.6.1" - resolved "https://registry.yarnpkg.com/@types/node/-/node-18.6.1.tgz#828e4785ccca13f44e2fb6852ae0ef11e3e20ba5" - integrity sha512-z+2vB6yDt1fNwKOeGbckpmirO+VBDuQqecXkgeIqDlaOtmKn6hPR/viQ8cxCfqLU4fTlvM3+YjM367TukWdxpg== + version "18.6.3" + resolved "https://registry.yarnpkg.com/@types/node/-/node-18.6.3.tgz#4e4a95b6fe44014563ceb514b2598b3e623d1c98" + integrity sha512-6qKpDtoaYLM+5+AFChLhHermMQxc3TOEFIDzrZLPRGHPrLEwqFkkT5Kx3ju05g6X7uDPazz3jHbKPX0KzCjntg== "@types/node@16": version "16.11.45" @@ -1610,6 +1609,11 @@ resolved "https://registry.yarnpkg.com/@types/retry/-/retry-0.12.0.tgz#2b35eccfcee7d38cd72ad99232fbd58bffb3c84d" integrity sha512-wWKOClTTiizcZhXnPY4wikVAwmdYHp8q6DmC+EJUzAMsycb7HB32Kh9RN4+0gExjmPmZSAQjgURXIGATPegAvA== +"@types/sdp-transform@^2.4.5": + version "2.4.5" + resolved "https://registry.yarnpkg.com/@types/sdp-transform/-/sdp-transform-2.4.5.tgz#3167961e0a1a5265545e278627aa37c606003f53" + integrity sha512-GVO0gnmbyO3Oxm2HdPsYUNcyihZE3GyCY8ysMYHuQGfLhGZq89Nm4lSzULWTzZoyHtg+VO/IdrnxZHPnPSGnAg== + "@types/stack-utils@^2.0.0": version "2.0.1" resolved "https://registry.yarnpkg.com/@types/stack-utils/-/stack-utils-2.0.1.tgz#20f18294f797f2209b5f65c8e3b5c8e8261d127c" @@ -4802,10 +4806,10 @@ matrix-events-sdk@^0.0.1-beta.7: resolved "https://registry.yarnpkg.com/matrix-events-sdk/-/matrix-events-sdk-0.0.1-beta.7.tgz#5ffe45eba1f67cc8d7c2377736c728b322524934" integrity sha512-9jl4wtWanUFSy2sr2lCjErN/oC8KTAtaeaozJtrgot1JiQcEI4Rda9OLgQ7nLKaqb4Z/QUx/fR3XpDzm5Jy1JA== -matrix-mock-request@^2.1.0: - version "2.1.0" - resolved "https://registry.yarnpkg.com/matrix-mock-request/-/matrix-mock-request-2.1.0.tgz#86f5b0ef846865d0767d3a8e64f5bcd6ca94c178" - integrity sha512-Cjpl3yP6h0yu5GKG89m1XZXZlm69Kg/qHV41N/t6SrQsgcfM3Bfavqx9YrtG0UnuXGy4bBSZIe1QiWVeFPZw1A== +matrix-mock-request@^2.1.2: + version "2.1.2" + resolved "https://registry.yarnpkg.com/matrix-mock-request/-/matrix-mock-request-2.1.2.tgz#11e38ed1233dced88a6f2bfba1684d5c5b3aa2c2" + integrity sha512-/OXCIzDGSLPJ3fs+uzDrtaOHI/Sqp4iEuniRn31U8S06mPXbvAnXknHqJ4c6A/KVwJj/nPFbGXpK4wPM038I6A== dependencies: expect "^28.1.0" @@ -5816,6 +5820,11 @@ safer-buffer@^2.0.2, safer-buffer@^2.1.0, safer-buffer@~2.1.0: resolved "https://registry.yarnpkg.com/safer-buffer/-/safer-buffer-2.1.2.tgz#44fa161b0187b9549dd84bb91802f9bd8385cd6a" integrity sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg== +sdp-transform@^2.14.1: + version "2.14.1" + resolved "https://registry.yarnpkg.com/sdp-transform/-/sdp-transform-2.14.1.tgz#2bb443583d478dee217df4caa284c46b870d5827" + integrity sha512-RjZyX3nVwJyCuTo5tGPx+PZWkDMCg7oOLpSlhjDdZfwUoNqG1mM8nyj31IGHyaPWXhjbP7cdK3qZ2bmkJ1GzRw== + semver@7.0.0: version "7.0.0" resolved "https://registry.yarnpkg.com/semver/-/semver-7.0.0.tgz#5f3ca35761e47e05b206c6daff2cf814f0316b8e"