1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-07-31 15:24:23 +03:00

Add support for MSC3575: Sliding Sync (#2242)

* sliding sync: add client function and add stub sliding-sync.ts

Mostly c/p from sync.ts. Define interfaces for MSC3575 sliding
sync types. Complete WIP!

* Add core sliding sync classes

* Add integration tests for sliding sync api basics

* gut unused code; add more types

* Use SlidingSync in MatrixClient; stub functions for Sync

Enough to make ele-web actually load okay with 0 rooms.

* Start feeding through room data to the client

* Bugfixes so it sorta ish works

* Refactor the public API for sliding sync

Still needs some work but it's a start.

* Use EventEmitter for callbacks. Add ability to adjust lists and listen for list updates.

- Have atomic getList/setList operations on SlidingSync to update windows etc
- Add a list callback which is invoked with the list indicies and joined count.

* Add stub tests; add listenUntil to make tests easier to read

* No need to resend now

* Add more sliding sync tests; add new setListRanges function

* build tests upon one another to reduce boilerplate and c/p

* More thorough sliding sync tests

* Dependency inject SlidingSync in Client opts when calling startClient()

* Linting

* Fix crash when opts is undefined

* Fix up docs to make CI happy

* Remove all listeners when stop()d to allow for GC

* Add support for extensions

* Add ExtensionE2EE automatically if opts.crypto is present

* Add ExtensionToDevice automatically

* Bugfixes for to_device message processing

* default events to []

* bugfix: don't tightloop when the server is down

Caused by not detecting abort() correctly

* Return null for bad index positions

* Add getListData to get the initial calculated list response

* Add is_tombstoned

* More comments

* Add support for account data extension; rejig extension interface

* Handle invite_state

* Feed through prev_batch tokens

* Linting

* Fix tests

* Linting

* Iterate PR

* Iterate tests and remove unused code

* Update matrix-mock-request

* Make tests happier

* Remove DEBUG/debuglog and use logger.debug

* Update the API to the latest MSC; fixup tests

* Use undefined not null to make it work with the latest changes

* Don't recreate rooms when initial: true

* Add defensive code when unsigned.transaction_id is missing

We can still pair up events by looking at the event_id. We need
to do this in Sliding Sync because the proxy has limitations that
means it cannot guarantee it will always incude a transaction_id
in unsigned. The main reason why is due to the following race condition:
 - A and B are in a DM room.
 - Both are using the proxy.
 - A says "hello".
 - B's sync stream gets "hello" on the proxy. At this point the proxy
   knows it needs to deliver it to A. It does so, but this event has
   no transaction_id as it came down B's sync stream, not A's.
 - If instead, A's sync stream gets "hello" on the proxy, the proxy
   will deliver this message with the transaction_id correctly set.

There are no guarantees that A's sync stream will get the event in a
timely manner, hence the decision to just deliver the events as soon
as the proxy gets the event. This will not be an issue for native
Sliding Sync implementations; this is just a proxy issue.

* Linting

* Add additional sliding sync tests

* Begin adding SlidingSyncSdk tests

* Linting

* Add more sliding sync sdk tests

* Prep work for extension tests

* Linting

* Add account data extension tests

* add to-device tests

* Add E2EE extension tests

* Code smell fixes and extra tests

* Add test for no-txn-id local echo

* Add tests for resolveProfilesToInvites

* Add tests for moving entries down as well as up the list

* Remove conn-management.ts

* Actually verify the event was removed from the txn map

* Handle the case when /sync returns before /send without a txn_id

And ensure all the tests actually test the right things.

* Linting

Co-authored-by: Michael Telatynski <7t3chguy@gmail.com>
This commit is contained in:
kegsay
2022-07-12 15:09:58 +01:00
committed by GitHub
parent 7a18991342
commit 8d7eaa769a
11 changed files with 3243 additions and 8 deletions

View File

@ -102,7 +102,7 @@
"jest-localstorage-mock": "^2.4.6",
"jest-sonar-reporter": "^2.0.0",
"jsdoc": "^3.6.6",
"matrix-mock-request": "^2.0.1",
"matrix-mock-request": "^2.1.0",
"rimraf": "^3.0.2",
"terser": "^5.5.1",
"tsify": "^5.0.2",

View File

@ -0,0 +1,732 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// eslint-disable-next-line no-restricted-imports
import MockHttpBackend from "matrix-mock-request";
import { fail } from "assert";
import { SlidingSync, SlidingSyncEvent, MSC3575RoomData, SlidingSyncState, Extension } from "../../src/sliding-sync";
import { TestClient } from "../TestClient";
import { IRoomEvent, IStateEvent } from "../../src/sync-accumulator";
import {
MatrixClient, MatrixEvent, NotificationCountType, JoinRule, MatrixError,
EventType, IPushRules, PushRuleKind, TweakName, ClientEvent,
} from "../../src";
import { SlidingSyncSdk } from "../../src/sliding-sync-sdk";
import { SyncState } from "../../src/sync";
import { IStoredClientOpts } from "../../src/client";
describe("SlidingSyncSdk", () => {
let client: MatrixClient = null;
let httpBackend: MockHttpBackend = null;
let sdk: SlidingSyncSdk = null;
let mockSlidingSync: SlidingSync = null;
const selfUserId = "@alice:localhost";
const selfAccessToken = "aseukfgwef";
const mockifySlidingSync = (s: SlidingSync): SlidingSync => {
s.getList = jest.fn();
s.getListData = jest.fn();
s.getRoomSubscriptions = jest.fn();
s.listLength = jest.fn();
s.modifyRoomSubscriptionInfo = jest.fn();
s.modifyRoomSubscriptions = jest.fn();
s.registerExtension = jest.fn();
s.setList = jest.fn();
s.setListRanges = jest.fn();
s.start = jest.fn();
s.stop = jest.fn();
s.resend = jest.fn();
return s;
};
// shorthand way to make events without filling in all the fields
let eventIdCounter = 0;
const mkOwnEvent = (evType: string, content: object): IRoomEvent => {
eventIdCounter++;
return {
type: evType,
content: content,
sender: selfUserId,
origin_server_ts: Date.now(),
event_id: "$" + eventIdCounter,
};
};
const mkOwnStateEvent = (evType: string, content: object, stateKey?: string): IStateEvent => {
eventIdCounter++;
return {
type: evType,
state_key: stateKey,
content: content,
sender: selfUserId,
origin_server_ts: Date.now(),
event_id: "$" + eventIdCounter,
};
};
const assertTimelineEvents = (got: MatrixEvent[], want: IRoomEvent[]): void => {
expect(got.length).toEqual(want.length);
got.forEach((m, i) => {
expect(m.getType()).toEqual(want[i].type);
expect(m.getSender()).toEqual(want[i].sender);
expect(m.getId()).toEqual(want[i].event_id);
expect(m.getContent()).toEqual(want[i].content);
expect(m.getTs()).toEqual(want[i].origin_server_ts);
if (want[i].unsigned) {
expect(m.getUnsigned()).toEqual(want[i].unsigned);
}
const maybeStateEvent = want[i] as IStateEvent;
if (maybeStateEvent.state_key) {
expect(m.getStateKey()).toEqual(maybeStateEvent.state_key);
}
});
};
// assign client/httpBackend globals
const setupClient = async (testOpts?: Partial<IStoredClientOpts&{withCrypto: boolean}>) => {
testOpts = testOpts || {};
const testClient = new TestClient(selfUserId, "DEVICE", selfAccessToken);
httpBackend = testClient.httpBackend;
client = testClient.client;
mockSlidingSync = mockifySlidingSync(new SlidingSync("", [], {}, client, 0));
if (testOpts.withCrypto) {
httpBackend.when("GET", "/room_keys/version").respond(404, {});
await client.initCrypto();
testOpts.crypto = client.crypto;
}
httpBackend.when("GET", "/_matrix/client/r0/pushrules").respond(200, {});
sdk = new SlidingSyncSdk(mockSlidingSync, client, testOpts);
};
// tear down client/httpBackend globals
const teardownClient = () => {
client.stopClient();
return httpBackend.stop();
};
// find an extension on a SlidingSyncSdk instance
const findExtension = (name: string): Extension => {
expect(mockSlidingSync.registerExtension).toHaveBeenCalled();
const mockFn = mockSlidingSync.registerExtension as jest.Mock;
// find the extension
for (let i = 0; i < mockFn.mock.calls.length; i++) {
const calledExtension = mockFn.mock.calls[i][0] as Extension;
if (calledExtension && calledExtension.name() === name) {
return calledExtension;
}
}
fail("cannot find extension " + name);
};
describe("sync/stop", () => {
beforeAll(async () => {
await setupClient();
});
afterAll(teardownClient);
it("can sync()", async () => {
const hasSynced = sdk.sync();
await httpBackend.flushAllExpected();
await hasSynced;
expect(mockSlidingSync.start).toBeCalled();
});
it("can stop()", async () => {
sdk.stop();
expect(mockSlidingSync.stop).toBeCalled();
});
});
describe("rooms", () => {
beforeAll(async () => {
await setupClient();
});
afterAll(teardownClient);
describe("initial", () => {
beforeAll(async () => {
const hasSynced = sdk.sync();
await httpBackend.flushAllExpected();
await hasSynced;
});
// inject some rooms with different fields set.
// All rooms are new so they all have initial: true
const roomA = "!a_state_and_timeline:localhost";
const roomB = "!b_timeline_only:localhost";
const roomC = "!c_with_highlight_count:localhost";
const roomD = "!d_with_notif_count:localhost";
const roomE = "!e_with_invite:localhost";
const roomF = "!f_calc_room_name:localhost";
const data: Record<string, MSC3575RoomData> = {
[roomA]: {
name: "A",
required_state: [
mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""),
mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId),
mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""),
mkOwnStateEvent(EventType.RoomName, { name: "A" }, ""),
],
timeline: [
mkOwnEvent(EventType.RoomMessage, { body: "hello A" }),
mkOwnEvent(EventType.RoomMessage, { body: "world A" }),
],
initial: true,
},
[roomB]: {
name: "B",
required_state: [],
timeline: [
mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""),
mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId),
mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""),
mkOwnEvent(EventType.RoomMessage, { body: "hello B" }),
mkOwnEvent(EventType.RoomMessage, { body: "world B" }),
],
initial: true,
},
[roomC]: {
name: "C",
required_state: [],
timeline: [
mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""),
mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId),
mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""),
mkOwnEvent(EventType.RoomMessage, { body: "hello C" }),
mkOwnEvent(EventType.RoomMessage, { body: "world C" }),
],
highlight_count: 5,
initial: true,
},
[roomD]: {
name: "D",
required_state: [],
timeline: [
mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""),
mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId),
mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""),
mkOwnEvent(EventType.RoomMessage, { body: "hello D" }),
mkOwnEvent(EventType.RoomMessage, { body: "world D" }),
],
notification_count: 5,
initial: true,
},
[roomE]: {
name: "E",
required_state: [],
timeline: [],
invite_state: [
{
type: EventType.RoomMember,
content: { membership: "invite" },
state_key: selfUserId,
sender: "@bob:localhost",
event_id: "$room_e_invite",
origin_server_ts: 123456,
},
{
type: "m.room.join_rules",
content: { join_rule: "invite" },
state_key: "",
sender: "@bob:localhost",
event_id: "$room_e_join_rule",
origin_server_ts: 123456,
},
],
initial: true,
},
[roomF]: {
name: "#foo:localhost",
required_state: [
mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""),
mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId),
mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""),
mkOwnStateEvent(EventType.RoomCanonicalAlias, { alias: "#foo:localhost" }, ""),
mkOwnStateEvent(EventType.RoomName, { name: "This should be ignored" }, ""),
],
timeline: [
mkOwnEvent(EventType.RoomMessage, { body: "hello A" }),
mkOwnEvent(EventType.RoomMessage, { body: "world A" }),
],
initial: true,
},
};
it("can be created with required_state and timeline", () => {
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomA, data[roomA]);
const gotRoom = client.getRoom(roomA);
expect(gotRoom).toBeDefined();
expect(gotRoom.name).toEqual(data[roomA].name);
expect(gotRoom.getMyMembership()).toEqual("join");
assertTimelineEvents(gotRoom.getLiveTimeline().getEvents().slice(-2), data[roomA].timeline);
});
it("can be created with timeline only", () => {
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomB, data[roomB]);
const gotRoom = client.getRoom(roomB);
expect(gotRoom).toBeDefined();
expect(gotRoom.name).toEqual(data[roomB].name);
expect(gotRoom.getMyMembership()).toEqual("join");
assertTimelineEvents(gotRoom.getLiveTimeline().getEvents().slice(-5), data[roomB].timeline);
});
it("can be created with a highlight_count", () => {
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomC, data[roomC]);
const gotRoom = client.getRoom(roomC);
expect(gotRoom).toBeDefined();
expect(
gotRoom.getUnreadNotificationCount(NotificationCountType.Highlight),
).toEqual(data[roomC].highlight_count);
});
it("can be created with a notification_count", () => {
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomD, data[roomD]);
const gotRoom = client.getRoom(roomD);
expect(gotRoom).toBeDefined();
expect(
gotRoom.getUnreadNotificationCount(NotificationCountType.Total),
).toEqual(data[roomD].notification_count);
});
it("can be created with invite_state", () => {
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomE, data[roomE]);
const gotRoom = client.getRoom(roomE);
expect(gotRoom).toBeDefined();
expect(gotRoom.getMyMembership()).toEqual("invite");
expect(gotRoom.currentState.getJoinRule()).toEqual(JoinRule.Invite);
});
it("uses the 'name' field to caluclate the room name", () => {
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomF, data[roomF]);
const gotRoom = client.getRoom(roomF);
expect(gotRoom).toBeDefined();
expect(
gotRoom.name,
).toEqual(data[roomF].name);
});
describe("updating", () => {
it("can update with a new timeline event", async () => {
const newEvent = mkOwnEvent(EventType.RoomMessage, { body: "new event A" });
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomA, {
timeline: [newEvent],
required_state: [],
name: data[roomA].name,
});
const gotRoom = client.getRoom(roomA);
expect(gotRoom).toBeDefined();
const newTimeline = data[roomA].timeline;
newTimeline.push(newEvent);
assertTimelineEvents(gotRoom.getLiveTimeline().getEvents().slice(-3), newTimeline);
});
it("can update with a new required_state event", async () => {
let gotRoom = client.getRoom(roomB);
expect(gotRoom.getJoinRule()).toEqual(JoinRule.Invite); // default
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomB, {
required_state: [
mkOwnStateEvent("m.room.join_rules", { join_rule: "restricted" }, ""),
],
timeline: [],
name: data[roomB].name,
});
gotRoom = client.getRoom(roomB);
expect(gotRoom).toBeDefined();
expect(gotRoom.getJoinRule()).toEqual(JoinRule.Restricted);
});
it("can update with a new highlight_count", async () => {
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomC, {
name: data[roomC].name,
required_state: [],
timeline: [],
highlight_count: 1,
});
const gotRoom = client.getRoom(roomC);
expect(gotRoom).toBeDefined();
expect(
gotRoom.getUnreadNotificationCount(NotificationCountType.Highlight),
).toEqual(1);
});
it("can update with a new notification_count", async () => {
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomD, {
name: data[roomD].name,
required_state: [],
timeline: [],
notification_count: 1,
});
const gotRoom = client.getRoom(roomD);
expect(gotRoom).toBeDefined();
expect(
gotRoom.getUnreadNotificationCount(NotificationCountType.Total),
).toEqual(1);
});
});
});
});
describe("lifecycle", () => {
beforeAll(async () => {
await setupClient();
const hasSynced = sdk.sync();
await httpBackend.flushAllExpected();
await hasSynced;
});
const FAILED_SYNC_ERROR_THRESHOLD = 3; // would be nice to export the const in the actual class...
it("emits SyncState.Reconnecting when < FAILED_SYNC_ERROR_THRESHOLD & SyncState.Error when over", async () => {
mockSlidingSync.emit(
SlidingSyncEvent.Lifecycle, SlidingSyncState.Complete,
{ pos: "h", lists: [], rooms: {}, extensions: {} }, null,
);
expect(sdk.getSyncState()).toEqual(SyncState.Syncing);
mockSlidingSync.emit(
SlidingSyncEvent.Lifecycle, SlidingSyncState.RequestFinished, null, new Error("generic"),
);
expect(sdk.getSyncState()).toEqual(SyncState.Reconnecting);
for (let i = 0; i < FAILED_SYNC_ERROR_THRESHOLD; i++) {
mockSlidingSync.emit(
SlidingSyncEvent.Lifecycle, SlidingSyncState.RequestFinished, null, new Error("generic"),
);
}
expect(sdk.getSyncState()).toEqual(SyncState.Error);
});
it("emits SyncState.Syncing after a previous SyncState.Error", async () => {
mockSlidingSync.emit(
SlidingSyncEvent.Lifecycle,
SlidingSyncState.Complete,
{ pos: "i", lists: [], rooms: {}, extensions: {} },
null,
);
expect(sdk.getSyncState()).toEqual(SyncState.Syncing);
});
it("emits SyncState.Error immediately when receiving M_UNKNOWN_TOKEN and stops syncing", async () => {
expect(mockSlidingSync.stop).not.toBeCalled();
mockSlidingSync.emit(SlidingSyncEvent.Lifecycle, SlidingSyncState.RequestFinished, null, new MatrixError({
errcode: "M_UNKNOWN_TOKEN",
message: "Oh no your access token is no longer valid",
}));
expect(sdk.getSyncState()).toEqual(SyncState.Error);
expect(mockSlidingSync.stop).toBeCalled();
});
});
describe("opts", () => {
afterEach(teardownClient);
it("can resolveProfilesToInvites", async () => {
await setupClient({
resolveInvitesToProfiles: true,
});
const roomId = "!resolveProfilesToInvites:localhost";
const invitee = "@invitee:localhost";
const inviteeProfile = {
avatar_url: "mxc://foobar",
displayname: "The Invitee",
};
httpBackend.when("GET", "/profile").respond(200, inviteeProfile);
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomId, {
initial: true,
name: "Room with Invite",
required_state: [],
timeline: [
mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""),
mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId),
mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""),
mkOwnStateEvent(EventType.RoomMember, { membership: "invite" }, invitee),
],
});
await httpBackend.flush("/profile", 1, 1000);
const room = client.getRoom(roomId);
expect(room).toBeDefined();
const inviteeMember = room.getMember(invitee);
expect(inviteeMember).toBeDefined();
expect(inviteeMember.getMxcAvatarUrl()).toEqual(inviteeProfile.avatar_url);
expect(inviteeMember.name).toEqual(inviteeProfile.displayname);
});
});
describe("ExtensionE2EE", () => {
let ext: Extension;
beforeAll(async () => {
await setupClient({
withCrypto: true,
});
const hasSynced = sdk.sync();
await httpBackend.flushAllExpected();
await hasSynced;
ext = findExtension("e2ee");
});
afterAll(async () => {
// needed else we do some async operations in the background which can cause Jest to whine:
// "Cannot log after tests are done. Did you forget to wait for something async in your test?"
// Attempted to log "Saving device tracking data null"."
client.crypto.stop();
});
it("gets enabled on the initial request only", () => {
expect(ext.onRequest(true)).toEqual({
enabled: true,
});
expect(ext.onRequest(false)).toEqual(undefined);
});
it("can update device lists", () => {
ext.onResponse({
device_lists: {
changed: ["@alice:localhost"],
left: ["@bob:localhost"],
},
});
// TODO: more assertions?
});
it("can update OTK counts", () => {
client.crypto.updateOneTimeKeyCount = jest.fn();
ext.onResponse({
device_one_time_keys_count: {
signed_curve25519: 42,
},
});
expect(client.crypto.updateOneTimeKeyCount).toHaveBeenCalledWith(42);
ext.onResponse({
device_one_time_keys_count: {
not_signed_curve25519: 42,
// missing field -> default to 0
},
});
expect(client.crypto.updateOneTimeKeyCount).toHaveBeenCalledWith(0);
});
it("can update fallback keys", () => {
ext.onResponse({
device_unused_fallback_key_types: ["signed_curve25519"],
});
expect(client.crypto.getNeedsNewFallback()).toEqual(false);
ext.onResponse({
device_unused_fallback_key_types: ["not_signed_curve25519"],
});
expect(client.crypto.getNeedsNewFallback()).toEqual(true);
});
});
describe("ExtensionAccountData", () => {
let ext: Extension;
beforeAll(async () => {
await setupClient();
const hasSynced = sdk.sync();
await httpBackend.flushAllExpected();
await hasSynced;
ext = findExtension("account_data");
});
it("gets enabled on the initial request only", () => {
expect(ext.onRequest(true)).toEqual({
enabled: true,
});
expect(ext.onRequest(false)).toEqual(undefined);
});
it("processes global account data", async () => {
const globalType = "global_test";
const globalContent = {
info: "here",
};
let globalData = client.getAccountData(globalType);
expect(globalData).toBeUndefined();
ext.onResponse({
global: [
{
type: globalType,
content: globalContent,
},
],
});
globalData = client.getAccountData(globalType);
expect(globalData).toBeDefined();
expect(globalData.getContent()).toEqual(globalContent);
});
it("processes rooms account data", async () => {
const roomId = "!room:id";
mockSlidingSync.emit(SlidingSyncEvent.RoomData, roomId, {
name: "Room with account data",
required_state: [],
timeline: [
mkOwnStateEvent(EventType.RoomCreate, { creator: selfUserId }, ""),
mkOwnStateEvent(EventType.RoomMember, { membership: "join" }, selfUserId),
mkOwnStateEvent(EventType.RoomPowerLevels, { users: { [selfUserId]: 100 } }, ""),
mkOwnEvent(EventType.RoomMessage, { body: "hello" }),
],
initial: true,
});
const roomContent = {
foo: "bar",
};
const roomType = "test";
ext.onResponse({
rooms: {
[roomId]: [
{
type: roomType,
content: roomContent,
},
],
},
});
const room = client.getRoom(roomId);
expect(room).toBeDefined();
const event = room.getAccountData(roomType);
expect(event).toBeDefined();
expect(event.getContent()).toEqual(roomContent);
});
it("doesn't crash for unknown room account data", async () => {
const unknownRoomId = "!unknown:id";
const roomType = "tester";
ext.onResponse({
rooms: {
[unknownRoomId]: [
{
type: roomType,
content: {
foo: "Bar",
},
},
],
},
});
const room = client.getRoom(unknownRoomId);
expect(room).toBeNull();
expect(client.getAccountData(roomType)).toBeUndefined();
});
it("can update push rules via account data", async () => {
const roomId = "!foo:bar";
const pushRulesContent: IPushRules = {
global: {
[PushRuleKind.RoomSpecific]: [{
enabled: true,
default: true,
pattern: "monkey",
actions: [
{
set_tweak: TweakName.Sound,
value: "default",
},
],
rule_id: roomId,
}],
},
};
let pushRule = client.getRoomPushRule("global", roomId);
expect(pushRule).toBeUndefined();
ext.onResponse({
global: [
{
type: EventType.PushRules,
content: pushRulesContent,
},
],
});
pushRule = client.getRoomPushRule("global", roomId);
expect(pushRule).toEqual(pushRulesContent.global[PushRuleKind.RoomSpecific][0]);
});
});
describe("ExtensionToDevice", () => {
let ext: Extension;
beforeAll(async () => {
await setupClient();
const hasSynced = sdk.sync();
await httpBackend.flushAllExpected();
await hasSynced;
ext = findExtension("to_device");
});
it("gets enabled with a limit on the initial request only", () => {
const reqJson: any = ext.onRequest(true);
expect(reqJson.enabled).toEqual(true);
expect(reqJson.limit).toBeGreaterThan(0);
expect(reqJson.since).toBeUndefined();
});
it("updates the since value", async () => {
ext.onResponse({
next_batch: "12345",
events: [],
});
expect(ext.onRequest(false)).toEqual({
since: "12345",
});
});
it("can handle missing fields", async () => {
ext.onResponse({
next_batch: "23456",
// no events array
});
});
it("emits to-device events on the client", async () => {
const toDeviceType = "custom_test";
const toDeviceContent = {
foo: "bar",
};
let called = false;
client.once(ClientEvent.ToDeviceEvent, (ev) => {
expect(ev.getContent()).toEqual(toDeviceContent);
expect(ev.getType()).toEqual(toDeviceType);
called = true;
});
ext.onResponse({
next_batch: "34567",
events: [
{
type: toDeviceType,
content: toDeviceContent,
},
],
});
expect(called).toBe(true);
});
it("can cancel key verification requests", async () => {
const seen: Record<string, boolean> = {};
client.on(ClientEvent.ToDeviceEvent, (ev) => {
const evType = ev.getType();
expect(seen[evType]).toBeFalsy();
seen[evType] = true;
if (evType === "m.key.verification.start" || evType === "m.key.verification.request") {
expect(ev.isCancelled()).toEqual(true);
} else {
expect(ev.isCancelled()).toEqual(false);
}
});
ext.onResponse({
next_batch: "45678",
events: [
// someone tries to verify keys
{
type: "m.key.verification.start",
content: {
transaction_id: "a",
},
},
{
type: "m.key.verification.request",
content: {
transaction_id: "a",
},
},
// then gives up
{
type: "m.key.verification.cancel",
content: {
transaction_id: "a",
},
},
],
});
});
});
});

View File

@ -0,0 +1,758 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// eslint-disable-next-line no-restricted-imports
import EventEmitter from "events";
import MockHttpBackend from "matrix-mock-request";
import { SlidingSync, SlidingSyncState, ExtensionState, SlidingSyncEvent } from "../../src/sliding-sync";
import { TestClient } from "../TestClient";
import { logger } from "../../src/logger";
import { MatrixClient } from "../../src";
import { sleep } from "../../src/utils";
/**
* Tests for sliding sync. These tests are broken down into sub-tests which are reliant upon one another.
* Each test suite (describe block) uses a single MatrixClient/HTTPBackend and a single SlidingSync class.
* Each test will call different functions on SlidingSync which may depend on state from previous tests.
*/
describe("SlidingSync", () => {
let client: MatrixClient = null;
let httpBackend: MockHttpBackend = null;
const selfUserId = "@alice:localhost";
const selfAccessToken = "aseukfgwef";
const proxyBaseUrl = "http://localhost:8008";
const syncUrl = proxyBaseUrl + "/_matrix/client/unstable/org.matrix.msc3575/sync";
// assign client/httpBackend globals
const setupClient = () => {
const testClient = new TestClient(selfUserId, "DEVICE", selfAccessToken);
httpBackend = testClient.httpBackend;
client = testClient.client;
};
// tear down client/httpBackend globals
const teardownClient = () => {
httpBackend.verifyNoOutstandingExpectation();
client.stopClient();
return httpBackend.stop();
};
describe("start/stop", () => {
beforeAll(setupClient);
afterAll(teardownClient);
let slidingSync: SlidingSync;
it("should start the sync loop upon calling start()", async () => {
slidingSync = new SlidingSync(proxyBaseUrl, [], {}, client, 1);
const fakeResp = {
pos: "a",
lists: [],
rooms: {},
extensions: {},
};
httpBackend.when("POST", syncUrl).respond(200, fakeResp);
const p = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state, resp, err) => {
expect(state).toEqual(SlidingSyncState.RequestFinished);
expect(resp).toEqual(fakeResp);
expect(err).toBeFalsy();
return true;
});
slidingSync.start();
await httpBackend.flushAllExpected();
await p;
});
it("should stop the sync loop upon calling stop()", () => {
slidingSync.stop();
httpBackend.verifyNoOutstandingExpectation();
});
});
describe("room subscriptions", () => {
beforeAll(setupClient);
afterAll(teardownClient);
const roomId = "!foo:bar";
const anotherRoomID = "!another:room";
let roomSubInfo = {
timeline_limit: 1,
required_state: [
["m.room.name", ""],
],
};
const wantRoomData = {
name: "foo bar",
required_state: [],
timeline: [],
};
let slidingSync: SlidingSync;
it("should be able to subscribe to a room", async () => {
// add the subscription
slidingSync = new SlidingSync(proxyBaseUrl, [], roomSubInfo, client, 1);
slidingSync.modifyRoomSubscriptions(new Set([roomId]));
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("room sub", body);
expect(body.room_subscriptions).toBeTruthy();
expect(body.room_subscriptions[roomId]).toEqual(roomSubInfo);
}).respond(200, {
pos: "a",
lists: [],
extensions: {},
rooms: {
[roomId]: wantRoomData,
},
});
const p = listenUntil(slidingSync, "SlidingSync.RoomData", (gotRoomId, gotRoomData) => {
expect(gotRoomId).toEqual(roomId);
expect(gotRoomData).toEqual(wantRoomData);
return true;
});
slidingSync.start();
await httpBackend.flushAllExpected();
await p;
});
it("should be possible to adjust room subscription info whilst syncing", async () => {
// listen for updated request
const newSubInfo = {
timeline_limit: 100,
required_state: [
["m.room.member", "*"],
],
};
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("adjusted sub", body);
expect(body.room_subscriptions).toBeTruthy();
expect(body.room_subscriptions[roomId]).toEqual(newSubInfo);
}).respond(200, {
pos: "a",
lists: [],
extensions: {},
rooms: {
[roomId]: wantRoomData,
},
});
const p = listenUntil(slidingSync, "SlidingSync.RoomData", (gotRoomId, gotRoomData) => {
expect(gotRoomId).toEqual(roomId);
expect(gotRoomData).toEqual(wantRoomData);
return true;
});
slidingSync.modifyRoomSubscriptionInfo(newSubInfo);
await httpBackend.flushAllExpected();
await p;
// need to set what the new subscription info is for subsequent tests
roomSubInfo = newSubInfo;
});
it("should be possible to add room subscriptions whilst syncing", async () => {
// listen for updated request
const anotherRoomData = {
name: "foo bar 2",
room_id: anotherRoomID,
// we should not fall over if fields are missing.
// required_state: [],
// timeline: [],
};
const anotherRoomDataFixed = {
name: anotherRoomData.name,
room_id: anotherRoomID,
required_state: [],
timeline: [],
};
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("new subs", body);
expect(body.room_subscriptions).toBeTruthy();
// only the new room is sent, the other is sticky
expect(body.room_subscriptions[anotherRoomID]).toEqual(roomSubInfo);
expect(body.room_subscriptions[roomId]).toBeUndefined();
}).respond(200, {
pos: "b",
lists: [],
extensions: {},
rooms: {
[anotherRoomID]: anotherRoomData,
},
});
const p = listenUntil(slidingSync, "SlidingSync.RoomData", (gotRoomId, gotRoomData) => {
expect(gotRoomId).toEqual(anotherRoomID);
expect(gotRoomData).toEqual(anotherRoomDataFixed);
return true;
});
const subs = slidingSync.getRoomSubscriptions();
subs.add(anotherRoomID);
slidingSync.modifyRoomSubscriptions(subs);
await httpBackend.flushAllExpected();
await p;
});
it("should be able to unsubscribe from a room", async () => {
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("unsub request", body);
expect(body.room_subscriptions).toBeFalsy();
expect(body.unsubscribe_rooms).toEqual([roomId]);
}).respond(200, {
pos: "b",
lists: [],
});
const p = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
// remove the subscription for the first room
slidingSync.modifyRoomSubscriptions(new Set([anotherRoomID]));
await httpBackend.flushAllExpected();
await p;
slidingSync.stop();
});
});
describe("lists", () => {
beforeAll(setupClient);
afterAll(teardownClient);
const roomA = "!a:localhost";
const roomB = "!b:localhost";
const roomC = "!c:localhost";
const rooms = {
[roomA]: {
name: "A",
required_state: [],
timeline: [],
},
[roomB]: {
name: "B",
required_state: [],
timeline: [],
},
[roomC]: {
name: "C",
required_state: [],
timeline: [],
},
};
const newRanges = [[0, 2], [3, 5]];
let slidingSync: SlidingSync;
it("should be possible to subscribe to a list", async () => {
// request first 3 rooms
const listReq = {
ranges: [[0, 2]],
sort: ["by_name"],
timeline_limit: 1,
required_state: [
["m.room.topic", ""],
],
filters: {
is_dm: true,
},
};
slidingSync = new SlidingSync(proxyBaseUrl, [listReq], {}, client, 1);
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("list", body);
expect(body.lists).toBeTruthy();
expect(body.lists[0]).toEqual(listReq);
}).respond(200, {
pos: "a",
lists: [{
count: 500,
ops: [{
op: "SYNC",
range: [0, 2],
room_ids: Object.keys(rooms),
}],
}],
rooms: rooms,
});
const listenerData = {};
const dataListener = (roomId, roomData) => {
expect(listenerData[roomId]).toBeFalsy();
listenerData[roomId] = roomData;
};
slidingSync.on(SlidingSyncEvent.RoomData, dataListener);
const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
slidingSync.start();
await httpBackend.flushAllExpected();
await responseProcessed;
expect(listenerData[roomA]).toEqual(rooms[roomA]);
expect(listenerData[roomB]).toEqual(rooms[roomB]);
expect(listenerData[roomC]).toEqual(rooms[roomC]);
expect(slidingSync.listLength()).toEqual(1);
slidingSync.off(SlidingSyncEvent.RoomData, dataListener);
});
it("should be possible to retrieve list data", () => {
expect(slidingSync.getList(0)).toBeDefined();
expect(slidingSync.getList(5)).toBeNull();
expect(slidingSync.getListData(5)).toBeNull();
const syncData = slidingSync.getListData(0);
expect(syncData.joinedCount).toEqual(500); // from previous test
expect(syncData.roomIndexToRoomId).toEqual({
0: roomA,
1: roomB,
2: roomC,
});
});
it("should be possible to adjust list ranges", async () => {
// modify the list ranges
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("next ranges", body.lists[0].ranges);
expect(body.lists).toBeTruthy();
expect(body.lists[0]).toEqual({
// only the ranges should be sent as the rest are unchanged and sticky
ranges: newRanges,
});
}).respond(200, {
pos: "b",
lists: [{
count: 500,
ops: [{
op: "SYNC",
range: [0, 2],
room_ids: Object.keys(rooms),
}],
}],
});
const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.RequestFinished;
});
slidingSync.setListRanges(0, newRanges);
await httpBackend.flushAllExpected();
await responseProcessed;
});
it("should be possible to add an extra list", async () => {
// add extra list
const extraListReq = {
ranges: [[0, 100]],
sort: ["by_name"],
filters: {
"is_dm": true,
},
};
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("extra list", body);
expect(body.lists).toBeTruthy();
expect(body.lists[0]).toEqual({
// only the ranges should be sent as the rest are unchanged and sticky
ranges: newRanges,
});
expect(body.lists[1]).toEqual(extraListReq);
}).respond(200, {
pos: "c",
lists: [
{
count: 500,
},
{
count: 50,
ops: [{
op: "SYNC",
range: [0, 2],
room_ids: Object.keys(rooms),
}],
},
],
});
listenUntil(slidingSync, "SlidingSync.List", (listIndex, joinedCount, roomIndexToRoomId) => {
expect(listIndex).toEqual(1);
expect(joinedCount).toEqual(50);
expect(roomIndexToRoomId).toEqual({
0: roomA,
1: roomB,
2: roomC,
});
return true;
});
const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
slidingSync.setList(1, extraListReq);
await httpBackend.flushAllExpected();
await responseProcessed;
});
it("should be possible to get list DELETE/INSERTs", async () => {
// move C (2) to A (0)
httpBackend.when("POST", syncUrl).respond(200, {
pos: "e",
lists: [{
count: 500,
ops: [{
op: "DELETE",
index: 2,
}, {
op: "INSERT",
index: 0,
room_id: roomC,
}],
},
{
count: 50,
}],
});
let listPromise = listenUntil(slidingSync, "SlidingSync.List",
(listIndex, joinedCount, roomIndexToRoomId) => {
expect(listIndex).toEqual(0);
expect(joinedCount).toEqual(500);
expect(roomIndexToRoomId).toEqual({
0: roomC,
1: roomA,
2: roomB,
});
return true;
});
let responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend.flushAllExpected();
await responseProcessed;
await listPromise;
// move C (0) back to A (2)
httpBackend.when("POST", syncUrl).respond(200, {
pos: "f",
lists: [{
count: 500,
ops: [{
op: "DELETE",
index: 0,
}, {
op: "INSERT",
index: 2,
room_id: roomC,
}],
},
{
count: 50,
}],
});
listPromise = listenUntil(slidingSync, "SlidingSync.List",
(listIndex, joinedCount, roomIndexToRoomId) => {
expect(listIndex).toEqual(0);
expect(joinedCount).toEqual(500);
expect(roomIndexToRoomId).toEqual({
0: roomA,
1: roomB,
2: roomC,
});
return true;
});
responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend.flushAllExpected();
await responseProcessed;
await listPromise;
});
it("should ignore invalid list indexes", async () => {
httpBackend.when("POST", syncUrl).respond(200, {
pos: "e",
lists: [{
count: 500,
ops: [{
op: "DELETE",
index: 2324324,
}],
},
{
count: 50,
}],
});
const listPromise = listenUntil(slidingSync, "SlidingSync.List",
(listIndex, joinedCount, roomIndexToRoomId) => {
expect(listIndex).toEqual(0);
expect(joinedCount).toEqual(500);
expect(roomIndexToRoomId).toEqual({
0: roomA,
1: roomB,
2: roomC,
});
return true;
});
const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend.flushAllExpected();
await responseProcessed;
await listPromise;
});
it("should be possible to update a list", async () => {
httpBackend.when("POST", syncUrl).respond(200, {
pos: "g",
lists: [{
count: 42,
ops: [
{
op: "INVALIDATE",
range: [0, 2],
},
{
op: "SYNC",
range: [0, 1],
room_ids: [roomB, roomC],
},
],
},
{
count: 50,
}],
});
// update the list with a new filter
slidingSync.setList(0, {
filters: {
is_encrypted: true,
},
ranges: [[0, 100]],
});
const listPromise = listenUntil(slidingSync, "SlidingSync.List",
(listIndex, joinedCount, roomIndexToRoomId) => {
expect(listIndex).toEqual(0);
expect(joinedCount).toEqual(42);
expect(roomIndexToRoomId).toEqual({
0: roomB,
1: roomC,
});
return true;
});
const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend.flushAllExpected();
await responseProcessed;
await listPromise;
slidingSync.stop();
});
});
describe("extensions", () => {
beforeAll(setupClient);
afterAll(teardownClient);
let slidingSync: SlidingSync;
const extReq = {
foo: "bar",
};
const extResp = {
baz: "quuz",
};
// Pre-extensions get called BEFORE processing the sync response
const preExtName = "foobar";
let onPreExtensionRequest;
let onPreExtensionResponse;
// Post-extensions get called AFTER processing the sync response
const postExtName = "foobar2";
let onPostExtensionRequest;
let onPostExtensionResponse;
const extPre = {
name: () => preExtName,
onRequest: (initial) => { return onPreExtensionRequest(initial); },
onResponse: (res) => { return onPreExtensionResponse(res); },
when: () => ExtensionState.PreProcess,
};
const extPost = {
name: () => postExtName,
onRequest: (initial) => { return onPostExtensionRequest(initial); },
onResponse: (res) => { return onPostExtensionResponse(res); },
when: () => ExtensionState.PostProcess,
};
it("should be able to register an extension", async () => {
slidingSync = new SlidingSync(proxyBaseUrl, [], {}, client, 1);
slidingSync.registerExtension(extPre);
const callbackOrder = [];
let extensionOnResponseCalled = false;
onPreExtensionRequest = () => {
return extReq;
};
onPreExtensionResponse = (resp) => {
extensionOnResponseCalled = true;
callbackOrder.push("onPreExtensionResponse");
expect(resp).toEqual(extResp);
};
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("ext req", body);
expect(body.extensions).toBeTruthy();
expect(body.extensions[preExtName]).toEqual(extReq);
}).respond(200, {
pos: "a",
ops: [],
counts: [],
extensions: {
[preExtName]: extResp,
},
});
const p = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state, resp, err) => {
if (state === SlidingSyncState.Complete) {
callbackOrder.push("Lifecycle");
return true;
}
});
slidingSync.start();
await httpBackend.flushAllExpected();
await p;
expect(extensionOnResponseCalled).toBe(true);
expect(callbackOrder).toEqual(["onPreExtensionResponse", "Lifecycle"]);
});
it("should be able to send nothing in an extension request/response", async () => {
onPreExtensionRequest = () => {
return undefined;
};
let responseCalled = false;
onPreExtensionResponse = (resp) => {
responseCalled = true;
};
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("ext req nothing", body);
expect(body.extensions).toBeTruthy();
expect(body.extensions[preExtName]).toBeUndefined();
}).respond(200, {
pos: "a",
ops: [],
counts: [],
extensions: {},
});
// we need to resend as sliding sync will already have a buffered request with the old
// extension values from the previous test.
slidingSync.resend();
const p = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state, resp, err) => {
return state === SlidingSyncState.Complete;
});
await httpBackend.flushAllExpected();
await p;
expect(responseCalled).toBe(false);
});
it("is possible to register extensions after start() has been called", async () => {
slidingSync.registerExtension(extPost);
onPostExtensionRequest = () => {
return extReq;
};
let responseCalled = false;
const callbackOrder = [];
onPostExtensionResponse = (resp) => {
expect(resp).toEqual(extResp);
responseCalled = true;
callbackOrder.push("onPostExtensionResponse");
};
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("ext req after start", body);
expect(body.extensions).toBeTruthy();
expect(body.extensions[preExtName]).toBeUndefined(); // from the earlier test
expect(body.extensions[postExtName]).toEqual(extReq);
}).respond(200, {
pos: "c",
ops: [],
counts: [],
extensions: {
[postExtName]: extResp,
},
});
// we need to resend as sliding sync will already have a buffered request with the old
// extension values from the previous test.
slidingSync.resend();
const p = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state, resp, err) => {
if (state === SlidingSyncState.Complete) {
callbackOrder.push("Lifecycle");
return true;
}
});
await httpBackend.flushAllExpected();
await p;
expect(responseCalled).toBe(true);
expect(callbackOrder).toEqual(["Lifecycle", "onPostExtensionResponse"]);
slidingSync.stop();
});
it("is not possible to register the same extension name twice", async () => {
slidingSync = new SlidingSync(proxyBaseUrl, [], {}, client, 1);
slidingSync.registerExtension(extPre);
expect(() => { slidingSync.registerExtension(extPre); }).toThrow();
});
});
});
async function timeout(delayMs: number, reason: string): Promise<never> {
await sleep(delayMs);
throw new Error(`timeout: ${delayMs}ms - ${reason}`);
}
/**
* Listen until a callback returns data.
* @param {EventEmitter} emitter The event emitter
* @param {string} eventName The event to listen for
* @param {function} callback The callback which will be invoked when events fire. Return something truthy from this to resolve the promise.
* @param {number} timeoutMs The number of milliseconds to wait for the callback to return data. Default: 500ms.
* @returns {Promise} A promise which will be resolved when the callback returns data. If the callback throws or the timeout is reached,
* the promise is rejected.
*/
function listenUntil<T>(
emitter: EventEmitter,
eventName: string,
callback: (...args: any[]) => T,
timeoutMs = 500,
): Promise<T> {
const trace = new Error().stack.split(`\n`)[2];
return Promise.race([new Promise<T>((resolve, reject) => {
const wrapper = (...args) => {
try {
const data = callback(...args);
if (data) {
emitter.off(eventName, wrapper);
resolve(data);
}
} catch (err) {
reject(err);
}
};
emitter.on(eventName, wrapper);
}), timeout(timeoutMs, "timed out waiting for event " + eventName + " " + trace)]);
}

View File

@ -361,6 +361,71 @@ describe("Room", function() {
expect(callCount).toEqual(2);
});
it("should be able to update local echo without a txn ID (/send then /sync)", function() {
const eventJson = utils.mkMessage({
room: roomId, user: userA, event: false,
}) as object;
delete eventJson["txn_id"];
delete eventJson["event_id"];
const localEvent = new MatrixEvent(Object.assign({ event_id: "$temp" }, eventJson));
localEvent.status = EventStatus.SENDING;
expect(localEvent.getTxnId()).toBeNull();
expect(room.timeline.length).toEqual(0);
// first add the local echo. This is done before the /send request is even sent.
const txnId = "My_txn_id";
room.addPendingEvent(localEvent, txnId);
expect(room.getEventForTxnId(txnId)).toEqual(localEvent);
expect(room.timeline.length).toEqual(1);
// now the /send request returns the true event ID.
const realEventId = "$real-event-id";
room.updatePendingEvent(localEvent, EventStatus.SENT, realEventId);
// then /sync returns the remoteEvent, it should de-dupe based on the event ID.
const remoteEvent = new MatrixEvent(Object.assign({ event_id: realEventId }, eventJson));
expect(remoteEvent.getTxnId()).toBeNull();
room.addLiveEvents([remoteEvent]);
// the duplicate strategy code should ensure we don't add a 2nd event to the live timeline
expect(room.timeline.length).toEqual(1);
// but without the event ID matching we will still have the local event in pending events
expect(room.getEventForTxnId(txnId)).toBeUndefined();
});
it("should be able to update local echo without a txn ID (/sync then /send)", function() {
const eventJson = utils.mkMessage({
room: roomId, user: userA, event: false,
}) as object;
delete eventJson["txn_id"];
delete eventJson["event_id"];
const txnId = "My_txn_id";
const localEvent = new MatrixEvent(Object.assign({ event_id: "$temp", txn_id: txnId }, eventJson));
localEvent.status = EventStatus.SENDING;
expect(localEvent.getTxnId()).toEqual(txnId);
expect(room.timeline.length).toEqual(0);
// first add the local echo. This is done before the /send request is even sent.
room.addPendingEvent(localEvent, txnId);
expect(room.getEventForTxnId(txnId)).toEqual(localEvent);
expect(room.timeline.length).toEqual(1);
// now the /sync returns the remoteEvent, it is impossible for the JS SDK to de-dupe this.
const realEventId = "$real-event-id";
const remoteEvent = new MatrixEvent(Object.assign({ event_id: realEventId }, eventJson));
expect(remoteEvent.getUnsigned().transaction_id).toBeUndefined();
room.addLiveEvents([remoteEvent]);
expect(room.timeline.length).toEqual(2); // impossible to de-dupe as no txn ID or matching event ID
// then the /send request returns the real event ID.
// Now it is possible for the JS SDK to de-dupe this.
room.updatePendingEvent(localEvent, EventStatus.SENT, realEventId);
// the 2nd event should be removed from the timeline.
expect(room.timeline.length).toEqual(1);
// but without the event ID matching we will still have the local event in pending events
expect(room.getEventForTxnId(txnId)).toBeUndefined();
});
});
describe('addEphemeralEvents', () => {

View File

@ -190,6 +190,8 @@ import { MediaHandler } from "./webrtc/mediaHandler";
import { IRefreshTokenResponse } from "./@types/auth";
import { TypedEventEmitter } from "./models/typed-event-emitter";
import { ReceiptType } from "./@types/read_receipts";
import { MSC3575SlidingSyncRequest, MSC3575SlidingSyncResponse, SlidingSync } from "./sliding-sync";
import { SlidingSyncSdk } from "./sliding-sync-sdk";
import { Thread, THREAD_RELATION_TYPE } from "./models/thread";
import { MBeaconInfoEventContent, M_BEACON_INFO } from "./@types/beacon";
@ -411,6 +413,11 @@ export interface IStartClientOpts {
* @experimental
*/
experimentalThreadSupport?: boolean;
/**
* @experimental
*/
slidingSync?: SlidingSync;
}
export interface IStoredClientOpts extends IStartClientOpts {
@ -903,7 +910,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
protected verificationMethods: VerificationMethod[];
protected fallbackICEServerAllowed = false;
protected roomList: RoomList;
protected syncApi: SyncApi;
protected syncApi: SlidingSyncSdk | SyncApi;
public pushRules: IPushRules;
protected syncLeftRoomsPromise: Promise<Room[]>;
protected syncedLeftRooms = false;
@ -1176,7 +1183,11 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
}
return this.canResetTimelineCallback(roomId);
};
if (this.clientOpts.slidingSync) {
this.syncApi = new SlidingSyncSdk(this.clientOpts.slidingSync, this, this.clientOpts);
} else {
this.syncApi = new SyncApi(this, this.clientOpts);
}
this.syncApi.sync();
if (this.clientOpts.clientWellKnownPollPeriod !== undefined) {
@ -5874,6 +5885,9 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
// There can be only room-kind push rule per room
// and its id is the room id.
if (this.pushRules) {
if (!this.pushRules[scope] || !this.pushRules[scope].room) {
return;
}
for (let i = 0; i < this.pushRules[scope].room.length; i++) {
const rule = this.pushRules[scope].room[i];
if (rule.rule_id === roomId) {
@ -8908,6 +8922,41 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
return new MSC3089TreeSpace(this, roomId);
}
/**
* Perform a single MSC3575 sliding sync request.
* @param {MSC3575SlidingSyncRequest} req The request to make.
* @param {string} proxyBaseUrl The base URL for the sliding sync proxy.
* @returns {MSC3575SlidingSyncResponse} The sliding sync response, or a standard error.
* @throws on non 2xx status codes with an object with a field "httpStatus":number.
*/
public slidingSync(
req: MSC3575SlidingSyncRequest, proxyBaseUrl?: string,
): IAbortablePromise<MSC3575SlidingSyncResponse> {
const qps: Record<string, any> = {};
if (req.pos) {
qps.pos = req.pos;
delete req.pos;
}
if (req.timeout) {
qps.timeout = req.timeout;
delete req.timeout;
}
const clientTimeout = req.clientTimeout;
delete req.clientTimeout;
return this.http.authedRequest<MSC3575SlidingSyncResponse>(
undefined,
Method.Post,
"/sync",
qps,
req,
{
prefix: "/_matrix/client/unstable/org.matrix.msc3575",
baseUrl: proxyBaseUrl,
localTimeoutMs: clientTimeout,
},
);
}
/**
* @experimental
*/

View File

@ -111,6 +111,7 @@ interface IRequest extends _Request {
interface IRequestOpts<T> {
prefix?: string;
baseUrl?: string;
localTimeoutMs?: number;
headers?: Record<string, string>;
json?: boolean; // defaults to true
@ -576,6 +577,9 @@ export class MatrixHttpApi {
* @param {string=} opts.prefix The full prefix to use e.g.
* "/_matrix/client/v2_alpha". If not specified, uses this.opts.prefix.
*
* @param {string=} opts.baseUrl The alternative base url to use.
* If not specified, uses this.opts.baseUrl
*
* @param {Object=} opts.headers map of additional request headers
*
* @return {Promise} Resolves to <code>{data: {Object},
@ -671,7 +675,8 @@ export class MatrixHttpApi {
opts?: O,
): IAbortablePromise<ResponseType<T, O>> {
const prefix = opts?.prefix ?? this.opts.prefix;
const fullUri = this.opts.baseUrl + prefix + path;
const baseUrl = opts?.baseUrl ?? this.opts.baseUrl;
const fullUri = baseUrl + prefix + path;
return this.requestOtherUrl<T, O>(callback, method, fullUri, queryParams, data, opts);
}

View File

@ -370,7 +370,6 @@ export class RoomState extends TypedEventEmitter<EmittedEvents, EventHandlerMap>
});
this.onBeaconLivenessChange();
// update higher level data structures. This needs to be done AFTER the
// core event dict as these structures may depend on other state events in
// the given array (e.g. disambiguating display names in one go to do both
@ -401,7 +400,6 @@ export class RoomState extends TypedEventEmitter<EmittedEvents, EventHandlerMap>
const member = this.getOrCreateMember(userId, event);
member.setMembershipEvent(event, this);
this.updateMember(member);
this.emit(RoomStateEvent.Members, event, this, member);
} else if (event.getType() === EventType.RoomPowerLevels) {

View File

@ -1917,6 +1917,27 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
// If any pending visibility change is waiting for this (older) event,
this.applyPendingVisibilityEvents(event);
// Sliding Sync modifications:
// The proxy cannot guarantee every sent event will have a transaction_id field, so we need
// to check the event ID against the list of pending events if there is no transaction ID
// field. Only do this for events sent by us though as it's potentially expensive to loop
// the pending events map.
const txnId = event.getUnsigned().transaction_id;
if (!txnId && event.getSender() === this.myUserId) {
// check the txn map for a matching event ID
for (const tid in this.txnToEvent) {
const localEvent = this.txnToEvent[tid];
if (localEvent.getId() === event.getId()) {
logger.debug("processLiveEvent: found sent event without txn ID: ", tid, event.getId());
// update the unsigned field so we can re-use the same codepaths
const unsigned = event.getUnsigned();
unsigned.transaction_id = tid;
event.setUnsigned(unsigned);
break;
}
}
}
if (event.getUnsigned().transaction_id) {
const existingEvent = this.txnToEvent[event.getUnsigned().transaction_id];
if (existingEvent) {
@ -2173,7 +2194,22 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
const timeline = this.getTimelineForEvent(newEventId);
if (timeline) {
// we've already received the event via the event stream.
// nothing more to do here.
// nothing more to do here, assuming the transaction ID was correctly matched.
// Let's check that.
const remoteEvent = this.findEventById(newEventId);
const remoteTxnId = remoteEvent.getUnsigned().transaction_id;
if (!remoteTxnId) {
// This code path is mostly relevant for the Sliding Sync proxy.
// The remote event did not contain a transaction ID, so we did not handle
// the remote echo yet. Handle it now.
const unsigned = remoteEvent.getUnsigned();
unsigned.transaction_id = event.getTxnId();
remoteEvent.setUnsigned(unsigned);
// the remote event is _already_ in the timeline, so we need to remove it so
// we can convert the local event into the final event.
this.removeEvent(remoteEvent.getId());
this.handleRemoteEcho(remoteEvent, event);
}
return;
}
}

835
src/sliding-sync-sdk.ts Normal file
View File

@ -0,0 +1,835 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { NotificationCountType, Room, RoomEvent } from "./models/room";
import { logger } from './logger';
import * as utils from "./utils";
import { EventTimeline } from "./models/event-timeline";
import { ClientEvent, IStoredClientOpts, MatrixClient, PendingEventOrdering } from "./client";
import { ISyncStateData, SyncState } from "./sync";
import { MatrixEvent } from "./models/event";
import { Crypto } from "./crypto";
import { IMinimalEvent, IRoomEvent, IStateEvent, IStrippedState } from "./sync-accumulator";
import { MatrixError } from "./http-api";
import { RoomStateEvent } from "./models/room-state";
import { RoomMemberEvent } from "./models/room-member";
import {
Extension,
ExtensionState,
MSC3575RoomData,
MSC3575SlidingSyncResponse,
SlidingSync,
SlidingSyncEvent,
SlidingSyncState,
} from "./sliding-sync";
import { EventType, IPushRules } from "./matrix";
import { PushProcessor } from "./pushprocessor";
// Number of consecutive failed syncs that will lead to a syncState of ERROR as opposed
// to RECONNECTING. This is needed to inform the client of server issues when the
// keepAlive is successful but the server /sync fails.
const FAILED_SYNC_ERROR_THRESHOLD = 3;
class ExtensionE2EE implements Extension {
constructor(private readonly crypto: Crypto) {}
public name(): string {
return "e2ee";
}
public when(): ExtensionState {
return ExtensionState.PreProcess;
}
public onRequest(isInitial: boolean): object {
if (!isInitial) {
return undefined;
}
return {
enabled: true, // this is sticky so only send it on the initial request
};
}
public async onResponse(data: object): Promise<void> {
// Handle device list updates
if (data["device_lists"]) {
await this.crypto.handleDeviceListChanges({
oldSyncToken: "yep", // XXX need to do this so the device list changes get processed :(
}, data["device_lists"]);
}
// Handle one_time_keys_count
if (data["device_one_time_keys_count"]) {
const currentCount = data["device_one_time_keys_count"].signed_curve25519 || 0;
this.crypto.updateOneTimeKeyCount(currentCount);
}
if (data["device_unused_fallback_key_types"] ||
data["org.matrix.msc2732.device_unused_fallback_key_types"]) {
// The presence of device_unused_fallback_key_types indicates that the
// server supports fallback keys. If there's no unused
// signed_curve25519 fallback key we need a new one.
const unusedFallbackKeys = data["device_unused_fallback_key_types"] ||
data["org.matrix.msc2732.device_unused_fallback_key_types"];
this.crypto.setNeedsNewFallback(
unusedFallbackKeys instanceof Array &&
!unusedFallbackKeys.includes("signed_curve25519"),
);
}
}
}
class ExtensionToDevice implements Extension {
private nextBatch?: string = null;
constructor(private readonly client: MatrixClient) {}
public name(): string {
return "to_device";
}
public when(): ExtensionState {
return ExtensionState.PreProcess;
}
public onRequest(isInitial: boolean): object {
const extReq = {
since: this.nextBatch !== null ? this.nextBatch : undefined,
};
if (isInitial) {
extReq["limit"] = 100;
extReq["enabled"] = true;
}
return extReq;
}
public async onResponse(data: object): Promise<void> {
const cancelledKeyVerificationTxns = [];
data["events"] = data["events"] || [];
data["events"]
.map(this.client.getEventMapper())
.map((toDeviceEvent) => { // map is a cheap inline forEach
// We want to flag m.key.verification.start events as cancelled
// if there's an accompanying m.key.verification.cancel event, so
// we pull out the transaction IDs from the cancellation events
// so we can flag the verification events as cancelled in the loop
// below.
if (toDeviceEvent.getType() === "m.key.verification.cancel") {
const txnId = toDeviceEvent.getContent()['transaction_id'];
if (txnId) {
cancelledKeyVerificationTxns.push(txnId);
}
}
// as mentioned above, .map is a cheap inline forEach, so return
// the unmodified event.
return toDeviceEvent;
})
.forEach(
(toDeviceEvent) => {
const content = toDeviceEvent.getContent();
if (
toDeviceEvent.getType() == "m.room.message" &&
content.msgtype == "m.bad.encrypted"
) {
// the mapper already logged a warning.
logger.log(
'Ignoring undecryptable to-device event from ' +
toDeviceEvent.getSender(),
);
return;
}
if (toDeviceEvent.getType() === "m.key.verification.start"
|| toDeviceEvent.getType() === "m.key.verification.request") {
const txnId = content['transaction_id'];
if (cancelledKeyVerificationTxns.includes(txnId)) {
toDeviceEvent.flagCancelled();
}
}
this.client.emit(ClientEvent.ToDeviceEvent, toDeviceEvent);
},
);
this.nextBatch = data["next_batch"];
}
}
class ExtensionAccountData implements Extension {
constructor(private readonly client: MatrixClient) {}
public name(): string {
return "account_data";
}
public when(): ExtensionState {
return ExtensionState.PostProcess;
}
public onRequest(isInitial: boolean): object {
if (!isInitial) {
return undefined;
}
return {
enabled: true,
};
}
public onResponse(data: {global: object[], rooms: Record<string, object[]>}): void {
if (data.global && data.global.length > 0) {
this.processGlobalAccountData(data.global);
}
for (const roomId in data.rooms) {
const accountDataEvents = mapEvents(this.client, roomId, data.rooms[roomId]);
const room = this.client.getRoom(roomId);
if (!room) {
logger.warn("got account data for room but room doesn't exist on client:", roomId);
continue;
}
room.addAccountData(accountDataEvents);
accountDataEvents.forEach((e) => {
this.client.emit(ClientEvent.Event, e);
});
}
}
private processGlobalAccountData(globalAccountData: object[]): void {
const events = mapEvents(this.client, undefined, globalAccountData);
const prevEventsMap = events.reduce((m, c) => {
m[c.getId()] = this.client.store.getAccountData(c.getType());
return m;
}, {});
this.client.store.storeAccountDataEvents(events);
events.forEach(
(accountDataEvent) => {
// Honour push rules that come down the sync stream but also
// honour push rules that were previously cached. Base rules
// will be updated when we receive push rules via getPushRules
// (see sync) before syncing over the network.
if (accountDataEvent.getType() === EventType.PushRules) {
const rules = accountDataEvent.getContent<IPushRules>();
this.client.pushRules = PushProcessor.rewriteDefaultRules(rules);
}
const prevEvent = prevEventsMap[accountDataEvent.getId()];
this.client.emit(ClientEvent.AccountData, accountDataEvent, prevEvent);
return accountDataEvent;
},
);
}
}
/**
* A copy of SyncApi such that it can be used as a drop-in replacement for sync v2. For the actual
* sliding sync API, see sliding-sync.ts or the class SlidingSync.
*/
export class SlidingSyncSdk {
private syncState: SyncState = null;
private syncStateData: ISyncStateData;
private lastPos: string = null;
private failCount = 0;
private notifEvents: MatrixEvent[] = []; // accumulator of sync events in the current sync response
constructor(
private readonly slidingSync: SlidingSync,
private readonly client: MatrixClient,
private readonly opts: Partial<IStoredClientOpts> = {},
) {
this.opts.initialSyncLimit = this.opts.initialSyncLimit ?? 8;
this.opts.resolveInvitesToProfiles = this.opts.resolveInvitesToProfiles || false;
this.opts.pollTimeout = this.opts.pollTimeout || (30 * 1000);
this.opts.pendingEventOrdering = this.opts.pendingEventOrdering || PendingEventOrdering.Chronological;
this.opts.experimentalThreadSupport = this.opts.experimentalThreadSupport === true;
if (!opts.canResetEntireTimeline) {
opts.canResetEntireTimeline = (_roomId: string) => {
return false;
};
}
if (client.getNotifTimelineSet()) {
client.reEmitter.reEmit(client.getNotifTimelineSet(), [
RoomEvent.Timeline,
RoomEvent.TimelineReset,
]);
}
this.slidingSync.on(SlidingSyncEvent.Lifecycle, this.onLifecycle.bind(this));
this.slidingSync.on(SlidingSyncEvent.RoomData, this.onRoomData.bind(this));
const extensions: Extension[] = [
new ExtensionToDevice(this.client),
new ExtensionAccountData(this.client),
];
if (this.opts.crypto) {
extensions.push(
new ExtensionE2EE(this.opts.crypto),
);
}
extensions.forEach((ext) => {
this.slidingSync.registerExtension(ext);
});
}
private onRoomData(roomId: string, roomData: MSC3575RoomData): void {
let room = this.client.store.getRoom(roomId);
if (!room) {
if (!roomData.initial) {
logger.debug("initial flag not set but no stored room exists for room ", roomId, roomData);
return;
}
room = createRoom(this.client, roomId, this.opts);
}
this.processRoomData(this.client, room, roomData);
}
private onLifecycle(state: SlidingSyncState, resp: MSC3575SlidingSyncResponse, err?: Error): void {
if (err) {
logger.debug("onLifecycle", state, err);
}
switch (state) {
case SlidingSyncState.Complete:
this.purgeNotifications();
// Element won't stop showing the initial loading spinner unless we fire SyncState.Prepared
if (!this.lastPos) {
this.updateSyncState(SyncState.Prepared, {
oldSyncToken: this.lastPos,
nextSyncToken: resp.pos,
catchingUp: false,
fromCache: false,
});
}
// Conversely, Element won't show the room list unless there is at least 1x SyncState.Syncing
// so hence for the very first sync we will fire prepared then immediately syncing.
this.updateSyncState(SyncState.Syncing, {
oldSyncToken: this.lastPos,
nextSyncToken: resp.pos,
catchingUp: false,
fromCache: false,
});
this.lastPos = resp.pos;
break;
case SlidingSyncState.RequestFinished:
if (err) {
this.failCount += 1;
this.updateSyncState(
this.failCount > FAILED_SYNC_ERROR_THRESHOLD ? SyncState.Error : SyncState.Reconnecting,
{
error: new MatrixError(err),
},
);
if (this.shouldAbortSync(new MatrixError(err))) {
return; // shouldAbortSync actually stops syncing too so we don't need to do anything.
}
} else {
this.failCount = 0;
}
break;
}
}
/**
* Sync rooms the user has left.
* @return {Promise} Resolved when they've been added to the store.
*/
public async syncLeftRooms() {
return []; // TODO
}
/**
* Peek into a room. This will result in the room in question being synced so it
* is accessible via getRooms(). Live updates for the room will be provided.
* @param {string} roomId The room ID to peek into.
* @return {Promise} A promise which resolves once the room has been added to the
* store.
*/
public async peek(_roomId: string): Promise<Room> {
return null; // TODO
}
/**
* Stop polling for updates in the peeked room. NOPs if there is no room being
* peeked.
*/
public stopPeeking(): void {
// TODO
}
/**
* Returns the current state of this sync object
* @see module:client~MatrixClient#event:"sync"
* @return {?String}
*/
public getSyncState(): SyncState {
return this.syncState;
}
/**
* Returns the additional data object associated with
* the current sync state, or null if there is no
* such data.
* Sync errors, if available, are put in the 'error' key of
* this object.
* @return {?Object}
*/
public getSyncStateData(): ISyncStateData {
return this.syncStateData;
}
private shouldAbortSync(error: MatrixError): boolean {
if (error.errcode === "M_UNKNOWN_TOKEN") {
// The logout already happened, we just need to stop.
logger.warn("Token no longer valid - assuming logout");
this.stop();
this.updateSyncState(SyncState.Error, { error });
return true;
}
return false;
}
private async processRoomData(client: MatrixClient, room: Room, roomData: MSC3575RoomData) {
roomData = ensureNameEvent(client, room.roomId, roomData);
const stateEvents = mapEvents(this.client, room.roomId, roomData.required_state);
// Prevent events from being decrypted ahead of time
// this helps large account to speed up faster
// room::decryptCriticalEvent is in charge of decrypting all the events
// required for a client to function properly
const timelineEvents = mapEvents(this.client, room.roomId, roomData.timeline, false);
const ephemeralEvents = []; // TODO this.mapSyncEventsFormat(joinObj.ephemeral);
const encrypted = this.client.isRoomEncrypted(room.roomId);
// we do this first so it's correct when any of the events fire
if (roomData.notification_count != null) {
room.setUnreadNotificationCount(
NotificationCountType.Total,
roomData.notification_count,
);
}
if (roomData.highlight_count != null) {
// We track unread notifications ourselves in encrypted rooms, so don't
// bother setting it here. We trust our calculations better than the
// server's for this case, and therefore will assume that our non-zero
// count is accurate.
if (!encrypted
|| (encrypted && room.getUnreadNotificationCount(NotificationCountType.Highlight) <= 0)) {
room.setUnreadNotificationCount(
NotificationCountType.Highlight,
roomData.highlight_count,
);
}
}
if (roomData.invite_state) {
const inviteStateEvents = mapEvents(this.client, room.roomId, roomData.invite_state);
this.processRoomEvents(room, inviteStateEvents);
if (roomData.initial) {
room.recalculate();
this.client.store.storeRoom(room);
this.client.emit(ClientEvent.Room, room);
}
inviteStateEvents.forEach((e) => {
this.client.emit(ClientEvent.Event, e);
});
room.updateMyMembership("invite");
return;
}
if (roomData.initial) {
// set the back-pagination token. Do this *before* adding any
// events so that clients can start back-paginating.
room.getLiveTimeline().setPaginationToken(
roomData.prev_batch, EventTimeline.BACKWARDS);
}
/* TODO
else if (roomData.limited) {
let limited = true;
// we've got a limited sync, so we *probably* have a gap in the
// timeline, so should reset. But we might have been peeking or
// paginating and already have some of the events, in which
// case we just want to append any subsequent events to the end
// of the existing timeline.
//
// This is particularly important in the case that we already have
// *all* of the events in the timeline - in that case, if we reset
// the timeline, we'll end up with an entirely empty timeline,
// which we'll try to paginate but not get any new events (which
// will stop us linking the empty timeline into the chain).
//
for (let i = timelineEvents.length - 1; i >= 0; i--) {
const eventId = timelineEvents[i].getId();
if (room.getTimelineForEvent(eventId)) {
logger.debug("Already have event " + eventId + " in limited " +
"sync - not resetting");
limited = false;
// we might still be missing some of the events before i;
// we don't want to be adding them to the end of the
// timeline because that would put them out of order.
timelineEvents.splice(0, i);
// XXX: there's a problem here if the skipped part of the
// timeline modifies the state set in stateEvents, because
// we'll end up using the state from stateEvents rather
// than the later state from timelineEvents. We probably
// need to wind stateEvents forward over the events we're
// skipping.
break;
}
}
if (limited) {
deregisterStateListeners(room);
room.resetLiveTimeline(
roomData.prev_batch,
null, // TODO this.opts.canResetEntireTimeline(room.roomId) ? null : syncEventData.oldSyncToken,
);
// We have to assume any gap in any timeline is
// reason to stop incrementally tracking notifications and
// reset the timeline.
this.client.resetNotifTimelineSet();
registerStateListeners(this.client, room);
}
} */
this.processRoomEvents(room, stateEvents, timelineEvents, false);
// we deliberately don't add ephemeral events to the timeline
room.addEphemeralEvents(ephemeralEvents);
room.recalculate();
if (roomData.initial) {
client.store.storeRoom(room);
client.emit(ClientEvent.Room, room);
}
// check if any timeline events should bing and add them to the notifEvents array:
// we'll purge this once we've fully processed the sync response
this.addNotifications(timelineEvents);
const processRoomEvent = async (e: MatrixEvent) => {
client.emit(ClientEvent.Event, e);
if (e.isState() && e.getType() == EventType.RoomEncryption && this.opts.crypto) {
await this.opts.crypto.onCryptoEvent(e);
}
};
await utils.promiseMapSeries(stateEvents, processRoomEvent);
await utils.promiseMapSeries(timelineEvents, processRoomEvent);
ephemeralEvents.forEach(function(e) {
client.emit(ClientEvent.Event, e);
});
room.updateMyMembership("join");
// Decrypt only the last message in all rooms to make sure we can generate a preview
// And decrypt all events after the recorded read receipt to ensure an accurate
// notification count
room.decryptCriticalEvents();
}
/**
* @param {Room} room
* @param {MatrixEvent[]} stateEventList A list of state events. This is the state
* at the *START* of the timeline list if it is supplied.
* @param {MatrixEvent[]} [timelineEventList] A list of timeline events. Lower index
* @param {boolean} fromCache whether the sync response came from cache
* is earlier in time. Higher index is later.
*/
private processRoomEvents(
room: Room,
stateEventList: MatrixEvent[],
timelineEventList?: MatrixEvent[],
fromCache = false,
): void {
timelineEventList = timelineEventList || [];
stateEventList = stateEventList || [];
// If there are no events in the timeline yet, initialise it with
// the given state events
const liveTimeline = room.getLiveTimeline();
const timelineWasEmpty = liveTimeline.getEvents().length == 0;
if (timelineWasEmpty) {
// Passing these events into initialiseState will freeze them, so we need
// to compute and cache the push actions for them now, otherwise sync dies
// with an attempt to assign to read only property.
// XXX: This is pretty horrible and is assuming all sorts of behaviour from
// these functions that it shouldn't be. We should probably either store the
// push actions cache elsewhere so we can freeze MatrixEvents, or otherwise
// find some solution where MatrixEvents are immutable but allow for a cache
// field.
for (const ev of stateEventList) {
this.client.getPushActionsForEvent(ev);
}
liveTimeline.initialiseState(stateEventList);
}
// If the timeline wasn't empty, we process the state events here: they're
// defined as updates to the state before the start of the timeline, so this
// starts to roll the state forward.
// XXX: That's what we *should* do, but this can happen if we were previously
// peeking in a room, in which case we obviously do *not* want to add the
// state events here onto the end of the timeline. Historically, the js-sdk
// has just set these new state events on the old and new state. This seems
// very wrong because there could be events in the timeline that diverge the
// state, in which case this is going to leave things out of sync. However,
// for now I think it;s best to behave the same as the code has done previously.
if (!timelineWasEmpty) {
// XXX: As above, don't do this...
//room.addLiveEvents(stateEventList || []);
// Do this instead...
room.oldState.setStateEvents(stateEventList);
room.currentState.setStateEvents(stateEventList);
}
// execute the timeline events. This will continue to diverge the current state
// if the timeline has any state events in it.
// This also needs to be done before running push rules on the events as they need
// to be decorated with sender etc.
room.addLiveEvents(timelineEventList, {
fromCache: fromCache,
});
room.recalculate();
// resolve invites now we have set the latest state
this.resolveInvites(room);
}
private resolveInvites(room: Room): void {
if (!room || !this.opts.resolveInvitesToProfiles) {
return;
}
const client = this.client;
// For each invited room member we want to give them a displayname/avatar url
// if they have one (the m.room.member invites don't contain this).
room.getMembersWithMembership("invite").forEach(function(member) {
if (member._requestedProfileInfo) return;
member._requestedProfileInfo = true;
// try to get a cached copy first.
const user = client.getUser(member.userId);
let promise;
if (user) {
promise = Promise.resolve({
avatar_url: user.avatarUrl,
displayname: user.displayName,
});
} else {
promise = client.getProfileInfo(member.userId);
}
promise.then(function(info) {
// slightly naughty by doctoring the invite event but this means all
// the code paths remain the same between invite/join display name stuff
// which is a worthy trade-off for some minor pollution.
const inviteEvent = member.events.member;
if (inviteEvent.getContent().membership !== "invite") {
// between resolving and now they have since joined, so don't clobber
return;
}
inviteEvent.getContent().avatar_url = info.avatar_url;
inviteEvent.getContent().displayname = info.displayname;
// fire listeners
member.setMembershipEvent(inviteEvent, room.currentState);
}, function(_err) {
// OH WELL.
});
});
}
public retryImmediately(): boolean {
return true;
}
/**
* Main entry point. Blocks until stop() is called.
*/
public async sync() {
logger.debug("Sliding sync init loop");
// 1) We need to get push rules so we can check if events should bing as we get
// them from /sync.
while (!this.client.isGuest()) {
try {
logger.debug("Getting push rules...");
const result = await this.client.getPushRules();
logger.debug("Got push rules");
this.client.pushRules = result;
break;
} catch (err) {
logger.error("Getting push rules failed", err);
if (this.shouldAbortSync(err)) {
return;
}
}
}
// start syncing
await this.slidingSync.start();
}
/**
* Stops the sync object from syncing.
*/
public stop(): void {
logger.debug("SyncApi.stop");
this.slidingSync.stop();
}
/**
* Sets the sync state and emits an event to say so
* @param {String} newState The new state string
* @param {Object} data Object of additional data to emit in the event
*/
private updateSyncState(newState: SyncState, data?: ISyncStateData): void {
const old = this.syncState;
this.syncState = newState;
this.syncStateData = data;
this.client.emit(ClientEvent.Sync, this.syncState, old, data);
}
/**
* Takes a list of timelineEvents and adds and adds to notifEvents
* as appropriate.
* This must be called after the room the events belong to has been stored.
*
* @param {MatrixEvent[]} [timelineEventList] A list of timeline events. Lower index
* is earlier in time. Higher index is later.
*/
private addNotifications(timelineEventList: MatrixEvent[]): void {
// gather our notifications into this.notifEvents
if (!this.client.getNotifTimelineSet()) {
return;
}
for (const timelineEvent of timelineEventList) {
const pushActions = this.client.getPushActionsForEvent(timelineEvent);
if (pushActions && pushActions.notify &&
pushActions.tweaks && pushActions.tweaks.highlight) {
this.notifEvents.push(timelineEvent);
}
}
}
/**
* Purge any events in the notifEvents array. Used after a /sync has been complete.
* This should not be called at a per-room scope (e.g in onRoomData) because otherwise the ordering
* will be messed up e.g room A gets a bing, room B gets a newer bing, but both in the same /sync
* response. If we purge at a per-room scope then we could process room B before room A leading to
* room B appearing earlier in the notifications timeline, even though it has the higher origin_server_ts.
*/
private purgeNotifications(): void {
this.notifEvents.sort(function(a, b) {
return a.getTs() - b.getTs();
});
this.notifEvents.forEach((event) => {
this.client.getNotifTimelineSet().addLiveEvent(event);
});
this.notifEvents = [];
}
}
function ensureNameEvent(client: MatrixClient, roomId: string, roomData: MSC3575RoomData): MSC3575RoomData {
// make sure m.room.name is in required_state if there is a name, replacing anything previously
// there if need be. This ensures clients transparently 'calculate' the right room name. Native
// sliding sync clients should just read the "name" field.
if (!roomData.name) {
return roomData;
}
for (const stateEvent of roomData.required_state) {
if (stateEvent.type === EventType.RoomName && stateEvent.state_key === "") {
stateEvent.content = {
name: roomData.name,
};
return roomData;
}
}
roomData.required_state.push({
event_id: "$fake-sliding-sync-name-event-" + roomId,
state_key: "",
type: EventType.RoomName,
content: {
name: roomData.name,
},
sender: client.getUserId(),
origin_server_ts: new Date().getTime(),
});
return roomData;
}
// Helper functions which set up JS SDK structs are below and are identical to the sync v2 counterparts,
// just outside the class.
function createRoom(client: MatrixClient, roomId: string, opts: Partial<IStoredClientOpts>): Room { // XXX cargoculted from sync.ts
const { timelineSupport } = client;
const room = new Room(roomId, client, client.getUserId(), {
lazyLoadMembers: opts.lazyLoadMembers,
pendingEventOrdering: opts.pendingEventOrdering,
timelineSupport,
});
client.reEmitter.reEmit(room, [
RoomEvent.Name,
RoomEvent.Redaction,
RoomEvent.RedactionCancelled,
RoomEvent.Receipt,
RoomEvent.Tags,
RoomEvent.LocalEchoUpdated,
RoomEvent.AccountData,
RoomEvent.MyMembership,
RoomEvent.Timeline,
RoomEvent.TimelineReset,
]);
registerStateListeners(client, room);
return room;
}
function registerStateListeners(client: MatrixClient, room: Room): void { // XXX cargoculted from sync.ts
// we need to also re-emit room state and room member events, so hook it up
// to the client now. We need to add a listener for RoomState.members in
// order to hook them correctly.
client.reEmitter.reEmit(room.currentState, [
RoomStateEvent.Events,
RoomStateEvent.Members,
RoomStateEvent.NewMember,
RoomStateEvent.Update,
]);
room.currentState.on(RoomStateEvent.NewMember, function(event, state, member) {
member.user = client.getUser(member.userId);
client.reEmitter.reEmit(member, [
RoomMemberEvent.Name,
RoomMemberEvent.Typing,
RoomMemberEvent.PowerLevel,
RoomMemberEvent.Membership,
]);
});
}
/*
function deregisterStateListeners(room: Room): void { // XXX cargoculted from sync.ts
// could do with a better way of achieving this.
room.currentState.removeAllListeners(RoomStateEvent.Events);
room.currentState.removeAllListeners(RoomStateEvent.Members);
room.currentState.removeAllListeners(RoomStateEvent.NewMember);
} */
function mapEvents(client: MatrixClient, roomId: string, events: object[], decrypt = true): MatrixEvent[] {
const mapper = client.getEventMapper({ decrypt });
return (events as Array<IStrippedState | IRoomEvent | IStateEvent | IMinimalEvent>).map(function(e) {
e["room_id"] = roomId;
return mapper(e);
});
}

757
src/sliding-sync.ts Normal file
View File

@ -0,0 +1,757 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { logger } from './logger';
import { IAbortablePromise } from "./@types/partials";
import { MatrixClient } from "./client";
import { IRoomEvent, IStateEvent } from "./sync-accumulator";
import { TypedEventEmitter } from "./models//typed-event-emitter";
import { sleep } from "./utils";
// /sync requests allow you to set a timeout= but the request may continue
// beyond that and wedge forever, so we need to track how long we are willing
// to keep open the connection. This constant is *ADDED* to the timeout= value
// to determine the max time we're willing to wait.
const BUFFER_PERIOD_MS = 10 * 1000;
/**
* Represents a subscription to a room or set of rooms. Controls which events are returned.
*/
export interface MSC3575RoomSubscription {
required_state?: string[][];
timeline_limit?: number;
}
/**
* Controls which rooms are returned in a given list.
*/
export interface MSC3575Filter {
is_dm?: boolean;
is_encrypted?: boolean;
is_invite?: boolean;
is_tombstoned?: boolean;
room_name_like?: string;
}
/**
* Represents a list subscription.
*/
export interface MSC3575List extends MSC3575RoomSubscription {
ranges: number[][];
sort?: string[];
filters?: MSC3575Filter;
slow_get_all_rooms?: boolean;
}
/**
* A complete Sliding Sync request.
*/
export interface MSC3575SlidingSyncRequest {
// json body params
lists?: MSC3575List[];
unsubscribe_rooms?: string[];
room_subscriptions?: Record<string, MSC3575RoomSubscription>;
extensions?: object;
// query params
pos?: string;
timeout?: number;
clientTimeout?: number;
}
export interface MSC3575RoomData {
name: string;
required_state: IStateEvent[];
timeline: (IRoomEvent | IStateEvent)[];
notification_count?: number;
highlight_count?: number;
invite_state?: IStateEvent[];
initial?: boolean;
limited?: boolean;
is_dm?: boolean;
prev_batch?: string;
}
interface ListResponse {
count: number;
ops: Operation[];
}
interface BaseOperation {
op: string;
}
interface DeleteOperation extends BaseOperation {
op: "DELETE";
index: number;
}
interface InsertOperation extends BaseOperation {
op: "INSERT";
index: number;
room_id: string;
}
interface InvalidateOperation extends BaseOperation {
op: "INVALIDATE";
range: [number, number];
}
interface SyncOperation extends BaseOperation {
op: "SYNC";
range: [number, number];
room_ids: string[];
}
type Operation = DeleteOperation | InsertOperation | InvalidateOperation | SyncOperation;
/**
* A complete Sliding Sync response
*/
export interface MSC3575SlidingSyncResponse {
pos: string;
lists: ListResponse[];
rooms: Record<string, MSC3575RoomData>;
extensions: object;
}
export enum SlidingSyncState {
/**
* Fired by SlidingSyncEvent.Lifecycle event immediately before processing the response.
*/
RequestFinished = "FINISHED",
/**
* Fired by SlidingSyncEvent.Lifecycle event immediately after all room data listeners have been
* invoked, but before list listeners.
*/
Complete = "COMPLETE",
}
/**
* Internal Class. SlidingList represents a single list in sliding sync. The list can have filters,
* multiple sliding windows, and maintains the index->room_id mapping.
*/
class SlidingList {
private list: MSC3575List;
private isModified: boolean;
// returned data
public roomIndexToRoomId: Record<number, string>;
public joinedCount: number;
/**
* Construct a new sliding list.
* @param {MSC3575List} list The range, sort and filter values to use for this list.
*/
constructor(list: MSC3575List) {
this.replaceList(list);
}
/**
* Mark this list as modified or not. Modified lists will return sticky params with calls to getList.
* This is useful for the first time the list is sent, or if the list has changed in some way.
* @param modified True to mark this list as modified so all sticky parameters will be re-sent.
*/
public setModified(modified: boolean): void {
this.isModified = modified;
}
/**
* Update the list range for this list. Does not affect modified status as list ranges are non-sticky.
* @param newRanges The new ranges for the list
*/
public updateListRange(newRanges: number[][]): void {
this.list.ranges = JSON.parse(JSON.stringify(newRanges));
}
/**
* Replace list parameters. All fields will be replaced with the new list parameters.
* @param list The new list parameters
*/
public replaceList(list: MSC3575List): void {
list.filters = list.filters || {};
list.ranges = list.ranges || [];
this.list = JSON.parse(JSON.stringify(list));
this.isModified = true;
// reset values as the join count may be very different (if filters changed) including the rooms
// (e.g. sort orders or sliding window ranges changed)
// the constantly changing sliding window ranges. Not an array for performance reasons
// E.g. tracking ranges 0-99, 500-599, we don't want to have a 600 element array
this.roomIndexToRoomId = {};
// the total number of joined rooms according to the server, always >= len(roomIndexToRoomId)
this.joinedCount = 0;
}
/**
* Return a copy of the list suitable for a request body.
* @param {boolean} forceIncludeAllParams True to forcibly include all params even if the list
* hasn't been modified. Callers may want to do this if they are modifying the list prior to calling
* updateList.
*/
public getList(forceIncludeAllParams: boolean): MSC3575List {
let list = {
ranges: JSON.parse(JSON.stringify(this.list.ranges)),
};
if (this.isModified || forceIncludeAllParams) {
list = JSON.parse(JSON.stringify(this.list));
}
return list;
}
/**
* Check if a given index is within the list range. This is required even though the /sync API
* provides explicit updates with index positions because of the following situation:
* 0 1 2 3 4 5 6 7 8 indexes
* a b c d e f COMMANDS: SYNC 0 2 a b c; SYNC 6 8 d e f;
* a b c d _ f COMMAND: DELETE 7;
* e a b c d f COMMAND: INSERT 0 e;
* c=3 is wrong as we are not tracking it, ergo we need to see if `i` is in range else drop it
* @param i The index to check
* @returns True if the index is within a sliding window
*/
public isIndexInRange(i: number): boolean {
for (const r of this.list.ranges) {
if (r[0] <= i && i <= r[1]) {
return true;
}
}
return false;
}
}
/**
* When onResponse extensions should be invoked: before or after processing the main response.
*/
export enum ExtensionState {
// Call onResponse before processing the response body. This is useful when your extension is
// preparing the ground for the response body e.g. processing to-device messages before the
// encrypted event arrives.
PreProcess = "ExtState.PreProcess",
// Call onResponse after processing the response body. This is useful when your extension is
// decorating data from the client, and you rely on MatrixClient.getRoom returning the Room object
// e.g. room account data.
PostProcess = "ExtState.PostProcess",
}
/**
* An interface that must be satisfied to register extensions
*/
export interface Extension {
/**
* The extension name to go under 'extensions' in the request body.
* @returns The JSON key.
*/
name(): string;
/**
* A function which is called when the request JSON is being formed.
* Returns the data to insert under this key.
* @param isInitial True when this is part of the initial request (send sticky params)
* @returns The request JSON to send.
*/
onRequest(isInitial: boolean): object;
/**
* A function which is called when there is response JSON under this extension.
* @param data The response JSON under the extension name.
*/
onResponse(data: object);
/**
* Controls when onResponse should be called.
* @returns The state when it should be called.
*/
when(): ExtensionState;
}
/**
* Events which can be fired by the SlidingSync class. These are designed to provide different levels
* of information when processing sync responses.
* - RoomData: concerns rooms, useful for SlidingSyncSdk to update its knowledge of rooms.
* - Lifecycle: concerns callbacks at various well-defined points in the sync process.
* - List: concerns lists, useful for UI layers to re-render room lists.
* Specifically, the order of event invocation is:
* - Lifecycle (state=RequestFinished)
* - RoomData (N times)
* - Lifecycle (state=Complete)
* - List (at most once per list)
*/
export enum SlidingSyncEvent {
/**
* This event fires when there are updates for a room. Fired as and when rooms are encountered
* in the response.
*/
RoomData = "SlidingSync.RoomData",
/**
* This event fires at various points in the /sync loop lifecycle.
* - SlidingSyncState.RequestFinished: Fires after we receive a valid response but before the
* response has been processed. Perform any pre-process steps here. If there was a problem syncing,
* `err` will be set (e.g network errors).
* - SlidingSyncState.Complete: Fires after all SlidingSyncEvent.RoomData have been fired but before
* SlidingSyncEvent.List.
*/
Lifecycle = "SlidingSync.Lifecycle",
/**
* This event fires whenever there has been a change to this list index. It fires exactly once
* per list, even if there were multiple operations for the list.
* It fires AFTER Lifecycle and RoomData events.
*/
List = "SlidingSync.List",
}
export type SlidingSyncEventHandlerMap = {
[SlidingSyncEvent.RoomData]: (roomId: string, roomData: MSC3575RoomData) => void;
[SlidingSyncEvent.Lifecycle]: (state: SlidingSyncState, resp: MSC3575SlidingSyncResponse, err: Error) => void;
[SlidingSyncEvent.List]: (
listIndex: number, joinedCount: number, roomIndexToRoomId: Record<number, string>,
) => void;
};
/**
* SlidingSync is a high-level data structure which controls the majority of sliding sync.
* It has no hooks into JS SDK except for needing a MatrixClient to perform the HTTP request.
* This means this class (and everything it uses) can be used in isolation from JS SDK if needed.
* To hook this up with the JS SDK, you need to use SlidingSyncSdk.
*/
export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSyncEventHandlerMap> {
private lists: SlidingList[];
private listModifiedCount = 0;
private terminated = false;
// flag set when resend() is called because we cannot rely on detecting AbortError in JS SDK :(
private needsResend = false;
// map of extension name to req/resp handler
private extensions: Record<string, Extension> = {};
private desiredRoomSubscriptions = new Set<string>(); // the *desired* room subscriptions
private confirmedRoomSubscriptions = new Set<string>();
private pendingReq?: IAbortablePromise<MSC3575SlidingSyncResponse>;
/**
* Create a new sliding sync instance
* @param {string} proxyBaseUrl The base URL of the sliding sync proxy
* @param {MSC3575List[]} lists The lists to use for sliding sync.
* @param {MSC3575RoomSubscription} roomSubscriptionInfo The params to use for room subscriptions.
* @param {MatrixClient} client The client to use for /sync calls.
* @param {number} timeoutMS The number of milliseconds to wait for a response.
*/
constructor(
private readonly proxyBaseUrl: string,
lists: MSC3575List[],
private roomSubscriptionInfo: MSC3575RoomSubscription,
private readonly client: MatrixClient,
private readonly timeoutMS: number,
) {
super();
this.lists = lists.map((l) => new SlidingList(l));
}
/**
* Get the length of the sliding lists.
* @returns The number of lists in the sync request
*/
public listLength(): number {
return this.lists.length;
}
/**
* Get the room data for a list.
* @param index The list index
* @returns The list data which contains the rooms in this list
*/
public getListData(index: number): {joinedCount: number, roomIndexToRoomId: Record<number, string>} {
if (!this.lists[index]) {
return null;
}
return {
joinedCount: this.lists[index].joinedCount,
roomIndexToRoomId: Object.assign({}, this.lists[index].roomIndexToRoomId),
};
}
/**
* Get the full list parameters for a list index. This function is provided for callers to use
* in conjunction with setList to update fields on an existing list.
* @param index The list index to get the list for.
* @returns A copy of the list or undefined.
*/
public getList(index: number): MSC3575List {
if (!this.lists[index]) {
return null;
}
return this.lists[index].getList(true);
}
/**
* Set new ranges for an existing list. Calling this function when _only_ the ranges have changed
* is more efficient than calling setList(index,list) as this function won't resend sticky params,
* whereas setList always will.
* @param index The list index to modify
* @param ranges The new ranges to apply.
*/
public setListRanges(index: number, ranges: number[][]): void {
this.lists[index].updateListRange(ranges);
this.resend();
}
/**
* Add or replace a list. Calling this function will interrupt the /sync request to resend new
* lists.
* @param index The index to modify
* @param list The new list parameters.
*/
public setList(index: number, list: MSC3575List): void {
if (this.lists[index]) {
this.lists[index].replaceList(list);
} else {
this.lists[index] = new SlidingList(list);
}
this.listModifiedCount += 1;
this.resend();
}
/**
* Get the room subscriptions for the sync API.
* @returns A copy of the desired room subscriptions.
*/
public getRoomSubscriptions(): Set<string> {
return new Set(Array.from(this.desiredRoomSubscriptions));
}
/**
* Modify the room subscriptions for the sync API. Calling this function will interrupt the
* /sync request to resend new subscriptions. If the /sync stream has not started, this will
* prepare the room subscriptions for when start() is called.
* @param s The new desired room subscriptions.
*/
public modifyRoomSubscriptions(s: Set<string>) {
this.desiredRoomSubscriptions = s;
this.resend();
}
/**
* Modify which events to retrieve for room subscriptions. Invalidates all room subscriptions
* such that they will be sent up afresh.
* @param rs The new room subscription fields to fetch.
*/
public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): void {
this.roomSubscriptionInfo = rs;
this.confirmedRoomSubscriptions = new Set<string>();
this.resend();
}
/**
* Register an extension to send with the /sync request.
* @param ext The extension to register.
*/
public registerExtension(ext: Extension): void {
if (this.extensions[ext.name()]) {
throw new Error(`registerExtension: ${ext.name()} already exists as an extension`);
}
this.extensions[ext.name()] = ext;
}
private getExtensionRequest(isInitial: boolean): object {
const ext = {};
Object.keys(this.extensions).forEach((extName) => {
ext[extName] = this.extensions[extName].onRequest(isInitial);
});
return ext;
}
private onPreExtensionsResponse(ext: object): void {
Object.keys(ext).forEach((extName) => {
if (this.extensions[extName].when() == ExtensionState.PreProcess) {
this.extensions[extName].onResponse(ext[extName]);
}
});
}
private onPostExtensionsResponse(ext: object): void {
Object.keys(ext).forEach((extName) => {
if (this.extensions[extName].when() == ExtensionState.PostProcess) {
this.extensions[extName].onResponse(ext[extName]);
}
});
}
/**
* Invoke all attached room data listeners.
* @param {string} roomId The room which received some data.
* @param {object} roomData The raw sliding sync response JSON.
*/
private invokeRoomDataListeners(roomId: string, roomData: MSC3575RoomData): void {
if (!roomData.required_state) { roomData.required_state = []; }
if (!roomData.timeline) { roomData.timeline = []; }
this.emit(SlidingSyncEvent.RoomData, roomId, roomData);
}
/**
* Invoke all attached lifecycle listeners.
* @param {SlidingSyncState} state The Lifecycle state
* @param {object} resp The raw sync response JSON
* @param {Error?} err Any error that occurred when making the request e.g. network errors.
*/
private invokeLifecycleListeners(state: SlidingSyncState, resp: MSC3575SlidingSyncResponse, err?: Error): void {
this.emit(SlidingSyncEvent.Lifecycle, state, resp, err);
}
private processListOps(list: ListResponse, listIndex: number): void {
let gapIndex = -1;
list.ops.forEach((op: Operation) => {
switch (op.op) {
case "DELETE": {
logger.debug("DELETE", listIndex, op.index, ";");
delete this.lists[listIndex].roomIndexToRoomId[op.index];
gapIndex = op.index;
break;
}
case "INSERT": {
logger.debug(
"INSERT",
listIndex,
op.index,
op.room_id,
";",
);
if (this.lists[listIndex].roomIndexToRoomId[op.index]) {
// something is in this space, shift items out of the way
if (gapIndex < 0) {
logger.debug(
"cannot work out where gap is, INSERT without previous DELETE! List: ",
listIndex,
);
return;
}
// 0,1,2,3 index
// [A,B,C,D]
// DEL 3
// [A,B,C,_]
// INSERT E 0
// [E,A,B,C]
// gapIndex=3, op.index=0
if (gapIndex > op.index) {
// the gap is further down the list, shift every element to the right
// starting at the gap so we can just shift each element in turn:
// [A,B,C,_] gapIndex=3, op.index=0
// [A,B,C,C] i=3
// [A,B,B,C] i=2
// [A,A,B,C] i=1
// Terminate. We'll assign into op.index next.
for (let i = gapIndex; i > op.index; i--) {
if (this.lists[listIndex].isIndexInRange(i)) {
this.lists[listIndex].roomIndexToRoomId[i] =
this.lists[listIndex].roomIndexToRoomId[
i - 1
];
}
}
} else if (gapIndex < op.index) {
// the gap is further up the list, shift every element to the left
// starting at the gap so we can just shift each element in turn
for (let i = gapIndex; i < op.index; i++) {
if (this.lists[listIndex].isIndexInRange(i)) {
this.lists[listIndex].roomIndexToRoomId[i] =
this.lists[listIndex].roomIndexToRoomId[
i + 1
];
}
}
}
}
this.lists[listIndex].roomIndexToRoomId[op.index] = op.room_id;
break;
}
case "INVALIDATE": {
const startIndex = op.range[0];
for (let i = startIndex; i <= op.range[1]; i++) {
delete this.lists[listIndex].roomIndexToRoomId[i];
}
logger.debug(
"INVALIDATE",
listIndex,
op.range[0],
op.range[1],
";",
);
break;
}
case "SYNC": {
const startIndex = op.range[0];
for (let i = startIndex; i <= op.range[1]; i++) {
const roomId = op.room_ids[i - startIndex];
if (!roomId) {
break; // we are at the end of list
}
this.lists[listIndex].roomIndexToRoomId[i] = roomId;
}
logger.debug(
"SYNC",
listIndex,
op.range[0],
op.range[1],
op.room_ids.join(" "),
";",
);
break;
}
}
});
}
/**
* Resend a Sliding Sync request. Used when something has changed in the request.
*/
public resend(): void {
this.needsResend = true;
this.pendingReq?.abort();
}
/**
* Stop syncing with the server.
*/
public stop(): void {
this.terminated = true;
this.pendingReq?.abort();
// remove all listeners so things can be GC'd
this.removeAllListeners(SlidingSyncEvent.Lifecycle);
this.removeAllListeners(SlidingSyncEvent.List);
this.removeAllListeners(SlidingSyncEvent.RoomData);
}
/**
* Start syncing with the server. Blocks until stopped.
*/
public async start() {
let currentPos: string;
while (!this.terminated) {
this.needsResend = false;
let doNotUpdateList = false;
let resp: MSC3575SlidingSyncResponse;
try {
const listModifiedCount = this.listModifiedCount;
const reqBody: MSC3575SlidingSyncRequest = {
lists: this.lists.map((l) => {
return l.getList(false);
}),
pos: currentPos,
timeout: this.timeoutMS,
clientTimeout: this.timeoutMS + BUFFER_PERIOD_MS,
extensions: this.getExtensionRequest(currentPos === undefined),
};
// check if we are (un)subscribing to a room and modify request this one time for it
const newSubscriptions = difference(this.desiredRoomSubscriptions, this.confirmedRoomSubscriptions);
const unsubscriptions = difference(this.confirmedRoomSubscriptions, this.desiredRoomSubscriptions);
if (unsubscriptions.size > 0) {
reqBody.unsubscribe_rooms = Array.from(unsubscriptions);
}
if (newSubscriptions.size > 0) {
reqBody.room_subscriptions = {};
for (const roomId of newSubscriptions) {
reqBody.room_subscriptions[roomId] = this.roomSubscriptionInfo;
}
}
this.pendingReq = this.client.slidingSync(reqBody, this.proxyBaseUrl);
resp = await this.pendingReq;
logger.debug(resp);
currentPos = resp.pos;
// update what we think we're subscribed to.
for (const roomId of newSubscriptions) {
this.confirmedRoomSubscriptions.add(roomId);
}
for (const roomId of unsubscriptions) {
this.confirmedRoomSubscriptions.delete(roomId);
}
if (listModifiedCount !== this.listModifiedCount) {
// the lists have been modified whilst we were waiting for 'await' to return, but the abort()
// call did nothing. It is NOT SAFE to modify the list array now. We'll process the response but
// not update list pointers.
logger.debug("list modified during await call, not updating list");
doNotUpdateList = true;
}
// mark all these lists as having been sent as sticky so we don't keep sending sticky params
this.lists.forEach((l) => {
l.setModified(false);
});
// set default empty values so we don't need to null check
resp.lists = resp.lists || [];
resp.rooms = resp.rooms || {};
resp.extensions = resp.extensions || {};
resp.lists.forEach((val, i) => {
this.lists[i].joinedCount = val.count;
});
this.invokeLifecycleListeners(
SlidingSyncState.RequestFinished,
resp,
);
} catch (err) {
if (err.httpStatus) {
this.invokeLifecycleListeners(
SlidingSyncState.RequestFinished,
null,
err,
);
await sleep(3000);
} else if (this.needsResend || err === "aborted") {
// don't sleep as we caused this error by abort()ing the request.
// we check for 'aborted' because that's the error Jest returns and without it
// we get warnings about not exiting fast enough.
continue;
} else {
logger.error(err);
await sleep(3000);
}
}
if (!resp) {
continue;
}
this.onPreExtensionsResponse(resp.extensions);
Object.keys(resp.rooms).forEach((roomId) => {
this.invokeRoomDataListeners(
roomId,
resp.rooms[roomId],
);
});
const listIndexesWithUpdates: Set<number> = new Set();
if (!doNotUpdateList) {
resp.lists.forEach((list, listIndex) => {
list.ops = list.ops || [];
if (list.ops.length > 0) {
listIndexesWithUpdates.add(listIndex);
}
this.processListOps(list, listIndex);
});
}
this.invokeLifecycleListeners(SlidingSyncState.Complete, resp);
this.onPostExtensionsResponse(resp.extensions);
listIndexesWithUpdates.forEach((i) => {
this.emit(
SlidingSyncEvent.List,
i, this.lists[i].joinedCount, Object.assign({}, this.lists[i].roomIndexToRoomId),
);
});
}
}
}
const difference = (setA: Set<string>, setB: Set<string>): Set<string> => {
const diff = new Set(setA);
for (const elem of setB) {
diff.delete(elem);
}
return diff;
};

View File

@ -4807,7 +4807,7 @@ matrix-events-sdk@^0.0.1-beta.7:
resolved "https://registry.yarnpkg.com/matrix-events-sdk/-/matrix-events-sdk-0.0.1-beta.7.tgz#5ffe45eba1f67cc8d7c2377736c728b322524934"
integrity sha512-9jl4wtWanUFSy2sr2lCjErN/oC8KTAtaeaozJtrgot1JiQcEI4Rda9OLgQ7nLKaqb4Z/QUx/fR3XpDzm5Jy1JA==
matrix-mock-request@^2.0.1:
matrix-mock-request@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/matrix-mock-request/-/matrix-mock-request-2.1.0.tgz#86f5b0ef846865d0767d3a8e64f5bcd6ca94c178"
integrity sha512-Cjpl3yP6h0yu5GKG89m1XZXZlm69Kg/qHV41N/t6SrQsgcfM3Bfavqx9YrtG0UnuXGy4bBSZIe1QiWVeFPZw1A==