diff --git a/package.json b/package.json index 39370d4d6..c512fefb9 100644 --- a/package.json +++ b/package.json @@ -102,7 +102,7 @@ "jest-localstorage-mock": "^2.4.6", "jest-sonar-reporter": "^2.0.0", "jsdoc": "^3.6.6", - "matrix-mock-request": "^2.0.1", + "matrix-mock-request": "^2.1.0", "rimraf": "^3.0.2", "terser": "^5.5.1", "tsify": "^5.0.2", diff --git a/spec/integ/sliding-sync-sdk.spec.ts b/spec/integ/sliding-sync-sdk.spec.ts new file mode 100644 index 000000000..12ff0ae2e --- /dev/null +++ b/spec/integ/sliding-sync-sdk.spec.ts @@ -0,0 +1,732 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// eslint-disable-next-line no-restricted-imports +import MockHttpBackend from "matrix-mock-request"; +import { fail } from "assert"; + +import { SlidingSync, SlidingSyncEvent, MSC3575RoomData, SlidingSyncState, Extension } from "../../src/sliding-sync"; +import { TestClient } from "../TestClient"; +import { IRoomEvent, IStateEvent } from "../../src/sync-accumulator"; +import { + MatrixClient, MatrixEvent, NotificationCountType, JoinRule, MatrixError, + EventType, IPushRules, PushRuleKind, TweakName, ClientEvent, +} from "../../src"; +import { SlidingSyncSdk } from "../../src/sliding-sync-sdk"; +import { SyncState } from "../../src/sync"; +import { IStoredClientOpts } from "../../src/client"; + +describe("SlidingSyncSdk", () => { + let client: MatrixClient = null; + let httpBackend: MockHttpBackend = null; + let sdk: SlidingSyncSdk = null; + let mockSlidingSync: SlidingSync = null; + const selfUserId = "@alice:localhost"; + const selfAccessToken = "aseukfgwef"; + + const mockifySlidingSync = (s: SlidingSync): SlidingSync => { + s.getList = jest.fn(); + s.getListData = jest.fn(); + s.getRoomSubscriptions = jest.fn(); + s.listLength = jest.fn(); + s.modifyRoomSubscriptionInfo = jest.fn(); + s.modifyRoomSubscriptions = jest.fn(); + s.registerExtension = jest.fn(); + s.setList = jest.fn(); + s.setListRanges = jest.fn(); + s.start = jest.fn(); + s.stop = jest.fn(); + s.resend = jest.fn(); + return s; + }; + + // shorthand way to make events without filling in all the fields + let eventIdCounter = 0; + const mkOwnEvent = (evType: string, content: object): IRoomEvent => { + eventIdCounter++; + return { + type: evType, + content: content, + sender: selfUserId, + origin_server_ts: Date.now(), + event_id: "$" + eventIdCounter, + }; + }; + const mkOwnStateEvent = (evType: string, content: object, stateKey?: string): IStateEvent => { + eventIdCounter++; + return { + type: evType, + state_key: stateKey, + content: content, + sender: selfUserId, + origin_server_ts: Date.now(), + event_id: "$" + eventIdCounter, + }; + }; + const assertTimelineEvents = (got: MatrixEvent[], want: IRoomEvent[]): void => { + expect(got.length).toEqual(want.length); + got.forEach((m, i) => { + expect(m.getType()).toEqual(want[i].type); + expect(m.getSender()).toEqual(want[i].sender); + expect(m.getId()).toEqual(want[i].event_id); + expect(m.getContent()).toEqual(want[i].content); + expect(m.getTs()).toEqual(want[i].origin_server_ts); + if (want[i].unsigned) { + expect(m.getUnsigned()).toEqual(want[i].unsigned); + } + const maybeStateEvent = want[i] as IStateEvent; + if (maybeStateEvent.state_key) { + expect(m.getStateKey()).toEqual(maybeStateEvent.state_key); + } + }); + }; + + // assign client/httpBackend globals + const setupClient = async (testOpts?: Partial) => { + testOpts = testOpts || {}; + const testClient = new TestClient(selfUserId, "DEVICE", selfAccessToken); + httpBackend = testClient.httpBackend; + client = testClient.client; + mockSlidingSync = mockifySlidingSync(new SlidingSync("", [], {}, client, 0)); + if (testOpts.withCrypto) { + httpBackend.when("GET", "/room_keys/version").respond(404, {}); + await client.initCrypto(); + testOpts.crypto = client.crypto; + } + httpBackend.when("GET", "/_matrix/client/r0/pushrules").respond(200, {}); + sdk = new SlidingSyncSdk(mockSlidingSync, client, testOpts); + }; + + // tear down client/httpBackend globals + const teardownClient = () => { + client.stopClient(); + return httpBackend.stop(); + }; + + // find an extension on a SlidingSyncSdk instance + const findExtension = (name: string): Extension => { + expect(mockSlidingSync.registerExtension).toHaveBeenCalled(); + const mockFn = mockSlidingSync.registerExtension as jest.Mock; + // find the extension + for (let i = 0; i < mockFn.mock.calls.length; i++) { + const calledExtension = mockFn.mock.calls[i][0] as Extension; + if (calledExtension && calledExtension.name() === name) { + return calledExtension; + } + } + fail("cannot find extension " + name); + }; + + describe("sync/stop", () => { + beforeAll(async () => { + await setupClient(); + }); + afterAll(teardownClient); + it("can sync()", async () => { + const hasSynced = sdk.sync(); + await httpBackend.flushAllExpected(); + await hasSynced; + expect(mockSlidingSync.start).toBeCalled(); + }); + it("can stop()", async () => { + sdk.stop(); + expect(mockSlidingSync.stop).toBeCalled(); + }); + }); + + describe("rooms", () => { + beforeAll(async () => { + await setupClient(); + }); + afterAll(teardownClient); + + describe("initial", () => { + beforeAll(async () => { + const hasSynced = sdk.sync(); + await httpBackend.flushAllExpected(); + await hasSynced; + }); + // inject some rooms with different fields set. + // All rooms are new so they all have initial: true + const roomA = "!a_state_and_timeline:localhost"; + const roomB = "!b_timeline_only:localhost"; + const roomC = "!c_with_highlight_count:localhost"; + const roomD = "!d_with_notif_count:localhost"; + const roomE = "!e_with_invite:localhost"; + const roomF = "!f_calc_room_name:localhost"; + const data: Record = { + [roomA]: { + name: "A", + required_state: [ + mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""), + mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId), + mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""), + mkOwnStateEvent(EventType.RoomName, { name: "A" }, ""), + ], + timeline: [ + mkOwnEvent(EventType.RoomMessage, { body: "hello A" }), + mkOwnEvent(EventType.RoomMessage, { body: "world A" }), + ], + initial: true, + }, + [roomB]: { + name: "B", + required_state: [], + timeline: [ + mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""), + mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId), + mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""), + mkOwnEvent(EventType.RoomMessage, { body: "hello B" }), + mkOwnEvent(EventType.RoomMessage, { body: "world B" }), + + ], + initial: true, + }, + [roomC]: { + name: "C", + required_state: [], + timeline: [ + mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""), + mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId), + mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""), + mkOwnEvent(EventType.RoomMessage, { body: "hello C" }), + mkOwnEvent(EventType.RoomMessage, { body: "world C" }), + ], + highlight_count: 5, + initial: true, + }, + [roomD]: { + name: "D", + required_state: [], + timeline: [ + mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""), + mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId), + mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""), + mkOwnEvent(EventType.RoomMessage, { body: "hello D" }), + mkOwnEvent(EventType.RoomMessage, { body: "world D" }), + ], + notification_count: 5, + initial: true, + }, + [roomE]: { + name: "E", + required_state: [], + timeline: [], + invite_state: [ + { + type: EventType.RoomMember, + content: { membership: "invite" }, + state_key: selfUserId, + sender: "@bob:localhost", + event_id: "$room_e_invite", + origin_server_ts: 123456, + }, + { + type: "m.room.join_rules", + content: { join_rule: "invite" }, + state_key: "", + sender: "@bob:localhost", + event_id: "$room_e_join_rule", + origin_server_ts: 123456, + }, + ], + initial: true, + }, + [roomF]: { + name: "#foo:localhost", + required_state: [ + mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""), + mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId), + mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""), + mkOwnStateEvent(EventType.RoomCanonicalAlias, { alias: "#foo:localhost" }, ""), + mkOwnStateEvent(EventType.RoomName, { name: "This should be ignored" }, ""), + ], + timeline: [ + mkOwnEvent(EventType.RoomMessage, { body: "hello A" }), + mkOwnEvent(EventType.RoomMessage, { body: "world A" }), + ], + initial: true, + }, + }; + + it("can be created with required_state and timeline", () => { + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomA, data[roomA]); + const gotRoom = client.getRoom(roomA); + expect(gotRoom).toBeDefined(); + expect(gotRoom.name).toEqual(data[roomA].name); + expect(gotRoom.getMyMembership()).toEqual("join"); + assertTimelineEvents(gotRoom.getLiveTimeline().getEvents().slice(-2), data[roomA].timeline); + }); + + it("can be created with timeline only", () => { + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomB, data[roomB]); + const gotRoom = client.getRoom(roomB); + expect(gotRoom).toBeDefined(); + expect(gotRoom.name).toEqual(data[roomB].name); + expect(gotRoom.getMyMembership()).toEqual("join"); + assertTimelineEvents(gotRoom.getLiveTimeline().getEvents().slice(-5), data[roomB].timeline); + }); + + it("can be created with a highlight_count", () => { + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomC, data[roomC]); + const gotRoom = client.getRoom(roomC); + expect(gotRoom).toBeDefined(); + expect( + gotRoom.getUnreadNotificationCount(NotificationCountType.Highlight), + ).toEqual(data[roomC].highlight_count); + }); + + it("can be created with a notification_count", () => { + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomD, data[roomD]); + const gotRoom = client.getRoom(roomD); + expect(gotRoom).toBeDefined(); + expect( + gotRoom.getUnreadNotificationCount(NotificationCountType.Total), + ).toEqual(data[roomD].notification_count); + }); + + it("can be created with invite_state", () => { + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomE, data[roomE]); + const gotRoom = client.getRoom(roomE); + expect(gotRoom).toBeDefined(); + expect(gotRoom.getMyMembership()).toEqual("invite"); + expect(gotRoom.currentState.getJoinRule()).toEqual(JoinRule.Invite); + }); + + it("uses the 'name' field to caluclate the room name", () => { + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomF, data[roomF]); + const gotRoom = client.getRoom(roomF); + expect(gotRoom).toBeDefined(); + expect( + gotRoom.name, + ).toEqual(data[roomF].name); + }); + + describe("updating", () => { + it("can update with a new timeline event", async () => { + const newEvent = mkOwnEvent(EventType.RoomMessage, { body: "new event A" }); + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomA, { + timeline: [newEvent], + required_state: [], + name: data[roomA].name, + }); + const gotRoom = client.getRoom(roomA); + expect(gotRoom).toBeDefined(); + const newTimeline = data[roomA].timeline; + newTimeline.push(newEvent); + assertTimelineEvents(gotRoom.getLiveTimeline().getEvents().slice(-3), newTimeline); + }); + + it("can update with a new required_state event", async () => { + let gotRoom = client.getRoom(roomB); + expect(gotRoom.getJoinRule()).toEqual(JoinRule.Invite); // default + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomB, { + required_state: [ + mkOwnStateEvent("m.room.join_rules", { join_rule: "restricted" }, ""), + ], + timeline: [], + name: data[roomB].name, + }); + gotRoom = client.getRoom(roomB); + expect(gotRoom).toBeDefined(); + expect(gotRoom.getJoinRule()).toEqual(JoinRule.Restricted); + }); + + it("can update with a new highlight_count", async () => { + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomC, { + name: data[roomC].name, + required_state: [], + timeline: [], + highlight_count: 1, + }); + const gotRoom = client.getRoom(roomC); + expect(gotRoom).toBeDefined(); + expect( + gotRoom.getUnreadNotificationCount(NotificationCountType.Highlight), + ).toEqual(1); + }); + + it("can update with a new notification_count", async () => { + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomD, { + name: data[roomD].name, + required_state: [], + timeline: [], + notification_count: 1, + }); + const gotRoom = client.getRoom(roomD); + expect(gotRoom).toBeDefined(); + expect( + gotRoom.getUnreadNotificationCount(NotificationCountType.Total), + ).toEqual(1); + }); + }); + }); + }); + + describe("lifecycle", () => { + beforeAll(async () => { + await setupClient(); + const hasSynced = sdk.sync(); + await httpBackend.flushAllExpected(); + await hasSynced; + }); + const FAILED_SYNC_ERROR_THRESHOLD = 3; // would be nice to export the const in the actual class... + + it("emits SyncState.Reconnecting when < FAILED_SYNC_ERROR_THRESHOLD & SyncState.Error when over", async () => { + mockSlidingSync.emit( + SlidingSyncEvent.Lifecycle, SlidingSyncState.Complete, + { pos: "h", lists: [], rooms: {}, extensions: {} }, null, + ); + expect(sdk.getSyncState()).toEqual(SyncState.Syncing); + + mockSlidingSync.emit( + SlidingSyncEvent.Lifecycle, SlidingSyncState.RequestFinished, null, new Error("generic"), + ); + expect(sdk.getSyncState()).toEqual(SyncState.Reconnecting); + + for (let i = 0; i < FAILED_SYNC_ERROR_THRESHOLD; i++) { + mockSlidingSync.emit( + SlidingSyncEvent.Lifecycle, SlidingSyncState.RequestFinished, null, new Error("generic"), + ); + } + expect(sdk.getSyncState()).toEqual(SyncState.Error); + }); + + it("emits SyncState.Syncing after a previous SyncState.Error", async () => { + mockSlidingSync.emit( + SlidingSyncEvent.Lifecycle, + SlidingSyncState.Complete, + { pos: "i", lists: [], rooms: {}, extensions: {} }, + null, + ); + expect(sdk.getSyncState()).toEqual(SyncState.Syncing); + }); + + it("emits SyncState.Error immediately when receiving M_UNKNOWN_TOKEN and stops syncing", async () => { + expect(mockSlidingSync.stop).not.toBeCalled(); + mockSlidingSync.emit(SlidingSyncEvent.Lifecycle, SlidingSyncState.RequestFinished, null, new MatrixError({ + errcode: "M_UNKNOWN_TOKEN", + message: "Oh no your access token is no longer valid", + })); + expect(sdk.getSyncState()).toEqual(SyncState.Error); + expect(mockSlidingSync.stop).toBeCalled(); + }); + }); + + describe("opts", () => { + afterEach(teardownClient); + it("can resolveProfilesToInvites", async () => { + await setupClient({ + resolveInvitesToProfiles: true, + }); + const roomId = "!resolveProfilesToInvites:localhost"; + const invitee = "@invitee:localhost"; + const inviteeProfile = { + avatar_url: "mxc://foobar", + displayname: "The Invitee", + }; + httpBackend.when("GET", "/profile").respond(200, inviteeProfile); + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomId, { + initial: true, + name: "Room with Invite", + required_state: [], + timeline: [ + mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""), + mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId), + mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""), + mkOwnStateEvent(EventType.RoomMember, { membership: "invite" }, invitee), + ], + }); + await httpBackend.flush("/profile", 1, 1000); + const room = client.getRoom(roomId); + expect(room).toBeDefined(); + const inviteeMember = room.getMember(invitee); + expect(inviteeMember).toBeDefined(); + expect(inviteeMember.getMxcAvatarUrl()).toEqual(inviteeProfile.avatar_url); + expect(inviteeMember.name).toEqual(inviteeProfile.displayname); + }); + }); + + describe("ExtensionE2EE", () => { + let ext: Extension; + beforeAll(async () => { + await setupClient({ + withCrypto: true, + }); + const hasSynced = sdk.sync(); + await httpBackend.flushAllExpected(); + await hasSynced; + ext = findExtension("e2ee"); + }); + afterAll(async () => { + // needed else we do some async operations in the background which can cause Jest to whine: + // "Cannot log after tests are done. Did you forget to wait for something async in your test?" + // Attempted to log "Saving device tracking data null"." + client.crypto.stop(); + }); + it("gets enabled on the initial request only", () => { + expect(ext.onRequest(true)).toEqual({ + enabled: true, + }); + expect(ext.onRequest(false)).toEqual(undefined); + }); + it("can update device lists", () => { + ext.onResponse({ + device_lists: { + changed: ["@alice:localhost"], + left: ["@bob:localhost"], + }, + }); + // TODO: more assertions? + }); + it("can update OTK counts", () => { + client.crypto.updateOneTimeKeyCount = jest.fn(); + ext.onResponse({ + device_one_time_keys_count: { + signed_curve25519: 42, + }, + }); + expect(client.crypto.updateOneTimeKeyCount).toHaveBeenCalledWith(42); + ext.onResponse({ + device_one_time_keys_count: { + not_signed_curve25519: 42, + // missing field -> default to 0 + }, + }); + expect(client.crypto.updateOneTimeKeyCount).toHaveBeenCalledWith(0); + }); + it("can update fallback keys", () => { + ext.onResponse({ + device_unused_fallback_key_types: ["signed_curve25519"], + }); + expect(client.crypto.getNeedsNewFallback()).toEqual(false); + ext.onResponse({ + device_unused_fallback_key_types: ["not_signed_curve25519"], + }); + expect(client.crypto.getNeedsNewFallback()).toEqual(true); + }); + }); + describe("ExtensionAccountData", () => { + let ext: Extension; + beforeAll(async () => { + await setupClient(); + const hasSynced = sdk.sync(); + await httpBackend.flushAllExpected(); + await hasSynced; + ext = findExtension("account_data"); + }); + it("gets enabled on the initial request only", () => { + expect(ext.onRequest(true)).toEqual({ + enabled: true, + }); + expect(ext.onRequest(false)).toEqual(undefined); + }); + it("processes global account data", async () => { + const globalType = "global_test"; + const globalContent = { + info: "here", + }; + let globalData = client.getAccountData(globalType); + expect(globalData).toBeUndefined(); + ext.onResponse({ + global: [ + { + type: globalType, + content: globalContent, + }, + ], + }); + globalData = client.getAccountData(globalType); + expect(globalData).toBeDefined(); + expect(globalData.getContent()).toEqual(globalContent); + }); + it("processes rooms account data", async () => { + const roomId = "!room:id"; + mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomId, { + name: "Room with account data", + required_state: [], + timeline: [ + mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""), + mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId), + mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""), + mkOwnEvent(EventType.RoomMessage, { body: "hello" }), + + ], + initial: true, + }); + const roomContent = { + foo: "bar", + }; + const roomType = "test"; + ext.onResponse({ + rooms: { + [roomId]: [ + { + type: roomType, + content: roomContent, + }, + ], + }, + }); + const room = client.getRoom(roomId); + expect(room).toBeDefined(); + const event = room.getAccountData(roomType); + expect(event).toBeDefined(); + expect(event.getContent()).toEqual(roomContent); + }); + it("doesn't crash for unknown room account data", async () => { + const unknownRoomId = "!unknown:id"; + const roomType = "tester"; + ext.onResponse({ + rooms: { + [unknownRoomId]: [ + { + type: roomType, + content: { + foo: "Bar", + }, + }, + ], + }, + }); + const room = client.getRoom(unknownRoomId); + expect(room).toBeNull(); + expect(client.getAccountData(roomType)).toBeUndefined(); + }); + it("can update push rules via account data", async () => { + const roomId = "!foo:bar"; + const pushRulesContent: IPushRules = { + global: { + [PushRuleKind.RoomSpecific]: [{ + enabled: true, + default: true, + pattern: "monkey", + actions: [ + { + set_tweak: TweakName.Sound, + value: "default", + }, + ], + rule_id: roomId, + }], + }, + }; + let pushRule = client.getRoomPushRule("global", roomId); + expect(pushRule).toBeUndefined(); + ext.onResponse({ + global: [ + { + type: EventType.PushRules, + content: pushRulesContent, + }, + ], + }); + pushRule = client.getRoomPushRule("global", roomId); + expect(pushRule).toEqual(pushRulesContent.global[PushRuleKind.RoomSpecific][0]); + }); + }); + describe("ExtensionToDevice", () => { + let ext: Extension; + beforeAll(async () => { + await setupClient(); + const hasSynced = sdk.sync(); + await httpBackend.flushAllExpected(); + await hasSynced; + ext = findExtension("to_device"); + }); + it("gets enabled with a limit on the initial request only", () => { + const reqJson: any = ext.onRequest(true); + expect(reqJson.enabled).toEqual(true); + expect(reqJson.limit).toBeGreaterThan(0); + expect(reqJson.since).toBeUndefined(); + }); + it("updates the since value", async () => { + ext.onResponse({ + next_batch: "12345", + events: [], + }); + expect(ext.onRequest(false)).toEqual({ + since: "12345", + }); + }); + it("can handle missing fields", async () => { + ext.onResponse({ + next_batch: "23456", + // no events array + }); + }); + it("emits to-device events on the client", async () => { + const toDeviceType = "custom_test"; + const toDeviceContent = { + foo: "bar", + }; + let called = false; + client.once(ClientEvent.ToDeviceEvent, (ev) => { + expect(ev.getContent()).toEqual(toDeviceContent); + expect(ev.getType()).toEqual(toDeviceType); + called = true; + }); + ext.onResponse({ + next_batch: "34567", + events: [ + { + type: toDeviceType, + content: toDeviceContent, + }, + ], + }); + expect(called).toBe(true); + }); + it("can cancel key verification requests", async () => { + const seen: Record = {}; + client.on(ClientEvent.ToDeviceEvent, (ev) => { + const evType = ev.getType(); + expect(seen[evType]).toBeFalsy(); + seen[evType] = true; + if (evType === "m.key.verification.start" || evType === "m.key.verification.request") { + expect(ev.isCancelled()).toEqual(true); + } else { + expect(ev.isCancelled()).toEqual(false); + } + }); + ext.onResponse({ + next_batch: "45678", + events: [ + // someone tries to verify keys + { + type: "m.key.verification.start", + content: { + transaction_id: "a", + }, + }, + { + type: "m.key.verification.request", + content: { + transaction_id: "a", + }, + }, + // then gives up + { + type: "m.key.verification.cancel", + content: { + transaction_id: "a", + }, + }, + ], + }); + }); + }); +}); diff --git a/spec/integ/sliding-sync.spec.ts b/spec/integ/sliding-sync.spec.ts new file mode 100644 index 000000000..9cf6ff2e9 --- /dev/null +++ b/spec/integ/sliding-sync.spec.ts @@ -0,0 +1,758 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// eslint-disable-next-line no-restricted-imports +import EventEmitter from "events"; +import MockHttpBackend from "matrix-mock-request"; + +import { SlidingSync, SlidingSyncState, ExtensionState, SlidingSyncEvent } from "../../src/sliding-sync"; +import { TestClient } from "../TestClient"; +import { logger } from "../../src/logger"; +import { MatrixClient } from "../../src"; +import { sleep } from "../../src/utils"; + +/** + * Tests for sliding sync. These tests are broken down into sub-tests which are reliant upon one another. + * Each test suite (describe block) uses a single MatrixClient/HTTPBackend and a single SlidingSync class. + * Each test will call different functions on SlidingSync which may depend on state from previous tests. + */ +describe("SlidingSync", () => { + let client: MatrixClient = null; + let httpBackend: MockHttpBackend = null; + const selfUserId = "@alice:localhost"; + const selfAccessToken = "aseukfgwef"; + const proxyBaseUrl = "http://localhost:8008"; + const syncUrl = proxyBaseUrl + "/_matrix/client/unstable/org.matrix.msc3575/sync"; + + // assign client/httpBackend globals + const setupClient = () => { + const testClient = new TestClient(selfUserId, "DEVICE", selfAccessToken); + httpBackend = testClient.httpBackend; + client = testClient.client; + }; + + // tear down client/httpBackend globals + const teardownClient = () => { + httpBackend.verifyNoOutstandingExpectation(); + client.stopClient(); + return httpBackend.stop(); + }; + + describe("start/stop", () => { + beforeAll(setupClient); + afterAll(teardownClient); + let slidingSync: SlidingSync; + + it("should start the sync loop upon calling start()", async () => { + slidingSync = new SlidingSync(proxyBaseUrl, [], {}, client, 1); + const fakeResp = { + pos: "a", + lists: [], + rooms: {}, + extensions: {}, + }; + httpBackend.when("POST", syncUrl).respond(200, fakeResp); + const p = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state, resp, err) => { + expect(state).toEqual(SlidingSyncState.RequestFinished); + expect(resp).toEqual(fakeResp); + expect(err).toBeFalsy(); + return true; + }); + slidingSync.start(); + await httpBackend.flushAllExpected(); + await p; + }); + + it("should stop the sync loop upon calling stop()", () => { + slidingSync.stop(); + httpBackend.verifyNoOutstandingExpectation(); + }); + }); + + describe("room subscriptions", () => { + beforeAll(setupClient); + afterAll(teardownClient); + const roomId = "!foo:bar"; + const anotherRoomID = "!another:room"; + let roomSubInfo = { + timeline_limit: 1, + required_state: [ + ["m.room.name", ""], + ], + }; + const wantRoomData = { + name: "foo bar", + required_state: [], + timeline: [], + }; + + let slidingSync: SlidingSync; + + it("should be able to subscribe to a room", async () => { + // add the subscription + slidingSync = new SlidingSync(proxyBaseUrl, [], roomSubInfo, client, 1); + slidingSync.modifyRoomSubscriptions(new Set([roomId])); + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("room sub", body); + expect(body.room_subscriptions).toBeTruthy(); + expect(body.room_subscriptions[roomId]).toEqual(roomSubInfo); + }).respond(200, { + pos: "a", + lists: [], + extensions: {}, + rooms: { + [roomId]: wantRoomData, + }, + }); + + const p = listenUntil(slidingSync, "SlidingSync.RoomData", (gotRoomId, gotRoomData) => { + expect(gotRoomId).toEqual(roomId); + expect(gotRoomData).toEqual(wantRoomData); + return true; + }); + slidingSync.start(); + await httpBackend.flushAllExpected(); + await p; + }); + + it("should be possible to adjust room subscription info whilst syncing", async () => { + // listen for updated request + const newSubInfo = { + timeline_limit: 100, + required_state: [ + ["m.room.member", "*"], + ], + }; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("adjusted sub", body); + expect(body.room_subscriptions).toBeTruthy(); + expect(body.room_subscriptions[roomId]).toEqual(newSubInfo); + }).respond(200, { + pos: "a", + lists: [], + extensions: {}, + rooms: { + [roomId]: wantRoomData, + }, + }); + + const p = listenUntil(slidingSync, "SlidingSync.RoomData", (gotRoomId, gotRoomData) => { + expect(gotRoomId).toEqual(roomId); + expect(gotRoomData).toEqual(wantRoomData); + return true; + }); + + slidingSync.modifyRoomSubscriptionInfo(newSubInfo); + await httpBackend.flushAllExpected(); + await p; + // need to set what the new subscription info is for subsequent tests + roomSubInfo = newSubInfo; + }); + + it("should be possible to add room subscriptions whilst syncing", async () => { + // listen for updated request + const anotherRoomData = { + name: "foo bar 2", + room_id: anotherRoomID, + // we should not fall over if fields are missing. + // required_state: [], + // timeline: [], + }; + const anotherRoomDataFixed = { + name: anotherRoomData.name, + room_id: anotherRoomID, + required_state: [], + timeline: [], + }; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("new subs", body); + expect(body.room_subscriptions).toBeTruthy(); + // only the new room is sent, the other is sticky + expect(body.room_subscriptions[anotherRoomID]).toEqual(roomSubInfo); + expect(body.room_subscriptions[roomId]).toBeUndefined(); + }).respond(200, { + pos: "b", + lists: [], + extensions: {}, + rooms: { + [anotherRoomID]: anotherRoomData, + }, + }); + + const p = listenUntil(slidingSync, "SlidingSync.RoomData", (gotRoomId, gotRoomData) => { + expect(gotRoomId).toEqual(anotherRoomID); + expect(gotRoomData).toEqual(anotherRoomDataFixed); + return true; + }); + + const subs = slidingSync.getRoomSubscriptions(); + subs.add(anotherRoomID); + slidingSync.modifyRoomSubscriptions(subs); + await httpBackend.flushAllExpected(); + await p; + }); + + it("should be able to unsubscribe from a room", async () => { + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("unsub request", body); + expect(body.room_subscriptions).toBeFalsy(); + expect(body.unsubscribe_rooms).toEqual([roomId]); + }).respond(200, { + pos: "b", + lists: [], + }); + + const p = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => { + return state === SlidingSyncState.Complete; + }); + + // remove the subscription for the first room + slidingSync.modifyRoomSubscriptions(new Set([anotherRoomID])); + + await httpBackend.flushAllExpected(); + await p; + + slidingSync.stop(); + }); + }); + + describe("lists", () => { + beforeAll(setupClient); + afterAll(teardownClient); + + const roomA = "!a:localhost"; + const roomB = "!b:localhost"; + const roomC = "!c:localhost"; + const rooms = { + [roomA]: { + name: "A", + required_state: [], + timeline: [], + }, + [roomB]: { + name: "B", + required_state: [], + timeline: [], + }, + [roomC]: { + name: "C", + required_state: [], + timeline: [], + }, + }; + const newRanges = [[0, 2], [3, 5]]; + + let slidingSync: SlidingSync; + it("should be possible to subscribe to a list", async () => { + // request first 3 rooms + const listReq = { + ranges: [[0, 2]], + sort: ["by_name"], + timeline_limit: 1, + required_state: [ + ["m.room.topic", ""], + ], + filters: { + is_dm: true, + }, + }; + slidingSync = new SlidingSync(proxyBaseUrl, [listReq], {}, client, 1); + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("list", body); + expect(body.lists).toBeTruthy(); + expect(body.lists[0]).toEqual(listReq); + }).respond(200, { + pos: "a", + lists: [{ + count: 500, + ops: [{ + op: "SYNC", + range: [0, 2], + room_ids: Object.keys(rooms), + }], + }], + rooms: rooms, + }); + const listenerData = {}; + const dataListener = (roomId, roomData) => { + expect(listenerData[roomId]).toBeFalsy(); + listenerData[roomId] = roomData; + }; + slidingSync.on(SlidingSyncEvent.RoomData, dataListener); + const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => { + return state === SlidingSyncState.Complete; + }); + slidingSync.start(); + await httpBackend.flushAllExpected(); + await responseProcessed; + + expect(listenerData[roomA]).toEqual(rooms[roomA]); + expect(listenerData[roomB]).toEqual(rooms[roomB]); + expect(listenerData[roomC]).toEqual(rooms[roomC]); + + expect(slidingSync.listLength()).toEqual(1); + slidingSync.off(SlidingSyncEvent.RoomData, dataListener); + }); + + it("should be possible to retrieve list data", () => { + expect(slidingSync.getList(0)).toBeDefined(); + expect(slidingSync.getList(5)).toBeNull(); + expect(slidingSync.getListData(5)).toBeNull(); + const syncData = slidingSync.getListData(0); + expect(syncData.joinedCount).toEqual(500); // from previous test + expect(syncData.roomIndexToRoomId).toEqual({ + 0: roomA, + 1: roomB, + 2: roomC, + }); + }); + + it("should be possible to adjust list ranges", async () => { + // modify the list ranges + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("next ranges", body.lists[0].ranges); + expect(body.lists).toBeTruthy(); + expect(body.lists[0]).toEqual({ + // only the ranges should be sent as the rest are unchanged and sticky + ranges: newRanges, + }); + }).respond(200, { + pos: "b", + lists: [{ + count: 500, + ops: [{ + op: "SYNC", + range: [0, 2], + room_ids: Object.keys(rooms), + }], + }], + }); + + const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => { + return state === SlidingSyncState.RequestFinished; + }); + slidingSync.setListRanges(0, newRanges); + await httpBackend.flushAllExpected(); + await responseProcessed; + }); + + it("should be possible to add an extra list", async () => { + // add extra list + const extraListReq = { + ranges: [[0, 100]], + sort: ["by_name"], + filters: { + "is_dm": true, + }, + }; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("extra list", body); + expect(body.lists).toBeTruthy(); + expect(body.lists[0]).toEqual({ + // only the ranges should be sent as the rest are unchanged and sticky + ranges: newRanges, + }); + expect(body.lists[1]).toEqual(extraListReq); + }).respond(200, { + pos: "c", + lists: [ + { + count: 500, + }, + { + count: 50, + ops: [{ + op: "SYNC", + range: [0, 2], + room_ids: Object.keys(rooms), + }], + }, + ], + }); + listenUntil(slidingSync, "SlidingSync.List", (listIndex, joinedCount, roomIndexToRoomId) => { + expect(listIndex).toEqual(1); + expect(joinedCount).toEqual(50); + expect(roomIndexToRoomId).toEqual({ + 0: roomA, + 1: roomB, + 2: roomC, + }); + return true; + }); + const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => { + return state === SlidingSyncState.Complete; + }); + slidingSync.setList(1, extraListReq); + await httpBackend.flushAllExpected(); + await responseProcessed; + }); + + it("should be possible to get list DELETE/INSERTs", async () => { + // move C (2) to A (0) + httpBackend.when("POST", syncUrl).respond(200, { + pos: "e", + lists: [{ + count: 500, + ops: [{ + op: "DELETE", + index: 2, + }, { + op: "INSERT", + index: 0, + room_id: roomC, + }], + }, + { + count: 50, + }], + }); + let listPromise = listenUntil(slidingSync, "SlidingSync.List", + (listIndex, joinedCount, roomIndexToRoomId) => { + expect(listIndex).toEqual(0); + expect(joinedCount).toEqual(500); + expect(roomIndexToRoomId).toEqual({ + 0: roomC, + 1: roomA, + 2: roomB, + }); + return true; + }); + let responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => { + return state === SlidingSyncState.Complete; + }); + await httpBackend.flushAllExpected(); + await responseProcessed; + await listPromise; + + // move C (0) back to A (2) + httpBackend.when("POST", syncUrl).respond(200, { + pos: "f", + lists: [{ + count: 500, + ops: [{ + op: "DELETE", + index: 0, + }, { + op: "INSERT", + index: 2, + room_id: roomC, + }], + }, + { + count: 50, + }], + }); + listPromise = listenUntil(slidingSync, "SlidingSync.List", + (listIndex, joinedCount, roomIndexToRoomId) => { + expect(listIndex).toEqual(0); + expect(joinedCount).toEqual(500); + expect(roomIndexToRoomId).toEqual({ + 0: roomA, + 1: roomB, + 2: roomC, + }); + return true; + }); + responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => { + return state === SlidingSyncState.Complete; + }); + await httpBackend.flushAllExpected(); + await responseProcessed; + await listPromise; + }); + + it("should ignore invalid list indexes", async () => { + httpBackend.when("POST", syncUrl).respond(200, { + pos: "e", + lists: [{ + count: 500, + ops: [{ + op: "DELETE", + index: 2324324, + }], + }, + { + count: 50, + }], + }); + const listPromise = listenUntil(slidingSync, "SlidingSync.List", + (listIndex, joinedCount, roomIndexToRoomId) => { + expect(listIndex).toEqual(0); + expect(joinedCount).toEqual(500); + expect(roomIndexToRoomId).toEqual({ + 0: roomA, + 1: roomB, + 2: roomC, + }); + return true; + }); + const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => { + return state === SlidingSyncState.Complete; + }); + await httpBackend.flushAllExpected(); + await responseProcessed; + await listPromise; + }); + + it("should be possible to update a list", async () => { + httpBackend.when("POST", syncUrl).respond(200, { + pos: "g", + lists: [{ + count: 42, + ops: [ + { + op: "INVALIDATE", + range: [0, 2], + }, + { + op: "SYNC", + range: [0, 1], + room_ids: [roomB, roomC], + }, + ], + }, + { + count: 50, + }], + }); + // update the list with a new filter + slidingSync.setList(0, { + filters: { + is_encrypted: true, + }, + ranges: [[0, 100]], + }); + const listPromise = listenUntil(slidingSync, "SlidingSync.List", + (listIndex, joinedCount, roomIndexToRoomId) => { + expect(listIndex).toEqual(0); + expect(joinedCount).toEqual(42); + expect(roomIndexToRoomId).toEqual({ + 0: roomB, + 1: roomC, + }); + return true; + }); + const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => { + return state === SlidingSyncState.Complete; + }); + await httpBackend.flushAllExpected(); + await responseProcessed; + await listPromise; + slidingSync.stop(); + }); + }); + + describe("extensions", () => { + beforeAll(setupClient); + afterAll(teardownClient); + let slidingSync: SlidingSync; + const extReq = { + foo: "bar", + }; + const extResp = { + baz: "quuz", + }; + + // Pre-extensions get called BEFORE processing the sync response + const preExtName = "foobar"; + let onPreExtensionRequest; + let onPreExtensionResponse; + + // Post-extensions get called AFTER processing the sync response + const postExtName = "foobar2"; + let onPostExtensionRequest; + let onPostExtensionResponse; + + const extPre = { + name: () => preExtName, + onRequest: (initial) => { return onPreExtensionRequest(initial); }, + onResponse: (res) => { return onPreExtensionResponse(res); }, + when: () => ExtensionState.PreProcess, + }; + const extPost = { + name: () => postExtName, + onRequest: (initial) => { return onPostExtensionRequest(initial); }, + onResponse: (res) => { return onPostExtensionResponse(res); }, + when: () => ExtensionState.PostProcess, + }; + + it("should be able to register an extension", async () => { + slidingSync = new SlidingSync(proxyBaseUrl, [], {}, client, 1); + slidingSync.registerExtension(extPre); + + const callbackOrder = []; + let extensionOnResponseCalled = false; + onPreExtensionRequest = () => { + return extReq; + }; + onPreExtensionResponse = (resp) => { + extensionOnResponseCalled = true; + callbackOrder.push("onPreExtensionResponse"); + expect(resp).toEqual(extResp); + }; + + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("ext req", body); + expect(body.extensions).toBeTruthy(); + expect(body.extensions[preExtName]).toEqual(extReq); + }).respond(200, { + pos: "a", + ops: [], + counts: [], + extensions: { + [preExtName]: extResp, + }, + }); + + const p = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state, resp, err) => { + if (state === SlidingSyncState.Complete) { + callbackOrder.push("Lifecycle"); + return true; + } + }); + slidingSync.start(); + await httpBackend.flushAllExpected(); + await p; + expect(extensionOnResponseCalled).toBe(true); + expect(callbackOrder).toEqual(["onPreExtensionResponse", "Lifecycle"]); + }); + + it("should be able to send nothing in an extension request/response", async () => { + onPreExtensionRequest = () => { + return undefined; + }; + let responseCalled = false; + onPreExtensionResponse = (resp) => { + responseCalled = true; + }; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("ext req nothing", body); + expect(body.extensions).toBeTruthy(); + expect(body.extensions[preExtName]).toBeUndefined(); + }).respond(200, { + pos: "a", + ops: [], + counts: [], + extensions: {}, + }); + // we need to resend as sliding sync will already have a buffered request with the old + // extension values from the previous test. + slidingSync.resend(); + + const p = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state, resp, err) => { + return state === SlidingSyncState.Complete; + }); + await httpBackend.flushAllExpected(); + await p; + expect(responseCalled).toBe(false); + }); + + it("is possible to register extensions after start() has been called", async () => { + slidingSync.registerExtension(extPost); + onPostExtensionRequest = () => { + return extReq; + }; + let responseCalled = false; + const callbackOrder = []; + onPostExtensionResponse = (resp) => { + expect(resp).toEqual(extResp); + responseCalled = true; + callbackOrder.push("onPostExtensionResponse"); + }; + httpBackend.when("POST", syncUrl).check(function(req) { + const body = req.data; + logger.log("ext req after start", body); + expect(body.extensions).toBeTruthy(); + expect(body.extensions[preExtName]).toBeUndefined(); // from the earlier test + expect(body.extensions[postExtName]).toEqual(extReq); + }).respond(200, { + pos: "c", + ops: [], + counts: [], + extensions: { + [postExtName]: extResp, + }, + }); + // we need to resend as sliding sync will already have a buffered request with the old + // extension values from the previous test. + slidingSync.resend(); + + const p = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state, resp, err) => { + if (state === SlidingSyncState.Complete) { + callbackOrder.push("Lifecycle"); + return true; + } + }); + await httpBackend.flushAllExpected(); + await p; + expect(responseCalled).toBe(true); + expect(callbackOrder).toEqual(["Lifecycle", "onPostExtensionResponse"]); + slidingSync.stop(); + }); + + it("is not possible to register the same extension name twice", async () => { + slidingSync = new SlidingSync(proxyBaseUrl, [], {}, client, 1); + slidingSync.registerExtension(extPre); + expect(() => { slidingSync.registerExtension(extPre); }).toThrow(); + }); + }); +}); + +async function timeout(delayMs: number, reason: string): Promise { + await sleep(delayMs); + throw new Error(`timeout: ${delayMs}ms - ${reason}`); +} + +/** + * Listen until a callback returns data. + * @param {EventEmitter} emitter The event emitter + * @param {string} eventName The event to listen for + * @param {function} callback The callback which will be invoked when events fire. Return something truthy from this to resolve the promise. + * @param {number} timeoutMs The number of milliseconds to wait for the callback to return data. Default: 500ms. + * @returns {Promise} A promise which will be resolved when the callback returns data. If the callback throws or the timeout is reached, + * the promise is rejected. + */ +function listenUntil( + emitter: EventEmitter, + eventName: string, + callback: (...args: any[]) => T, + timeoutMs = 500, +): Promise { + const trace = new Error().stack.split(`\n`)[2]; + return Promise.race([new Promise((resolve, reject) => { + const wrapper = (...args) => { + try { + const data = callback(...args); + if (data) { + emitter.off(eventName, wrapper); + resolve(data); + } + } catch (err) { + reject(err); + } + }; + emitter.on(eventName, wrapper); + }), timeout(timeoutMs, "timed out waiting for event " + eventName + " " + trace)]); +} diff --git a/spec/unit/room.spec.ts b/spec/unit/room.spec.ts index 267c56109..4ed31380d 100644 --- a/spec/unit/room.spec.ts +++ b/spec/unit/room.spec.ts @@ -361,6 +361,71 @@ describe("Room", function() { expect(callCount).toEqual(2); }); + + it("should be able to update local echo without a txn ID (/send then /sync)", function() { + const eventJson = utils.mkMessage({ + room: roomId, user: userA, event: false, + }) as object; + delete eventJson["txn_id"]; + delete eventJson["event_id"]; + const localEvent = new MatrixEvent(Object.assign({ event_id: "$temp" }, eventJson)); + localEvent.status = EventStatus.SENDING; + expect(localEvent.getTxnId()).toBeNull(); + expect(room.timeline.length).toEqual(0); + + // first add the local echo. This is done before the /send request is even sent. + const txnId = "My_txn_id"; + room.addPendingEvent(localEvent, txnId); + expect(room.getEventForTxnId(txnId)).toEqual(localEvent); + expect(room.timeline.length).toEqual(1); + + // now the /send request returns the true event ID. + const realEventId = "$real-event-id"; + room.updatePendingEvent(localEvent, EventStatus.SENT, realEventId); + + // then /sync returns the remoteEvent, it should de-dupe based on the event ID. + const remoteEvent = new MatrixEvent(Object.assign({ event_id: realEventId }, eventJson)); + expect(remoteEvent.getTxnId()).toBeNull(); + room.addLiveEvents([remoteEvent]); + // the duplicate strategy code should ensure we don't add a 2nd event to the live timeline + expect(room.timeline.length).toEqual(1); + // but without the event ID matching we will still have the local event in pending events + expect(room.getEventForTxnId(txnId)).toBeUndefined(); + }); + + it("should be able to update local echo without a txn ID (/sync then /send)", function() { + const eventJson = utils.mkMessage({ + room: roomId, user: userA, event: false, + }) as object; + delete eventJson["txn_id"]; + delete eventJson["event_id"]; + const txnId = "My_txn_id"; + const localEvent = new MatrixEvent(Object.assign({ event_id: "$temp", txn_id: txnId }, eventJson)); + localEvent.status = EventStatus.SENDING; + expect(localEvent.getTxnId()).toEqual(txnId); + expect(room.timeline.length).toEqual(0); + + // first add the local echo. This is done before the /send request is even sent. + room.addPendingEvent(localEvent, txnId); + expect(room.getEventForTxnId(txnId)).toEqual(localEvent); + expect(room.timeline.length).toEqual(1); + + // now the /sync returns the remoteEvent, it is impossible for the JS SDK to de-dupe this. + const realEventId = "$real-event-id"; + const remoteEvent = new MatrixEvent(Object.assign({ event_id: realEventId }, eventJson)); + expect(remoteEvent.getUnsigned().transaction_id).toBeUndefined(); + room.addLiveEvents([remoteEvent]); + expect(room.timeline.length).toEqual(2); // impossible to de-dupe as no txn ID or matching event ID + + // then the /send request returns the real event ID. + // Now it is possible for the JS SDK to de-dupe this. + room.updatePendingEvent(localEvent, EventStatus.SENT, realEventId); + + // the 2nd event should be removed from the timeline. + expect(room.timeline.length).toEqual(1); + // but without the event ID matching we will still have the local event in pending events + expect(room.getEventForTxnId(txnId)).toBeUndefined(); + }); }); describe('addEphemeralEvents', () => { diff --git a/src/client.ts b/src/client.ts index f3f149ec6..1b89e9c2b 100644 --- a/src/client.ts +++ b/src/client.ts @@ -190,6 +190,8 @@ import { MediaHandler } from "./webrtc/mediaHandler"; import { IRefreshTokenResponse } from "./@types/auth"; import { TypedEventEmitter } from "./models/typed-event-emitter"; import { ReceiptType } from "./@types/read_receipts"; +import { MSC3575SlidingSyncRequest, MSC3575SlidingSyncResponse, SlidingSync } from "./sliding-sync"; +import { SlidingSyncSdk } from "./sliding-sync-sdk"; import { Thread, THREAD_RELATION_TYPE } from "./models/thread"; import { MBeaconInfoEventContent, M_BEACON_INFO } from "./@types/beacon"; @@ -411,6 +413,11 @@ export interface IStartClientOpts { * @experimental */ experimentalThreadSupport?: boolean; + + /** + * @experimental + */ + slidingSync?: SlidingSync; } export interface IStoredClientOpts extends IStartClientOpts { @@ -903,7 +910,7 @@ export class MatrixClient extends TypedEventEmitter; protected syncedLeftRooms = false; @@ -1176,7 +1183,11 @@ export class MatrixClient extends TypedEventEmitter { + const qps: Record = {}; + if (req.pos) { + qps.pos = req.pos; + delete req.pos; + } + if (req.timeout) { + qps.timeout = req.timeout; + delete req.timeout; + } + const clientTimeout = req.clientTimeout; + delete req.clientTimeout; + return this.http.authedRequest( + undefined, + Method.Post, + "/sync", + qps, + req, + { + prefix: "/_matrix/client/unstable/org.matrix.msc3575", + baseUrl: proxyBaseUrl, + localTimeoutMs: clientTimeout, + }, + ); + } + /** * @experimental */ diff --git a/src/http-api.ts b/src/http-api.ts index 23c692b1f..3e601c795 100644 --- a/src/http-api.ts +++ b/src/http-api.ts @@ -111,6 +111,7 @@ interface IRequest extends _Request { interface IRequestOpts { prefix?: string; + baseUrl?: string; localTimeoutMs?: number; headers?: Record; json?: boolean; // defaults to true @@ -576,6 +577,9 @@ export class MatrixHttpApi { * @param {string=} opts.prefix The full prefix to use e.g. * "/_matrix/client/v2_alpha". If not specified, uses this.opts.prefix. * + * @param {string=} opts.baseUrl The alternative base url to use. + * If not specified, uses this.opts.baseUrl + * * @param {Object=} opts.headers map of additional request headers * * @return {Promise} Resolves to {data: {Object}, @@ -671,7 +675,8 @@ export class MatrixHttpApi { opts?: O, ): IAbortablePromise> { const prefix = opts?.prefix ?? this.opts.prefix; - const fullUri = this.opts.baseUrl + prefix + path; + const baseUrl = opts?.baseUrl ?? this.opts.baseUrl; + const fullUri = baseUrl + prefix + path; return this.requestOtherUrl(callback, method, fullUri, queryParams, data, opts); } diff --git a/src/models/room-state.ts b/src/models/room-state.ts index 02632b1ae..c7d3ac325 100644 --- a/src/models/room-state.ts +++ b/src/models/room-state.ts @@ -370,7 +370,6 @@ export class RoomState extends TypedEventEmitter }); this.onBeaconLivenessChange(); - // update higher level data structures. This needs to be done AFTER the // core event dict as these structures may depend on other state events in // the given array (e.g. disambiguating display names in one go to do both @@ -401,7 +400,6 @@ export class RoomState extends TypedEventEmitter const member = this.getOrCreateMember(userId, event); member.setMembershipEvent(event, this); - this.updateMember(member); this.emit(RoomStateEvent.Members, event, this, member); } else if (event.getType() === EventType.RoomPowerLevels) { diff --git a/src/models/room.ts b/src/models/room.ts index cc0e43452..eaaa8f9ef 100644 --- a/src/models/room.ts +++ b/src/models/room.ts @@ -1917,6 +1917,27 @@ export class Room extends TypedEventEmitter // If any pending visibility change is waiting for this (older) event, this.applyPendingVisibilityEvents(event); + // Sliding Sync modifications: + // The proxy cannot guarantee every sent event will have a transaction_id field, so we need + // to check the event ID against the list of pending events if there is no transaction ID + // field. Only do this for events sent by us though as it's potentially expensive to loop + // the pending events map. + const txnId = event.getUnsigned().transaction_id; + if (!txnId && event.getSender() === this.myUserId) { + // check the txn map for a matching event ID + for (const tid in this.txnToEvent) { + const localEvent = this.txnToEvent[tid]; + if (localEvent.getId() === event.getId()) { + logger.debug("processLiveEvent: found sent event without txn ID: ", tid, event.getId()); + // update the unsigned field so we can re-use the same codepaths + const unsigned = event.getUnsigned(); + unsigned.transaction_id = tid; + event.setUnsigned(unsigned); + break; + } + } + } + if (event.getUnsigned().transaction_id) { const existingEvent = this.txnToEvent[event.getUnsigned().transaction_id]; if (existingEvent) { @@ -2173,7 +2194,22 @@ export class Room extends TypedEventEmitter const timeline = this.getTimelineForEvent(newEventId); if (timeline) { // we've already received the event via the event stream. - // nothing more to do here. + // nothing more to do here, assuming the transaction ID was correctly matched. + // Let's check that. + const remoteEvent = this.findEventById(newEventId); + const remoteTxnId = remoteEvent.getUnsigned().transaction_id; + if (!remoteTxnId) { + // This code path is mostly relevant for the Sliding Sync proxy. + // The remote event did not contain a transaction ID, so we did not handle + // the remote echo yet. Handle it now. + const unsigned = remoteEvent.getUnsigned(); + unsigned.transaction_id = event.getTxnId(); + remoteEvent.setUnsigned(unsigned); + // the remote event is _already_ in the timeline, so we need to remove it so + // we can convert the local event into the final event. + this.removeEvent(remoteEvent.getId()); + this.handleRemoteEcho(remoteEvent, event); + } return; } } diff --git a/src/sliding-sync-sdk.ts b/src/sliding-sync-sdk.ts new file mode 100644 index 000000000..39fafec90 --- /dev/null +++ b/src/sliding-sync-sdk.ts @@ -0,0 +1,835 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { NotificationCountType, Room, RoomEvent } from "./models/room"; +import { logger } from './logger'; +import * as utils from "./utils"; +import { EventTimeline } from "./models/event-timeline"; +import { ClientEvent, IStoredClientOpts, MatrixClient, PendingEventOrdering } from "./client"; +import { ISyncStateData, SyncState } from "./sync"; +import { MatrixEvent } from "./models/event"; +import { Crypto } from "./crypto"; +import { IMinimalEvent, IRoomEvent, IStateEvent, IStrippedState } from "./sync-accumulator"; +import { MatrixError } from "./http-api"; +import { RoomStateEvent } from "./models/room-state"; +import { RoomMemberEvent } from "./models/room-member"; +import { + Extension, + ExtensionState, + MSC3575RoomData, + MSC3575SlidingSyncResponse, + SlidingSync, + SlidingSyncEvent, + SlidingSyncState, +} from "./sliding-sync"; +import { EventType, IPushRules } from "./matrix"; +import { PushProcessor } from "./pushprocessor"; + +// Number of consecutive failed syncs that will lead to a syncState of ERROR as opposed +// to RECONNECTING. This is needed to inform the client of server issues when the +// keepAlive is successful but the server /sync fails. +const FAILED_SYNC_ERROR_THRESHOLD = 3; + +class ExtensionE2EE implements Extension { + constructor(private readonly crypto: Crypto) {} + + public name(): string { + return "e2ee"; + } + + public when(): ExtensionState { + return ExtensionState.PreProcess; + } + + public onRequest(isInitial: boolean): object { + if (!isInitial) { + return undefined; + } + return { + enabled: true, // this is sticky so only send it on the initial request + }; + } + + public async onResponse(data: object): Promise { + // Handle device list updates + if (data["device_lists"]) { + await this.crypto.handleDeviceListChanges({ + oldSyncToken: "yep", // XXX need to do this so the device list changes get processed :( + }, data["device_lists"]); + } + + // Handle one_time_keys_count + if (data["device_one_time_keys_count"]) { + const currentCount = data["device_one_time_keys_count"].signed_curve25519 || 0; + this.crypto.updateOneTimeKeyCount(currentCount); + } + if (data["device_unused_fallback_key_types"] || + data["org.matrix.msc2732.device_unused_fallback_key_types"]) { + // The presence of device_unused_fallback_key_types indicates that the + // server supports fallback keys. If there's no unused + // signed_curve25519 fallback key we need a new one. + const unusedFallbackKeys = data["device_unused_fallback_key_types"] || + data["org.matrix.msc2732.device_unused_fallback_key_types"]; + this.crypto.setNeedsNewFallback( + unusedFallbackKeys instanceof Array && + !unusedFallbackKeys.includes("signed_curve25519"), + ); + } + } +} + +class ExtensionToDevice implements Extension { + private nextBatch?: string = null; + + constructor(private readonly client: MatrixClient) {} + + public name(): string { + return "to_device"; + } + + public when(): ExtensionState { + return ExtensionState.PreProcess; + } + + public onRequest(isInitial: boolean): object { + const extReq = { + since: this.nextBatch !== null ? this.nextBatch : undefined, + }; + if (isInitial) { + extReq["limit"] = 100; + extReq["enabled"] = true; + } + return extReq; + } + + public async onResponse(data: object): Promise { + const cancelledKeyVerificationTxns = []; + data["events"] = data["events"] || []; + data["events"] + .map(this.client.getEventMapper()) + .map((toDeviceEvent) => { // map is a cheap inline forEach + // We want to flag m.key.verification.start events as cancelled + // if there's an accompanying m.key.verification.cancel event, so + // we pull out the transaction IDs from the cancellation events + // so we can flag the verification events as cancelled in the loop + // below. + if (toDeviceEvent.getType() === "m.key.verification.cancel") { + const txnId = toDeviceEvent.getContent()['transaction_id']; + if (txnId) { + cancelledKeyVerificationTxns.push(txnId); + } + } + + // as mentioned above, .map is a cheap inline forEach, so return + // the unmodified event. + return toDeviceEvent; + }) + .forEach( + (toDeviceEvent) => { + const content = toDeviceEvent.getContent(); + if ( + toDeviceEvent.getType() == "m.room.message" && + content.msgtype == "m.bad.encrypted" + ) { + // the mapper already logged a warning. + logger.log( + 'Ignoring undecryptable to-device event from ' + + toDeviceEvent.getSender(), + ); + return; + } + + if (toDeviceEvent.getType() === "m.key.verification.start" + || toDeviceEvent.getType() === "m.key.verification.request") { + const txnId = content['transaction_id']; + if (cancelledKeyVerificationTxns.includes(txnId)) { + toDeviceEvent.flagCancelled(); + } + } + + this.client.emit(ClientEvent.ToDeviceEvent, toDeviceEvent); + }, + ); + + this.nextBatch = data["next_batch"]; + } +} + +class ExtensionAccountData implements Extension { + constructor(private readonly client: MatrixClient) {} + + public name(): string { + return "account_data"; + } + + public when(): ExtensionState { + return ExtensionState.PostProcess; + } + + public onRequest(isInitial: boolean): object { + if (!isInitial) { + return undefined; + } + return { + enabled: true, + }; + } + + public onResponse(data: {global: object[], rooms: Record}): void { + if (data.global && data.global.length > 0) { + this.processGlobalAccountData(data.global); + } + + for (const roomId in data.rooms) { + const accountDataEvents = mapEvents(this.client, roomId, data.rooms[roomId]); + const room = this.client.getRoom(roomId); + if (!room) { + logger.warn("got account data for room but room doesn't exist on client:", roomId); + continue; + } + room.addAccountData(accountDataEvents); + accountDataEvents.forEach((e) => { + this.client.emit(ClientEvent.Event, e); + }); + } + } + + private processGlobalAccountData(globalAccountData: object[]): void { + const events = mapEvents(this.client, undefined, globalAccountData); + const prevEventsMap = events.reduce((m, c) => { + m[c.getId()] = this.client.store.getAccountData(c.getType()); + return m; + }, {}); + this.client.store.storeAccountDataEvents(events); + events.forEach( + (accountDataEvent) => { + // Honour push rules that come down the sync stream but also + // honour push rules that were previously cached. Base rules + // will be updated when we receive push rules via getPushRules + // (see sync) before syncing over the network. + if (accountDataEvent.getType() === EventType.PushRules) { + const rules = accountDataEvent.getContent(); + this.client.pushRules = PushProcessor.rewriteDefaultRules(rules); + } + const prevEvent = prevEventsMap[accountDataEvent.getId()]; + this.client.emit(ClientEvent.AccountData, accountDataEvent, prevEvent); + return accountDataEvent; + }, + ); + } +} + +/** + * A copy of SyncApi such that it can be used as a drop-in replacement for sync v2. For the actual + * sliding sync API, see sliding-sync.ts or the class SlidingSync. + */ +export class SlidingSyncSdk { + private syncState: SyncState = null; + private syncStateData: ISyncStateData; + private lastPos: string = null; + private failCount = 0; + private notifEvents: MatrixEvent[] = []; // accumulator of sync events in the current sync response + + constructor( + private readonly slidingSync: SlidingSync, + private readonly client: MatrixClient, + private readonly opts: Partial = {}, + ) { + this.opts.initialSyncLimit = this.opts.initialSyncLimit ?? 8; + this.opts.resolveInvitesToProfiles = this.opts.resolveInvitesToProfiles || false; + this.opts.pollTimeout = this.opts.pollTimeout || (30 * 1000); + this.opts.pendingEventOrdering = this.opts.pendingEventOrdering || PendingEventOrdering.Chronological; + this.opts.experimentalThreadSupport = this.opts.experimentalThreadSupport === true; + + if (!opts.canResetEntireTimeline) { + opts.canResetEntireTimeline = (_roomId: string) => { + return false; + }; + } + + if (client.getNotifTimelineSet()) { + client.reEmitter.reEmit(client.getNotifTimelineSet(), [ + RoomEvent.Timeline, + RoomEvent.TimelineReset, + ]); + } + + this.slidingSync.on(SlidingSyncEvent.Lifecycle, this.onLifecycle.bind(this)); + this.slidingSync.on(SlidingSyncEvent.RoomData, this.onRoomData.bind(this)); + const extensions: Extension[] = [ + new ExtensionToDevice(this.client), + new ExtensionAccountData(this.client), + ]; + if (this.opts.crypto) { + extensions.push( + new ExtensionE2EE(this.opts.crypto), + ); + } + extensions.forEach((ext) => { + this.slidingSync.registerExtension(ext); + }); + } + + private onRoomData(roomId: string, roomData: MSC3575RoomData): void { + let room = this.client.store.getRoom(roomId); + if (!room) { + if (!roomData.initial) { + logger.debug("initial flag not set but no stored room exists for room ", roomId, roomData); + return; + } + room = createRoom(this.client, roomId, this.opts); + } + this.processRoomData(this.client, room, roomData); + } + + private onLifecycle(state: SlidingSyncState, resp: MSC3575SlidingSyncResponse, err?: Error): void { + if (err) { + logger.debug("onLifecycle", state, err); + } + switch (state) { + case SlidingSyncState.Complete: + this.purgeNotifications(); + // Element won't stop showing the initial loading spinner unless we fire SyncState.Prepared + if (!this.lastPos) { + this.updateSyncState(SyncState.Prepared, { + oldSyncToken: this.lastPos, + nextSyncToken: resp.pos, + catchingUp: false, + fromCache: false, + }); + } + // Conversely, Element won't show the room list unless there is at least 1x SyncState.Syncing + // so hence for the very first sync we will fire prepared then immediately syncing. + this.updateSyncState(SyncState.Syncing, { + oldSyncToken: this.lastPos, + nextSyncToken: resp.pos, + catchingUp: false, + fromCache: false, + }); + this.lastPos = resp.pos; + break; + case SlidingSyncState.RequestFinished: + if (err) { + this.failCount += 1; + this.updateSyncState( + this.failCount > FAILED_SYNC_ERROR_THRESHOLD ? SyncState.Error : SyncState.Reconnecting, + { + error: new MatrixError(err), + }, + ); + if (this.shouldAbortSync(new MatrixError(err))) { + return; // shouldAbortSync actually stops syncing too so we don't need to do anything. + } + } else { + this.failCount = 0; + } + break; + } + } + + /** + * Sync rooms the user has left. + * @return {Promise} Resolved when they've been added to the store. + */ + public async syncLeftRooms() { + return []; // TODO + } + + /** + * Peek into a room. This will result in the room in question being synced so it + * is accessible via getRooms(). Live updates for the room will be provided. + * @param {string} roomId The room ID to peek into. + * @return {Promise} A promise which resolves once the room has been added to the + * store. + */ + public async peek(_roomId: string): Promise { + return null; // TODO + } + + /** + * Stop polling for updates in the peeked room. NOPs if there is no room being + * peeked. + */ + public stopPeeking(): void { + // TODO + } + + /** + * Returns the current state of this sync object + * @see module:client~MatrixClient#event:"sync" + * @return {?String} + */ + public getSyncState(): SyncState { + return this.syncState; + } + + /** + * Returns the additional data object associated with + * the current sync state, or null if there is no + * such data. + * Sync errors, if available, are put in the 'error' key of + * this object. + * @return {?Object} + */ + public getSyncStateData(): ISyncStateData { + return this.syncStateData; + } + + private shouldAbortSync(error: MatrixError): boolean { + if (error.errcode === "M_UNKNOWN_TOKEN") { + // The logout already happened, we just need to stop. + logger.warn("Token no longer valid - assuming logout"); + this.stop(); + this.updateSyncState(SyncState.Error, { error }); + return true; + } + return false; + } + + private async processRoomData(client: MatrixClient, room: Room, roomData: MSC3575RoomData) { + roomData = ensureNameEvent(client, room.roomId, roomData); + const stateEvents = mapEvents(this.client, room.roomId, roomData.required_state); + // Prevent events from being decrypted ahead of time + // this helps large account to speed up faster + // room::decryptCriticalEvent is in charge of decrypting all the events + // required for a client to function properly + const timelineEvents = mapEvents(this.client, room.roomId, roomData.timeline, false); + const ephemeralEvents = []; // TODO this.mapSyncEventsFormat(joinObj.ephemeral); + + const encrypted = this.client.isRoomEncrypted(room.roomId); + // we do this first so it's correct when any of the events fire + if (roomData.notification_count != null) { + room.setUnreadNotificationCount( + NotificationCountType.Total, + roomData.notification_count, + ); + } + + if (roomData.highlight_count != null) { + // We track unread notifications ourselves in encrypted rooms, so don't + // bother setting it here. We trust our calculations better than the + // server's for this case, and therefore will assume that our non-zero + // count is accurate. + if (!encrypted + || (encrypted && room.getUnreadNotificationCount(NotificationCountType.Highlight) <= 0)) { + room.setUnreadNotificationCount( + NotificationCountType.Highlight, + roomData.highlight_count, + ); + } + } + + if (roomData.invite_state) { + const inviteStateEvents = mapEvents(this.client, room.roomId, roomData.invite_state); + this.processRoomEvents(room, inviteStateEvents); + if (roomData.initial) { + room.recalculate(); + this.client.store.storeRoom(room); + this.client.emit(ClientEvent.Room, room); + } + inviteStateEvents.forEach((e) => { + this.client.emit(ClientEvent.Event, e); + }); + room.updateMyMembership("invite"); + return; + } + + if (roomData.initial) { + // set the back-pagination token. Do this *before* adding any + // events so that clients can start back-paginating. + room.getLiveTimeline().setPaginationToken( + roomData.prev_batch, EventTimeline.BACKWARDS); + } + + /* TODO + else if (roomData.limited) { + + let limited = true; + + // we've got a limited sync, so we *probably* have a gap in the + // timeline, so should reset. But we might have been peeking or + // paginating and already have some of the events, in which + // case we just want to append any subsequent events to the end + // of the existing timeline. + // + // This is particularly important in the case that we already have + // *all* of the events in the timeline - in that case, if we reset + // the timeline, we'll end up with an entirely empty timeline, + // which we'll try to paginate but not get any new events (which + // will stop us linking the empty timeline into the chain). + // + for (let i = timelineEvents.length - 1; i >= 0; i--) { + const eventId = timelineEvents[i].getId(); + if (room.getTimelineForEvent(eventId)) { + logger.debug("Already have event " + eventId + " in limited " + + "sync - not resetting"); + limited = false; + + // we might still be missing some of the events before i; + // we don't want to be adding them to the end of the + // timeline because that would put them out of order. + timelineEvents.splice(0, i); + + // XXX: there's a problem here if the skipped part of the + // timeline modifies the state set in stateEvents, because + // we'll end up using the state from stateEvents rather + // than the later state from timelineEvents. We probably + // need to wind stateEvents forward over the events we're + // skipping. + break; + } + } + + if (limited) { + deregisterStateListeners(room); + room.resetLiveTimeline( + roomData.prev_batch, + null, // TODO this.opts.canResetEntireTimeline(room.roomId) ? null : syncEventData.oldSyncToken, + ); + + // We have to assume any gap in any timeline is + // reason to stop incrementally tracking notifications and + // reset the timeline. + this.client.resetNotifTimelineSet(); + registerStateListeners(this.client, room); + } + } */ + + this.processRoomEvents(room, stateEvents, timelineEvents, false); + + // we deliberately don't add ephemeral events to the timeline + room.addEphemeralEvents(ephemeralEvents); + + room.recalculate(); + if (roomData.initial) { + client.store.storeRoom(room); + client.emit(ClientEvent.Room, room); + } + + // check if any timeline events should bing and add them to the notifEvents array: + // we'll purge this once we've fully processed the sync response + this.addNotifications(timelineEvents); + + const processRoomEvent = async (e: MatrixEvent) => { + client.emit(ClientEvent.Event, e); + if (e.isState() && e.getType() == EventType.RoomEncryption && this.opts.crypto) { + await this.opts.crypto.onCryptoEvent(e); + } + }; + + await utils.promiseMapSeries(stateEvents, processRoomEvent); + await utils.promiseMapSeries(timelineEvents, processRoomEvent); + ephemeralEvents.forEach(function(e) { + client.emit(ClientEvent.Event, e); + }); + + room.updateMyMembership("join"); + + // Decrypt only the last message in all rooms to make sure we can generate a preview + // And decrypt all events after the recorded read receipt to ensure an accurate + // notification count + room.decryptCriticalEvents(); + } + + /** + * @param {Room} room + * @param {MatrixEvent[]} stateEventList A list of state events. This is the state + * at the *START* of the timeline list if it is supplied. + * @param {MatrixEvent[]} [timelineEventList] A list of timeline events. Lower index + * @param {boolean} fromCache whether the sync response came from cache + * is earlier in time. Higher index is later. + */ + private processRoomEvents( + room: Room, + stateEventList: MatrixEvent[], + timelineEventList?: MatrixEvent[], + fromCache = false, + ): void { + timelineEventList = timelineEventList || []; + stateEventList = stateEventList || []; + + // If there are no events in the timeline yet, initialise it with + // the given state events + const liveTimeline = room.getLiveTimeline(); + const timelineWasEmpty = liveTimeline.getEvents().length == 0; + if (timelineWasEmpty) { + // Passing these events into initialiseState will freeze them, so we need + // to compute and cache the push actions for them now, otherwise sync dies + // with an attempt to assign to read only property. + // XXX: This is pretty horrible and is assuming all sorts of behaviour from + // these functions that it shouldn't be. We should probably either store the + // push actions cache elsewhere so we can freeze MatrixEvents, or otherwise + // find some solution where MatrixEvents are immutable but allow for a cache + // field. + for (const ev of stateEventList) { + this.client.getPushActionsForEvent(ev); + } + liveTimeline.initialiseState(stateEventList); + } + + // If the timeline wasn't empty, we process the state events here: they're + // defined as updates to the state before the start of the timeline, so this + // starts to roll the state forward. + // XXX: That's what we *should* do, but this can happen if we were previously + // peeking in a room, in which case we obviously do *not* want to add the + // state events here onto the end of the timeline. Historically, the js-sdk + // has just set these new state events on the old and new state. This seems + // very wrong because there could be events in the timeline that diverge the + // state, in which case this is going to leave things out of sync. However, + // for now I think it;s best to behave the same as the code has done previously. + if (!timelineWasEmpty) { + // XXX: As above, don't do this... + //room.addLiveEvents(stateEventList || []); + // Do this instead... + room.oldState.setStateEvents(stateEventList); + room.currentState.setStateEvents(stateEventList); + } + + // execute the timeline events. This will continue to diverge the current state + // if the timeline has any state events in it. + // This also needs to be done before running push rules on the events as they need + // to be decorated with sender etc. + room.addLiveEvents(timelineEventList, { + fromCache: fromCache, + }); + + room.recalculate(); + + // resolve invites now we have set the latest state + this.resolveInvites(room); + } + + private resolveInvites(room: Room): void { + if (!room || !this.opts.resolveInvitesToProfiles) { + return; + } + const client = this.client; + // For each invited room member we want to give them a displayname/avatar url + // if they have one (the m.room.member invites don't contain this). + room.getMembersWithMembership("invite").forEach(function(member) { + if (member._requestedProfileInfo) return; + member._requestedProfileInfo = true; + // try to get a cached copy first. + const user = client.getUser(member.userId); + let promise; + if (user) { + promise = Promise.resolve({ + avatar_url: user.avatarUrl, + displayname: user.displayName, + }); + } else { + promise = client.getProfileInfo(member.userId); + } + promise.then(function(info) { + // slightly naughty by doctoring the invite event but this means all + // the code paths remain the same between invite/join display name stuff + // which is a worthy trade-off for some minor pollution. + const inviteEvent = member.events.member; + if (inviteEvent.getContent().membership !== "invite") { + // between resolving and now they have since joined, so don't clobber + return; + } + inviteEvent.getContent().avatar_url = info.avatar_url; + inviteEvent.getContent().displayname = info.displayname; + // fire listeners + member.setMembershipEvent(inviteEvent, room.currentState); + }, function(_err) { + // OH WELL. + }); + }); + } + + public retryImmediately(): boolean { + return true; + } + + /** + * Main entry point. Blocks until stop() is called. + */ + public async sync() { + logger.debug("Sliding sync init loop"); + + // 1) We need to get push rules so we can check if events should bing as we get + // them from /sync. + while (!this.client.isGuest()) { + try { + logger.debug("Getting push rules..."); + const result = await this.client.getPushRules(); + logger.debug("Got push rules"); + this.client.pushRules = result; + break; + } catch (err) { + logger.error("Getting push rules failed", err); + if (this.shouldAbortSync(err)) { + return; + } + } + } + + // start syncing + await this.slidingSync.start(); + } + + /** + * Stops the sync object from syncing. + */ + public stop(): void { + logger.debug("SyncApi.stop"); + this.slidingSync.stop(); + } + + /** + * Sets the sync state and emits an event to say so + * @param {String} newState The new state string + * @param {Object} data Object of additional data to emit in the event + */ + private updateSyncState(newState: SyncState, data?: ISyncStateData): void { + const old = this.syncState; + this.syncState = newState; + this.syncStateData = data; + this.client.emit(ClientEvent.Sync, this.syncState, old, data); + } + + /** + * Takes a list of timelineEvents and adds and adds to notifEvents + * as appropriate. + * This must be called after the room the events belong to has been stored. + * + * @param {MatrixEvent[]} [timelineEventList] A list of timeline events. Lower index + * is earlier in time. Higher index is later. + */ + private addNotifications(timelineEventList: MatrixEvent[]): void { + // gather our notifications into this.notifEvents + if (!this.client.getNotifTimelineSet()) { + return; + } + for (const timelineEvent of timelineEventList) { + const pushActions = this.client.getPushActionsForEvent(timelineEvent); + if (pushActions && pushActions.notify && + pushActions.tweaks && pushActions.tweaks.highlight) { + this.notifEvents.push(timelineEvent); + } + } + } + + /** + * Purge any events in the notifEvents array. Used after a /sync has been complete. + * This should not be called at a per-room scope (e.g in onRoomData) because otherwise the ordering + * will be messed up e.g room A gets a bing, room B gets a newer bing, but both in the same /sync + * response. If we purge at a per-room scope then we could process room B before room A leading to + * room B appearing earlier in the notifications timeline, even though it has the higher origin_server_ts. + */ + private purgeNotifications(): void { + this.notifEvents.sort(function(a, b) { + return a.getTs() - b.getTs(); + }); + this.notifEvents.forEach((event) => { + this.client.getNotifTimelineSet().addLiveEvent(event); + }); + this.notifEvents = []; + } +} + +function ensureNameEvent(client: MatrixClient, roomId: string, roomData: MSC3575RoomData): MSC3575RoomData { + // make sure m.room.name is in required_state if there is a name, replacing anything previously + // there if need be. This ensures clients transparently 'calculate' the right room name. Native + // sliding sync clients should just read the "name" field. + if (!roomData.name) { + return roomData; + } + for (const stateEvent of roomData.required_state) { + if (stateEvent.type === EventType.RoomName && stateEvent.state_key === "") { + stateEvent.content = { + name: roomData.name, + }; + return roomData; + } + } + roomData.required_state.push({ + event_id: "$fake-sliding-sync-name-event-" + roomId, + state_key: "", + type: EventType.RoomName, + content: { + name: roomData.name, + }, + sender: client.getUserId(), + origin_server_ts: new Date().getTime(), + }); + return roomData; +} + +// Helper functions which set up JS SDK structs are below and are identical to the sync v2 counterparts, +// just outside the class. + +function createRoom(client: MatrixClient, roomId: string, opts: Partial): Room { // XXX cargoculted from sync.ts + const { timelineSupport } = client; + const room = new Room(roomId, client, client.getUserId(), { + lazyLoadMembers: opts.lazyLoadMembers, + pendingEventOrdering: opts.pendingEventOrdering, + timelineSupport, + }); + client.reEmitter.reEmit(room, [ + RoomEvent.Name, + RoomEvent.Redaction, + RoomEvent.RedactionCancelled, + RoomEvent.Receipt, + RoomEvent.Tags, + RoomEvent.LocalEchoUpdated, + RoomEvent.AccountData, + RoomEvent.MyMembership, + RoomEvent.Timeline, + RoomEvent.TimelineReset, + ]); + registerStateListeners(client, room); + return room; +} + +function registerStateListeners(client: MatrixClient, room: Room): void { // XXX cargoculted from sync.ts + // we need to also re-emit room state and room member events, so hook it up + // to the client now. We need to add a listener for RoomState.members in + // order to hook them correctly. + client.reEmitter.reEmit(room.currentState, [ + RoomStateEvent.Events, + RoomStateEvent.Members, + RoomStateEvent.NewMember, + RoomStateEvent.Update, + ]); + room.currentState.on(RoomStateEvent.NewMember, function(event, state, member) { + member.user = client.getUser(member.userId); + client.reEmitter.reEmit(member, [ + RoomMemberEvent.Name, + RoomMemberEvent.Typing, + RoomMemberEvent.PowerLevel, + RoomMemberEvent.Membership, + ]); + }); +} + +/* +function deregisterStateListeners(room: Room): void { // XXX cargoculted from sync.ts + // could do with a better way of achieving this. + room.currentState.removeAllListeners(RoomStateEvent.Events); + room.currentState.removeAllListeners(RoomStateEvent.Members); + room.currentState.removeAllListeners(RoomStateEvent.NewMember); +} */ + +function mapEvents(client: MatrixClient, roomId: string, events: object[], decrypt = true): MatrixEvent[] { + const mapper = client.getEventMapper({ decrypt }); + return (events as Array).map(function(e) { + e["room_id"] = roomId; + return mapper(e); + }); +} diff --git a/src/sliding-sync.ts b/src/sliding-sync.ts new file mode 100644 index 000000000..9d19ddfed --- /dev/null +++ b/src/sliding-sync.ts @@ -0,0 +1,757 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { logger } from './logger'; +import { IAbortablePromise } from "./@types/partials"; +import { MatrixClient } from "./client"; +import { IRoomEvent, IStateEvent } from "./sync-accumulator"; +import { TypedEventEmitter } from "./models//typed-event-emitter"; +import { sleep } from "./utils"; + +// /sync requests allow you to set a timeout= but the request may continue +// beyond that and wedge forever, so we need to track how long we are willing +// to keep open the connection. This constant is *ADDED* to the timeout= value +// to determine the max time we're willing to wait. +const BUFFER_PERIOD_MS = 10 * 1000; + +/** + * Represents a subscription to a room or set of rooms. Controls which events are returned. + */ +export interface MSC3575RoomSubscription { + required_state?: string[][]; + timeline_limit?: number; +} + +/** + * Controls which rooms are returned in a given list. + */ +export interface MSC3575Filter { + is_dm?: boolean; + is_encrypted?: boolean; + is_invite?: boolean; + is_tombstoned?: boolean; + room_name_like?: string; +} + +/** + * Represents a list subscription. + */ +export interface MSC3575List extends MSC3575RoomSubscription { + ranges: number[][]; + sort?: string[]; + filters?: MSC3575Filter; + slow_get_all_rooms?: boolean; +} + +/** + * A complete Sliding Sync request. + */ +export interface MSC3575SlidingSyncRequest { + // json body params + lists?: MSC3575List[]; + unsubscribe_rooms?: string[]; + room_subscriptions?: Record; + extensions?: object; + + // query params + pos?: string; + timeout?: number; + clientTimeout?: number; +} + +export interface MSC3575RoomData { + name: string; + required_state: IStateEvent[]; + timeline: (IRoomEvent | IStateEvent)[]; + notification_count?: number; + highlight_count?: number; + invite_state?: IStateEvent[]; + initial?: boolean; + limited?: boolean; + is_dm?: boolean; + prev_batch?: string; +} + +interface ListResponse { + count: number; + ops: Operation[]; +} + +interface BaseOperation { + op: string; +} + +interface DeleteOperation extends BaseOperation { + op: "DELETE"; + index: number; +} + +interface InsertOperation extends BaseOperation { + op: "INSERT"; + index: number; + room_id: string; +} + +interface InvalidateOperation extends BaseOperation { + op: "INVALIDATE"; + range: [number, number]; +} + +interface SyncOperation extends BaseOperation { + op: "SYNC"; + range: [number, number]; + room_ids: string[]; +} + +type Operation = DeleteOperation | InsertOperation | InvalidateOperation | SyncOperation; + +/** + * A complete Sliding Sync response + */ +export interface MSC3575SlidingSyncResponse { + pos: string; + lists: ListResponse[]; + rooms: Record; + extensions: object; +} + +export enum SlidingSyncState { + /** + * Fired by SlidingSyncEvent.Lifecycle event immediately before processing the response. + */ + RequestFinished = "FINISHED", + /** + * Fired by SlidingSyncEvent.Lifecycle event immediately after all room data listeners have been + * invoked, but before list listeners. + */ + Complete = "COMPLETE", +} + +/** + * Internal Class. SlidingList represents a single list in sliding sync. The list can have filters, + * multiple sliding windows, and maintains the index->room_id mapping. + */ +class SlidingList { + private list: MSC3575List; + private isModified: boolean; + + // returned data + public roomIndexToRoomId: Record; + public joinedCount: number; + + /** + * Construct a new sliding list. + * @param {MSC3575List} list The range, sort and filter values to use for this list. + */ + constructor(list: MSC3575List) { + this.replaceList(list); + } + + /** + * Mark this list as modified or not. Modified lists will return sticky params with calls to getList. + * This is useful for the first time the list is sent, or if the list has changed in some way. + * @param modified True to mark this list as modified so all sticky parameters will be re-sent. + */ + public setModified(modified: boolean): void { + this.isModified = modified; + } + + /** + * Update the list range for this list. Does not affect modified status as list ranges are non-sticky. + * @param newRanges The new ranges for the list + */ + public updateListRange(newRanges: number[][]): void { + this.list.ranges = JSON.parse(JSON.stringify(newRanges)); + } + + /** + * Replace list parameters. All fields will be replaced with the new list parameters. + * @param list The new list parameters + */ + public replaceList(list: MSC3575List): void { + list.filters = list.filters || {}; + list.ranges = list.ranges || []; + this.list = JSON.parse(JSON.stringify(list)); + this.isModified = true; + + // reset values as the join count may be very different (if filters changed) including the rooms + // (e.g. sort orders or sliding window ranges changed) + + // the constantly changing sliding window ranges. Not an array for performance reasons + // E.g. tracking ranges 0-99, 500-599, we don't want to have a 600 element array + this.roomIndexToRoomId = {}; + // the total number of joined rooms according to the server, always >= len(roomIndexToRoomId) + this.joinedCount = 0; + } + + /** + * Return a copy of the list suitable for a request body. + * @param {boolean} forceIncludeAllParams True to forcibly include all params even if the list + * hasn't been modified. Callers may want to do this if they are modifying the list prior to calling + * updateList. + */ + public getList(forceIncludeAllParams: boolean): MSC3575List { + let list = { + ranges: JSON.parse(JSON.stringify(this.list.ranges)), + }; + if (this.isModified || forceIncludeAllParams) { + list = JSON.parse(JSON.stringify(this.list)); + } + return list; + } + + /** + * Check if a given index is within the list range. This is required even though the /sync API + * provides explicit updates with index positions because of the following situation: + * 0 1 2 3 4 5 6 7 8 indexes + * a b c d e f COMMANDS: SYNC 0 2 a b c; SYNC 6 8 d e f; + * a b c d _ f COMMAND: DELETE 7; + * e a b c d f COMMAND: INSERT 0 e; + * c=3 is wrong as we are not tracking it, ergo we need to see if `i` is in range else drop it + * @param i The index to check + * @returns True if the index is within a sliding window + */ + public isIndexInRange(i: number): boolean { + for (const r of this.list.ranges) { + if (r[0] <= i && i <= r[1]) { + return true; + } + } + return false; + } +} + +/** + * When onResponse extensions should be invoked: before or after processing the main response. + */ +export enum ExtensionState { + // Call onResponse before processing the response body. This is useful when your extension is + // preparing the ground for the response body e.g. processing to-device messages before the + // encrypted event arrives. + PreProcess = "ExtState.PreProcess", + // Call onResponse after processing the response body. This is useful when your extension is + // decorating data from the client, and you rely on MatrixClient.getRoom returning the Room object + // e.g. room account data. + PostProcess = "ExtState.PostProcess", +} + +/** + * An interface that must be satisfied to register extensions + */ +export interface Extension { + /** + * The extension name to go under 'extensions' in the request body. + * @returns The JSON key. + */ + name(): string; + /** + * A function which is called when the request JSON is being formed. + * Returns the data to insert under this key. + * @param isInitial True when this is part of the initial request (send sticky params) + * @returns The request JSON to send. + */ + onRequest(isInitial: boolean): object; + /** + * A function which is called when there is response JSON under this extension. + * @param data The response JSON under the extension name. + */ + onResponse(data: object); + /** + * Controls when onResponse should be called. + * @returns The state when it should be called. + */ + when(): ExtensionState; +} + +/** + * Events which can be fired by the SlidingSync class. These are designed to provide different levels + * of information when processing sync responses. + * - RoomData: concerns rooms, useful for SlidingSyncSdk to update its knowledge of rooms. + * - Lifecycle: concerns callbacks at various well-defined points in the sync process. + * - List: concerns lists, useful for UI layers to re-render room lists. + * Specifically, the order of event invocation is: + * - Lifecycle (state=RequestFinished) + * - RoomData (N times) + * - Lifecycle (state=Complete) + * - List (at most once per list) + */ +export enum SlidingSyncEvent { + /** + * This event fires when there are updates for a room. Fired as and when rooms are encountered + * in the response. + */ + RoomData = "SlidingSync.RoomData", + /** + * This event fires at various points in the /sync loop lifecycle. + * - SlidingSyncState.RequestFinished: Fires after we receive a valid response but before the + * response has been processed. Perform any pre-process steps here. If there was a problem syncing, + * `err` will be set (e.g network errors). + * - SlidingSyncState.Complete: Fires after all SlidingSyncEvent.RoomData have been fired but before + * SlidingSyncEvent.List. + */ + Lifecycle = "SlidingSync.Lifecycle", + /** + * This event fires whenever there has been a change to this list index. It fires exactly once + * per list, even if there were multiple operations for the list. + * It fires AFTER Lifecycle and RoomData events. + */ + List = "SlidingSync.List", +} + +export type SlidingSyncEventHandlerMap = { + [SlidingSyncEvent.RoomData]: (roomId: string, roomData: MSC3575RoomData) => void; + [SlidingSyncEvent.Lifecycle]: (state: SlidingSyncState, resp: MSC3575SlidingSyncResponse, err: Error) => void; + [SlidingSyncEvent.List]: ( + listIndex: number, joinedCount: number, roomIndexToRoomId: Record, + ) => void; +}; + +/** + * SlidingSync is a high-level data structure which controls the majority of sliding sync. + * It has no hooks into JS SDK except for needing a MatrixClient to perform the HTTP request. + * This means this class (and everything it uses) can be used in isolation from JS SDK if needed. + * To hook this up with the JS SDK, you need to use SlidingSyncSdk. + */ +export class SlidingSync extends TypedEventEmitter { + private lists: SlidingList[]; + private listModifiedCount = 0; + private terminated = false; + // flag set when resend() is called because we cannot rely on detecting AbortError in JS SDK :( + private needsResend = false; + // map of extension name to req/resp handler + private extensions: Record = {}; + + private desiredRoomSubscriptions = new Set(); // the *desired* room subscriptions + private confirmedRoomSubscriptions = new Set(); + + private pendingReq?: IAbortablePromise; + + /** + * Create a new sliding sync instance + * @param {string} proxyBaseUrl The base URL of the sliding sync proxy + * @param {MSC3575List[]} lists The lists to use for sliding sync. + * @param {MSC3575RoomSubscription} roomSubscriptionInfo The params to use for room subscriptions. + * @param {MatrixClient} client The client to use for /sync calls. + * @param {number} timeoutMS The number of milliseconds to wait for a response. + */ + constructor( + private readonly proxyBaseUrl: string, + lists: MSC3575List[], + private roomSubscriptionInfo: MSC3575RoomSubscription, + private readonly client: MatrixClient, + private readonly timeoutMS: number, + ) { + super(); + this.lists = lists.map((l) => new SlidingList(l)); + } + + /** + * Get the length of the sliding lists. + * @returns The number of lists in the sync request + */ + public listLength(): number { + return this.lists.length; + } + + /** + * Get the room data for a list. + * @param index The list index + * @returns The list data which contains the rooms in this list + */ + public getListData(index: number): {joinedCount: number, roomIndexToRoomId: Record} { + if (!this.lists[index]) { + return null; + } + return { + joinedCount: this.lists[index].joinedCount, + roomIndexToRoomId: Object.assign({}, this.lists[index].roomIndexToRoomId), + }; + } + + /** + * Get the full list parameters for a list index. This function is provided for callers to use + * in conjunction with setList to update fields on an existing list. + * @param index The list index to get the list for. + * @returns A copy of the list or undefined. + */ + public getList(index: number): MSC3575List { + if (!this.lists[index]) { + return null; + } + return this.lists[index].getList(true); + } + + /** + * Set new ranges for an existing list. Calling this function when _only_ the ranges have changed + * is more efficient than calling setList(index,list) as this function won't resend sticky params, + * whereas setList always will. + * @param index The list index to modify + * @param ranges The new ranges to apply. + */ + public setListRanges(index: number, ranges: number[][]): void { + this.lists[index].updateListRange(ranges); + this.resend(); + } + + /** + * Add or replace a list. Calling this function will interrupt the /sync request to resend new + * lists. + * @param index The index to modify + * @param list The new list parameters. + */ + public setList(index: number, list: MSC3575List): void { + if (this.lists[index]) { + this.lists[index].replaceList(list); + } else { + this.lists[index] = new SlidingList(list); + } + this.listModifiedCount += 1; + this.resend(); + } + + /** + * Get the room subscriptions for the sync API. + * @returns A copy of the desired room subscriptions. + */ + public getRoomSubscriptions(): Set { + return new Set(Array.from(this.desiredRoomSubscriptions)); + } + + /** + * Modify the room subscriptions for the sync API. Calling this function will interrupt the + * /sync request to resend new subscriptions. If the /sync stream has not started, this will + * prepare the room subscriptions for when start() is called. + * @param s The new desired room subscriptions. + */ + public modifyRoomSubscriptions(s: Set) { + this.desiredRoomSubscriptions = s; + this.resend(); + } + + /** + * Modify which events to retrieve for room subscriptions. Invalidates all room subscriptions + * such that they will be sent up afresh. + * @param rs The new room subscription fields to fetch. + */ + public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): void { + this.roomSubscriptionInfo = rs; + this.confirmedRoomSubscriptions = new Set(); + this.resend(); + } + + /** + * Register an extension to send with the /sync request. + * @param ext The extension to register. + */ + public registerExtension(ext: Extension): void { + if (this.extensions[ext.name()]) { + throw new Error(`registerExtension: ${ext.name()} already exists as an extension`); + } + this.extensions[ext.name()] = ext; + } + + private getExtensionRequest(isInitial: boolean): object { + const ext = {}; + Object.keys(this.extensions).forEach((extName) => { + ext[extName] = this.extensions[extName].onRequest(isInitial); + }); + return ext; + } + + private onPreExtensionsResponse(ext: object): void { + Object.keys(ext).forEach((extName) => { + if (this.extensions[extName].when() == ExtensionState.PreProcess) { + this.extensions[extName].onResponse(ext[extName]); + } + }); + } + + private onPostExtensionsResponse(ext: object): void { + Object.keys(ext).forEach((extName) => { + if (this.extensions[extName].when() == ExtensionState.PostProcess) { + this.extensions[extName].onResponse(ext[extName]); + } + }); + } + + /** + * Invoke all attached room data listeners. + * @param {string} roomId The room which received some data. + * @param {object} roomData The raw sliding sync response JSON. + */ + private invokeRoomDataListeners(roomId: string, roomData: MSC3575RoomData): void { + if (!roomData.required_state) { roomData.required_state = []; } + if (!roomData.timeline) { roomData.timeline = []; } + this.emit(SlidingSyncEvent.RoomData, roomId, roomData); + } + + /** + * Invoke all attached lifecycle listeners. + * @param {SlidingSyncState} state The Lifecycle state + * @param {object} resp The raw sync response JSON + * @param {Error?} err Any error that occurred when making the request e.g. network errors. + */ + private invokeLifecycleListeners(state: SlidingSyncState, resp: MSC3575SlidingSyncResponse, err?: Error): void { + this.emit(SlidingSyncEvent.Lifecycle, state, resp, err); + } + + private processListOps(list: ListResponse, listIndex: number): void { + let gapIndex = -1; + list.ops.forEach((op: Operation) => { + switch (op.op) { + case "DELETE": { + logger.debug("DELETE", listIndex, op.index, ";"); + delete this.lists[listIndex].roomIndexToRoomId[op.index]; + gapIndex = op.index; + break; + } + case "INSERT": { + logger.debug( + "INSERT", + listIndex, + op.index, + op.room_id, + ";", + ); + if (this.lists[listIndex].roomIndexToRoomId[op.index]) { + // something is in this space, shift items out of the way + if (gapIndex < 0) { + logger.debug( + "cannot work out where gap is, INSERT without previous DELETE! List: ", + listIndex, + ); + return; + } + // 0,1,2,3 index + // [A,B,C,D] + // DEL 3 + // [A,B,C,_] + // INSERT E 0 + // [E,A,B,C] + // gapIndex=3, op.index=0 + if (gapIndex > op.index) { + // the gap is further down the list, shift every element to the right + // starting at the gap so we can just shift each element in turn: + // [A,B,C,_] gapIndex=3, op.index=0 + // [A,B,C,C] i=3 + // [A,B,B,C] i=2 + // [A,A,B,C] i=1 + // Terminate. We'll assign into op.index next. + for (let i = gapIndex; i > op.index; i--) { + if (this.lists[listIndex].isIndexInRange(i)) { + this.lists[listIndex].roomIndexToRoomId[i] = + this.lists[listIndex].roomIndexToRoomId[ + i - 1 + ]; + } + } + } else if (gapIndex < op.index) { + // the gap is further up the list, shift every element to the left + // starting at the gap so we can just shift each element in turn + for (let i = gapIndex; i < op.index; i++) { + if (this.lists[listIndex].isIndexInRange(i)) { + this.lists[listIndex].roomIndexToRoomId[i] = + this.lists[listIndex].roomIndexToRoomId[ + i + 1 + ]; + } + } + } + } + this.lists[listIndex].roomIndexToRoomId[op.index] = op.room_id; + break; + } + case "INVALIDATE": { + const startIndex = op.range[0]; + for (let i = startIndex; i <= op.range[1]; i++) { + delete this.lists[listIndex].roomIndexToRoomId[i]; + } + logger.debug( + "INVALIDATE", + listIndex, + op.range[0], + op.range[1], + ";", + ); + break; + } + case "SYNC": { + const startIndex = op.range[0]; + for (let i = startIndex; i <= op.range[1]; i++) { + const roomId = op.room_ids[i - startIndex]; + if (!roomId) { + break; // we are at the end of list + } + this.lists[listIndex].roomIndexToRoomId[i] = roomId; + } + logger.debug( + "SYNC", + listIndex, + op.range[0], + op.range[1], + op.room_ids.join(" "), + ";", + ); + break; + } + } + }); + } + + /** + * Resend a Sliding Sync request. Used when something has changed in the request. + */ + public resend(): void { + this.needsResend = true; + this.pendingReq?.abort(); + } + + /** + * Stop syncing with the server. + */ + public stop(): void { + this.terminated = true; + this.pendingReq?.abort(); + // remove all listeners so things can be GC'd + this.removeAllListeners(SlidingSyncEvent.Lifecycle); + this.removeAllListeners(SlidingSyncEvent.List); + this.removeAllListeners(SlidingSyncEvent.RoomData); + } + + /** + * Start syncing with the server. Blocks until stopped. + */ + public async start() { + let currentPos: string; + while (!this.terminated) { + this.needsResend = false; + let doNotUpdateList = false; + let resp: MSC3575SlidingSyncResponse; + try { + const listModifiedCount = this.listModifiedCount; + const reqBody: MSC3575SlidingSyncRequest = { + lists: this.lists.map((l) => { + return l.getList(false); + }), + pos: currentPos, + timeout: this.timeoutMS, + clientTimeout: this.timeoutMS + BUFFER_PERIOD_MS, + extensions: this.getExtensionRequest(currentPos === undefined), + }; + // check if we are (un)subscribing to a room and modify request this one time for it + const newSubscriptions = difference(this.desiredRoomSubscriptions, this.confirmedRoomSubscriptions); + const unsubscriptions = difference(this.confirmedRoomSubscriptions, this.desiredRoomSubscriptions); + if (unsubscriptions.size > 0) { + reqBody.unsubscribe_rooms = Array.from(unsubscriptions); + } + if (newSubscriptions.size > 0) { + reqBody.room_subscriptions = {}; + for (const roomId of newSubscriptions) { + reqBody.room_subscriptions[roomId] = this.roomSubscriptionInfo; + } + } + this.pendingReq = this.client.slidingSync(reqBody, this.proxyBaseUrl); + resp = await this.pendingReq; + logger.debug(resp); + currentPos = resp.pos; + // update what we think we're subscribed to. + for (const roomId of newSubscriptions) { + this.confirmedRoomSubscriptions.add(roomId); + } + for (const roomId of unsubscriptions) { + this.confirmedRoomSubscriptions.delete(roomId); + } + if (listModifiedCount !== this.listModifiedCount) { + // the lists have been modified whilst we were waiting for 'await' to return, but the abort() + // call did nothing. It is NOT SAFE to modify the list array now. We'll process the response but + // not update list pointers. + logger.debug("list modified during await call, not updating list"); + doNotUpdateList = true; + } + // mark all these lists as having been sent as sticky so we don't keep sending sticky params + this.lists.forEach((l) => { + l.setModified(false); + }); + // set default empty values so we don't need to null check + resp.lists = resp.lists || []; + resp.rooms = resp.rooms || {}; + resp.extensions = resp.extensions || {}; + resp.lists.forEach((val, i) => { + this.lists[i].joinedCount = val.count; + }); + this.invokeLifecycleListeners( + SlidingSyncState.RequestFinished, + resp, + ); + } catch (err) { + if (err.httpStatus) { + this.invokeLifecycleListeners( + SlidingSyncState.RequestFinished, + null, + err, + ); + await sleep(3000); + } else if (this.needsResend || err === "aborted") { + // don't sleep as we caused this error by abort()ing the request. + // we check for 'aborted' because that's the error Jest returns and without it + // we get warnings about not exiting fast enough. + continue; + } else { + logger.error(err); + await sleep(3000); + } + } + if (!resp) { + continue; + } + this.onPreExtensionsResponse(resp.extensions); + + Object.keys(resp.rooms).forEach((roomId) => { + this.invokeRoomDataListeners( + roomId, + resp.rooms[roomId], + ); + }); + + const listIndexesWithUpdates: Set = new Set(); + if (!doNotUpdateList) { + resp.lists.forEach((list, listIndex) => { + list.ops = list.ops || []; + if (list.ops.length > 0) { + listIndexesWithUpdates.add(listIndex); + } + this.processListOps(list, listIndex); + }); + } + this.invokeLifecycleListeners(SlidingSyncState.Complete, resp); + this.onPostExtensionsResponse(resp.extensions); + listIndexesWithUpdates.forEach((i) => { + this.emit( + SlidingSyncEvent.List, + i, this.lists[i].joinedCount, Object.assign({}, this.lists[i].roomIndexToRoomId), + ); + }); + } + } +} + +const difference = (setA: Set, setB: Set): Set => { + const diff = new Set(setA); + for (const elem of setB) { + diff.delete(elem); + } + return diff; +}; diff --git a/yarn.lock b/yarn.lock index 9a7a58023..c40a7a501 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4807,7 +4807,7 @@ matrix-events-sdk@^0.0.1-beta.7: resolved "https://registry.yarnpkg.com/matrix-events-sdk/-/matrix-events-sdk-0.0.1-beta.7.tgz#5ffe45eba1f67cc8d7c2377736c728b322524934" integrity sha512-9jl4wtWanUFSy2sr2lCjErN/oC8KTAtaeaozJtrgot1JiQcEI4Rda9OLgQ7nLKaqb4Z/QUx/fR3XpDzm5Jy1JA== -matrix-mock-request@^2.0.1: +matrix-mock-request@^2.1.0: version "2.1.0" resolved "https://registry.yarnpkg.com/matrix-mock-request/-/matrix-mock-request-2.1.0.tgz#86f5b0ef846865d0767d3a8e64f5bcd6ca94c178" integrity sha512-Cjpl3yP6h0yu5GKG89m1XZXZlm69Kg/qHV41N/t6SrQsgcfM3Bfavqx9YrtG0UnuXGy4bBSZIe1QiWVeFPZw1A==