1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-07-31 15:24:23 +03:00

Switch sliding sync support to simplified sliding sync (#4400)

* Switch sliding sync support to simplified sliding sync

Experimental PR to test js-sdk with simlified sliding sync.

This does not maintain support for regulaer sliding sync.

* Remove txn_id handling, ensure we always resend when req params change

* Fix some tests

* Fix remaining tests

* Mark TODOs on tests which need to die

* Linting

* Make comments lie less

* void

* Always sent full extension request

* Fix test

* Remove usage of deprecated field

* Hopefully fix DM names

* Refactor how heroes are handled in Room

* Fix how heroes work

* Linting

* Ensure that when SSS omits heroes we don't forget we had heroes

Otherwise when the room next appears the name/avatar reset to
'Empty Room' with no avatar.

* Check the right flag when doing timeline trickling

* Also change when the backpagination token is set

* Remove list ops and server-provided sort positions

SSS doesn't have them.

* Linting

* Add Room.bumpStamp

* Update crypto wasm lib

For new functions

* Add performance logging

* Fix breaking change in crypto wasm v8

* Update crypto wasm for breaking changes

See https://github.com/matrix-org/matrix-rust-sdk-crypto-wasm/releases/tag/v8.0.0
for how this was mapped from the previous API.

* Mark all tracked users as dirty on expired SSS connections

See https://github.com/matrix-org/matrix-rust-sdk/pull/3965 for
more information. Requires `Extension.onRequest` to be `async`.

* add ts extension

* Fix typedoc ref

* Add method to interface

* Don't force membership to invite

The membership was set correctly from the stripped state anyway so
this was redundant and was breaking rooms where we'd knocked.

* Missed merge

* Type import

* Make coverage happier

* More test coverage

* Grammar & formatting

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>

* Remove markAllTrackedUsersAsDirty from crypto API

Not sure why this was in there, seems like it just needed to be in
crypto sync callbacks, which it already was.

* Remove I from interface

* API doc

* Move Hero definition to room-summary

* make comment more specific

* Move internal details into room.ts

and make the comment a proper tsdoc comment

* Use terser arrow function syntax

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>

* Move comment to where we do the lookup

* Clarify comment

also prettier says hi

* Add comment

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>

* Add tsdoc

explaining that the summary event will be modified

* more comment

* Remove unrelated changes

* Add docs & make fields optional

* Type import

* Clarify sync versions

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>

* Make tsdoc comment & add info on when it's used.

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>

* Rephrase comment

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>

* Prettier

* Only fetch member for hero in legacy sync mode

* Split out a separate method to set SSS room summary

Rather than trying to fudge up an object that looked enough like the
old one that we could pass it in.

* Type import

* Make link work

* Nope, linter treats it as an unused import

* Add link the other way

* Add more detail to doc

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>

* Remove unnecessary cast

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>

* Remove length > 0 check

as it wasn't really necessary and may cause heroes not to be cleared?

* Doc params

* Remove unnecessary undefined comparison

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>

* Put the comparison back

as it's necessary to stop typescript complaining

* Fix comment

* Fix comment

---------

Co-authored-by: Kegan Dougal <7190048+kegsay@users.noreply.github.com>
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
This commit is contained in:
David Baker
2025-03-18 17:23:45 +00:00
committed by GitHub
parent c233334f27
commit fd47a189e0
10 changed files with 319 additions and 1144 deletions

View File

@ -649,11 +649,13 @@ describe("SlidingSyncSdk", () => {
ext = findExtension("e2ee");
});
it("gets enabled on the initial request only", () => {
expect(ext.onRequest(true)).toEqual({
it("gets enabled all the time", async () => {
expect(await ext.onRequest(true)).toEqual({
enabled: true,
});
expect(await ext.onRequest(false)).toEqual({
enabled: true,
});
expect(ext.onRequest(false)).toEqual(undefined);
});
it("can update device lists", () => {
@ -695,11 +697,13 @@ describe("SlidingSyncSdk", () => {
ext = findExtension("account_data");
});
it("gets enabled on the initial request only", () => {
expect(ext.onRequest(true)).toEqual({
it("gets enabled all the time", async () => {
expect(await ext.onRequest(true)).toEqual({
enabled: true,
});
expect(await ext.onRequest(false)).toEqual({
enabled: true,
});
expect(ext.onRequest(false)).toEqual(undefined);
});
it("processes global account data", async () => {
@ -823,8 +827,12 @@ describe("SlidingSyncSdk", () => {
ext = findExtension("to_device");
});
it("gets enabled with a limit on the initial request only", () => {
const reqJson: any = ext.onRequest(true);
it("gets enabled all the time", async () => {
let reqJson: any = await ext.onRequest(true);
expect(reqJson.enabled).toEqual(true);
expect(reqJson.limit).toBeGreaterThan(0);
expect(reqJson.since).toBeUndefined();
reqJson = await ext.onRequest(false);
expect(reqJson.enabled).toEqual(true);
expect(reqJson.limit).toBeGreaterThan(0);
expect(reqJson.since).toBeUndefined();
@ -835,7 +843,7 @@ describe("SlidingSyncSdk", () => {
next_batch: "12345",
events: [],
});
expect(ext.onRequest(false)).toEqual({
expect(await ext.onRequest(false)).toMatchObject({
since: "12345",
});
});
@ -919,11 +927,13 @@ describe("SlidingSyncSdk", () => {
ext = findExtension("typing");
});
it("gets enabled on the initial request only", () => {
expect(ext.onRequest(true)).toEqual({
it("gets enabled all the time", async () => {
expect(await ext.onRequest(true)).toEqual({
enabled: true,
});
expect(await ext.onRequest(false)).toEqual({
enabled: true,
});
expect(ext.onRequest(false)).toEqual(undefined);
});
it("processes typing notifications", async () => {
@ -1042,11 +1052,13 @@ describe("SlidingSyncSdk", () => {
ext = findExtension("receipts");
});
it("gets enabled on the initial request only", () => {
expect(ext.onRequest(true)).toEqual({
it("gets enabled all the time", async () => {
expect(await ext.onRequest(true)).toEqual({
enabled: true,
});
expect(await ext.onRequest(false)).toEqual({
enabled: true,
});
expect(ext.onRequest(false)).toEqual(undefined);
});
it("processes receipts", async () => {

View File

@ -41,7 +41,7 @@ describe("SlidingSync", () => {
const selfUserId = "@alice:localhost";
const selfAccessToken = "aseukfgwef";
const proxyBaseUrl = "http://localhost:8008";
const syncUrl = proxyBaseUrl + "/_matrix/client/unstable/org.matrix.msc3575/sync";
const syncUrl = proxyBaseUrl + "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync";
// assign client/httpBackend globals
const setupClient = () => {
@ -103,8 +103,8 @@ describe("SlidingSync", () => {
};
const ext: Extension<any, any> = {
name: () => "custom_extension",
onRequest: (initial) => {
return { initial: initial };
onRequest: async (_) => {
return { initial: true };
},
onResponse: async (res) => {
return;
@ -143,18 +143,16 @@ describe("SlidingSync", () => {
});
await httpBackend!.flushAllExpected();
// expect nothing but ranges and non-initial extensions to be sent
// expect all params to be sent TODO: check MSC4186
httpBackend!
.when("POST", syncUrl)
.check(function (req) {
const body = req.data;
logger.debug("got ", body);
expect(body.room_subscriptions).toBeFalsy();
expect(body.lists["a"]).toEqual({
ranges: [[0, 10]],
});
expect(body.lists["a"]).toEqual(listInfo);
expect(body.extensions).toBeTruthy();
expect(body.extensions["custom_extension"]).toEqual({ initial: false });
expect(body.extensions["custom_extension"]).toEqual({ initial: true });
expect(req.queryParams!["pos"]).toEqual("11");
})
.respond(200, function () {
@ -332,6 +330,7 @@ describe("SlidingSync", () => {
await p;
});
// TODO: this does not exist in MSC4186
it("should be able to unsubscribe from a room", async () => {
httpBackend!
.when("POST", syncUrl)
@ -389,18 +388,19 @@ describe("SlidingSync", () => {
[3, 5],
];
// request first 3 rooms
const listReq = {
ranges: [[0, 2]],
sort: ["by_name"],
timeline_limit: 1,
required_state: [["m.room.topic", ""]],
filters: {
is_dm: true,
},
};
let slidingSync: SlidingSync;
it("should be possible to subscribe to a list", async () => {
// request first 3 rooms
const listReq = {
ranges: [[0, 2]],
sort: ["by_name"],
timeline_limit: 1,
required_state: [["m.room.topic", ""]],
filters: {
is_dm: true,
},
};
slidingSync = new SlidingSync(proxyBaseUrl, new Map([["a", listReq]]), {}, client!, 1);
httpBackend!
.when("POST", syncUrl)
@ -452,11 +452,6 @@ describe("SlidingSync", () => {
expect(slidingSync.getListData("b")).toBeNull();
const syncData = slidingSync.getListData("a")!;
expect(syncData.joinedCount).toEqual(500); // from previous test
expect(syncData.roomIndexToRoomId).toEqual({
0: roomA,
1: roomB,
2: roomC,
});
});
it("should be possible to adjust list ranges", async () => {
@ -467,10 +462,9 @@ describe("SlidingSync", () => {
const body = req.data;
logger.log("next ranges", body.lists["a"].ranges);
expect(body.lists).toBeTruthy();
expect(body.lists["a"]).toEqual({
// only the ranges should be sent as the rest are unchanged and sticky
ranges: newRanges,
});
// list range should be changed
listReq.ranges = newRanges;
expect(body.lists["a"]).toEqual(listReq); // resend all values TODO: check MSC4186
})
.respond(200, {
pos: "b",
@ -495,7 +489,9 @@ describe("SlidingSync", () => {
await httpBackend!.flushAllExpected();
await responseProcessed;
// setListRanges for an invalid list key returns an error
await expect(slidingSync.setListRanges("idontexist", newRanges)).rejects.toBeTruthy();
expect(() => {
slidingSync.setListRanges("idontexist", newRanges);
}).toThrow();
});
it("should be possible to add an extra list", async () => {
@ -513,10 +509,7 @@ describe("SlidingSync", () => {
const body = req.data;
logger.log("extra list", body);
expect(body.lists).toBeTruthy();
expect(body.lists["a"]).toEqual({
// only the ranges should be sent as the rest are unchanged and sticky
ranges: newRanges,
});
expect(body.lists["a"]).toEqual(listReq); // resend all values TODO: check MSC4186
expect(body.lists["b"]).toEqual(extraListReq);
})
.respond(200, {
@ -537,16 +530,6 @@ describe("SlidingSync", () => {
},
},
});
listenUntil(slidingSync, "SlidingSync.List", (listKey, joinedCount, roomIndexToRoomId) => {
expect(listKey).toEqual("b");
expect(joinedCount).toEqual(50);
expect(roomIndexToRoomId).toEqual({
0: roomA,
1: roomB,
2: roomC,
});
return true;
});
const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
@ -554,706 +537,6 @@ describe("SlidingSync", () => {
await httpBackend!.flushAllExpected();
await responseProcessed;
});
it("should be possible to get list DELETE/INSERTs", async () => {
// move C (2) to A (0)
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "e",
lists: {
a: {
count: 500,
ops: [
{
op: "DELETE",
index: 2,
},
{
op: "INSERT",
index: 0,
room_id: roomC,
},
],
},
b: {
count: 50,
},
},
});
let listPromise = listenUntil(
slidingSync,
"SlidingSync.List",
(listKey, joinedCount, roomIndexToRoomId) => {
expect(listKey).toEqual("a");
expect(joinedCount).toEqual(500);
expect(roomIndexToRoomId).toEqual({
0: roomC,
1: roomA,
2: roomB,
});
return true;
},
);
let responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend!.flushAllExpected();
await responseProcessed;
await listPromise;
// move C (0) back to A (2)
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "f",
lists: {
a: {
count: 500,
ops: [
{
op: "DELETE",
index: 0,
},
{
op: "INSERT",
index: 2,
room_id: roomC,
},
],
},
b: {
count: 50,
},
},
});
listPromise = listenUntil(slidingSync, "SlidingSync.List", (listKey, joinedCount, roomIndexToRoomId) => {
expect(listKey).toEqual("a");
expect(joinedCount).toEqual(500);
expect(roomIndexToRoomId).toEqual({
0: roomA,
1: roomB,
2: roomC,
});
return true;
});
responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend!.flushAllExpected();
await responseProcessed;
await listPromise;
});
it("should ignore invalid list indexes", async () => {
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "e",
lists: {
a: {
count: 500,
ops: [
{
op: "DELETE",
index: 2324324,
},
],
},
b: {
count: 50,
},
},
});
const listPromise = listenUntil(
slidingSync,
"SlidingSync.List",
(listKey, joinedCount, roomIndexToRoomId) => {
expect(listKey).toEqual("a");
expect(joinedCount).toEqual(500);
expect(roomIndexToRoomId).toEqual({
0: roomA,
1: roomB,
2: roomC,
});
return true;
},
);
const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend!.flushAllExpected();
await responseProcessed;
await listPromise;
});
it("should be possible to update a list", async () => {
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "g",
lists: {
a: {
count: 42,
ops: [
{
op: "INVALIDATE",
range: [0, 2],
},
{
op: "SYNC",
range: [0, 1],
room_ids: [roomB, roomC],
},
],
},
b: {
count: 50,
},
},
});
// update the list with a new filter
slidingSync.setList("a", {
filters: {
is_encrypted: true,
},
ranges: [[0, 100]],
});
const listPromise = listenUntil(
slidingSync,
"SlidingSync.List",
(listKey, joinedCount, roomIndexToRoomId) => {
expect(listKey).toEqual("a");
expect(joinedCount).toEqual(42);
expect(roomIndexToRoomId).toEqual({
0: roomB,
1: roomC,
});
return true;
},
);
const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend!.flushAllExpected();
await responseProcessed;
await listPromise;
});
// this refers to a set of operations where the end result is no change.
it("should handle net zero operations correctly", async () => {
const indexToRoomId = {
0: roomB,
1: roomC,
};
expect(slidingSync.getListData("a")!.roomIndexToRoomId).toEqual(indexToRoomId);
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "f",
// currently the list is [B,C] so we will insert D then immediately delete it
lists: {
a: {
count: 500,
ops: [
{
op: "DELETE",
index: 2,
},
{
op: "INSERT",
index: 0,
room_id: roomA,
},
{
op: "DELETE",
index: 0,
},
],
},
b: {
count: 50,
},
},
});
const listPromise = listenUntil(
slidingSync,
"SlidingSync.List",
(listKey, joinedCount, roomIndexToRoomId) => {
expect(listKey).toEqual("a");
expect(joinedCount).toEqual(500);
expect(roomIndexToRoomId).toEqual(indexToRoomId);
return true;
},
);
const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend!.flushAllExpected();
await responseProcessed;
await listPromise;
});
it("should handle deletions correctly", async () => {
expect(slidingSync.getListData("a")!.roomIndexToRoomId).toEqual({
0: roomB,
1: roomC,
});
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "g",
lists: {
a: {
count: 499,
ops: [
{
op: "DELETE",
index: 0,
},
],
},
b: {
count: 50,
},
},
});
const listPromise = listenUntil(
slidingSync,
"SlidingSync.List",
(listKey, joinedCount, roomIndexToRoomId) => {
expect(listKey).toEqual("a");
expect(joinedCount).toEqual(499);
expect(roomIndexToRoomId).toEqual({
0: roomC,
});
return true;
},
);
const responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend!.flushAllExpected();
await responseProcessed;
await listPromise;
});
it("should handle insertions correctly", async () => {
expect(slidingSync.getListData("a")!.roomIndexToRoomId).toEqual({
0: roomC,
});
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "h",
lists: {
a: {
count: 500,
ops: [
{
op: "INSERT",
index: 1,
room_id: roomA,
},
],
},
b: {
count: 50,
},
},
});
let listPromise = listenUntil(
slidingSync,
"SlidingSync.List",
(listKey, joinedCount, roomIndexToRoomId) => {
expect(listKey).toEqual("a");
expect(joinedCount).toEqual(500);
expect(roomIndexToRoomId).toEqual({
0: roomC,
1: roomA,
});
return true;
},
);
let responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend!.flushAllExpected();
await responseProcessed;
await listPromise;
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "h",
lists: {
a: {
count: 501,
ops: [
{
op: "INSERT",
index: 1,
room_id: roomB,
},
],
},
b: {
count: 50,
},
},
});
listPromise = listenUntil(slidingSync, "SlidingSync.List", (listKey, joinedCount, roomIndexToRoomId) => {
expect(listKey).toEqual("a");
expect(joinedCount).toEqual(501);
expect(roomIndexToRoomId).toEqual({
0: roomC,
1: roomB,
2: roomA,
});
return true;
});
responseProcessed = listenUntil(slidingSync, "SlidingSync.Lifecycle", (state) => {
return state === SlidingSyncState.Complete;
});
await httpBackend!.flushAllExpected();
await responseProcessed;
await listPromise;
slidingSync.stop();
});
// Regression test to make sure things like DELETE 0 INSERT 0 work correctly and we don't
// end up losing room IDs.
it("should handle insertions with a spurious DELETE correctly", async () => {
slidingSync = new SlidingSync(
proxyBaseUrl,
new Map([
[
"a",
{
ranges: [[0, 20]],
},
],
]),
{},
client!,
1,
);
// initially start with nothing
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "a",
lists: {
a: {
count: 0,
ops: [],
},
},
});
slidingSync.start();
await httpBackend!.flushAllExpected();
expect(slidingSync.getListData("a")!.roomIndexToRoomId).toEqual({});
// insert a room
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "b",
lists: {
a: {
count: 1,
ops: [
{
op: "DELETE",
index: 0,
},
{
op: "INSERT",
index: 0,
room_id: roomA,
},
],
},
},
});
await httpBackend!.flushAllExpected();
expect(slidingSync.getListData("a")!.roomIndexToRoomId).toEqual({
0: roomA,
});
// insert another room
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "c",
lists: {
a: {
count: 1,
ops: [
{
op: "DELETE",
index: 1,
},
{
op: "INSERT",
index: 0,
room_id: roomB,
},
],
},
},
});
await httpBackend!.flushAllExpected();
expect(slidingSync.getListData("a")!.roomIndexToRoomId).toEqual({
0: roomB,
1: roomA,
});
// insert a final room
httpBackend!.when("POST", syncUrl).respond(200, {
pos: "c",
lists: {
a: {
count: 1,
ops: [
{
op: "DELETE",
index: 2,
},
{
op: "INSERT",
index: 0,
room_id: roomC,
},
],
},
},
});
await httpBackend!.flushAllExpected();
expect(slidingSync.getListData("a")!.roomIndexToRoomId).toEqual({
0: roomC,
1: roomB,
2: roomA,
});
slidingSync.stop();
});
});
describe("transaction IDs", () => {
beforeAll(setupClient);
afterAll(teardownClient);
const roomId = "!foo:bar";
let slidingSync: SlidingSync;
// really this applies to them all but it's easier to just test one
it("should resolve modifyRoomSubscriptions after SlidingSync.start() is called", async () => {
const roomSubInfo = {
timeline_limit: 1,
required_state: [["m.room.name", ""]],
};
// add the subscription
slidingSync = new SlidingSync(proxyBaseUrl, new Map(), roomSubInfo, client!, 1);
// modification before SlidingSync.start()
const subscribePromise = slidingSync.modifyRoomSubscriptions(new Set([roomId]));
let txnId: string | undefined;
httpBackend!
.when("POST", syncUrl)
.check(function (req) {
const body = req.data;
logger.debug("got ", body);
expect(body.room_subscriptions).toBeTruthy();
expect(body.room_subscriptions[roomId]).toEqual(roomSubInfo);
expect(body.txn_id).toBeTruthy();
txnId = body.txn_id;
})
.respond(200, function () {
return {
pos: "aaa",
txn_id: txnId,
lists: {},
extensions: {},
rooms: {
[roomId]: {
name: "foo bar",
required_state: [],
timeline: [],
},
},
};
});
slidingSync.start();
await httpBackend!.flushAllExpected();
await subscribePromise;
});
it("should resolve setList during a connection", async () => {
const newList = {
ranges: [[0, 20]],
};
const promise = slidingSync.setList("a", newList);
let txnId: string | undefined;
httpBackend!
.when("POST", syncUrl)
.check(function (req) {
const body = req.data;
logger.debug("got ", body);
expect(body.room_subscriptions).toBeFalsy();
expect(body.lists["a"]).toEqual(newList);
expect(body.txn_id).toBeTruthy();
txnId = body.txn_id;
})
.respond(200, function () {
return {
pos: "bbb",
txn_id: txnId,
lists: { a: { count: 5 } },
extensions: {},
};
});
await httpBackend!.flushAllExpected();
await promise;
expect(txnId).toBeDefined();
});
it("should resolve setListRanges during a connection", async () => {
const promise = slidingSync.setListRanges("a", [[20, 40]]);
let txnId: string | undefined;
httpBackend!
.when("POST", syncUrl)
.check(function (req) {
const body = req.data;
logger.debug("got ", body);
expect(body.room_subscriptions).toBeFalsy();
expect(body.lists["a"]).toEqual({
ranges: [[20, 40]],
});
expect(body.txn_id).toBeTruthy();
txnId = body.txn_id;
})
.respond(200, function () {
return {
pos: "ccc",
txn_id: txnId,
lists: { a: { count: 5 } },
extensions: {},
};
});
await httpBackend!.flushAllExpected();
await promise;
expect(txnId).toBeDefined();
});
it("should resolve modifyRoomSubscriptionInfo during a connection", async () => {
const promise = slidingSync.modifyRoomSubscriptionInfo({
timeline_limit: 99,
});
let txnId: string | undefined;
httpBackend!
.when("POST", syncUrl)
.check(function (req) {
const body = req.data;
logger.debug("got ", body);
expect(body.room_subscriptions).toBeTruthy();
expect(body.room_subscriptions[roomId]).toEqual({
timeline_limit: 99,
});
expect(body.txn_id).toBeTruthy();
txnId = body.txn_id;
})
.respond(200, function () {
return {
pos: "ddd",
txn_id: txnId,
extensions: {},
};
});
await httpBackend!.flushAllExpected();
await promise;
expect(txnId).toBeDefined();
});
it("should reject earlier pending promises if a later transaction is acknowledged", async () => {
// i.e if we have [A,B,C] and see txn_id=C then A,B should be rejected.
const gotTxnIds: any[] = [];
const pushTxn = function (req: MockHttpBackend["requests"][0]) {
gotTxnIds.push(req.data.txn_id);
};
const failPromise = slidingSync.setListRanges("a", [[20, 40]]);
httpBackend!.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "e" }); // missing txn_id
await httpBackend!.flushAllExpected();
const failPromise2 = slidingSync.setListRanges("a", [[60, 70]]);
httpBackend!.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "f" }); // missing txn_id
await httpBackend!.flushAllExpected();
const okPromise = slidingSync.setListRanges("a", [[0, 20]]);
let txnId: string | undefined;
httpBackend!
.when("POST", syncUrl)
.check((req) => {
txnId = req.data.txn_id;
})
.respond(200, () => {
// include the txn_id, earlier requests should now be reject()ed.
return {
pos: "g",
txn_id: txnId,
};
});
await Promise.all([
expect(failPromise).rejects.toEqual(gotTxnIds[0]),
expect(failPromise2).rejects.toEqual(gotTxnIds[1]),
httpBackend!.flushAllExpected(),
okPromise,
]);
expect(txnId).toBeDefined();
});
it("should not reject later pending promises if an earlier transaction is acknowledged", async () => {
// i.e if we have [A,B,C] and see txn_id=B then C should not be rejected but A should.
const gotTxnIds: any[] = [];
const pushTxn = function (req: MockHttpBackend["requests"][0]) {
gotTxnIds.push(req.data?.txn_id);
};
const A = slidingSync.setListRanges("a", [[20, 40]]);
httpBackend!.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "A" });
await httpBackend!.flushAllExpected();
const B = slidingSync.setListRanges("a", [[60, 70]]);
httpBackend!.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "B" }); // missing txn_id
await httpBackend!.flushAllExpected();
// attach rejection handlers now else if we do it later Jest treats that as an unhandled rejection
// which is a fail.
const C = slidingSync.setListRanges("a", [[0, 20]]);
let pendingC = true;
C.finally(() => {
pendingC = false;
});
httpBackend!
.when("POST", syncUrl)
.check(pushTxn)
.respond(200, () => {
// include the txn_id for B, so C's promise is outstanding
return {
pos: "C",
txn_id: gotTxnIds[1],
};
});
await Promise.all([
expect(A).rejects.toEqual(gotTxnIds[0]),
httpBackend!.flushAllExpected(),
// A is rejected, see above
expect(B).resolves.toEqual(gotTxnIds[1]), // B is resolved
]);
expect(pendingC).toBe(true); // C is pending still
});
it("should do nothing for unknown txn_ids", async () => {
const promise = slidingSync.setListRanges("a", [[20, 40]]);
let pending = true;
promise.finally(() => {
pending = false;
});
let txnId: string | undefined;
httpBackend!
.when("POST", syncUrl)
.check(function (req) {
const body = req.data;
logger.debug("got ", body);
expect(body.room_subscriptions).toBeFalsy();
expect(body.lists["a"]).toEqual({
ranges: [[20, 40]],
});
expect(body.txn_id).toBeTruthy();
txnId = body.txn_id;
})
.respond(200, function () {
return {
pos: "ccc",
txn_id: "bogus transaction id",
lists: { a: { count: 5 } },
extensions: {},
};
});
await httpBackend!.flushAllExpected();
expect(txnId).toBeDefined();
expect(pending).toBe(true);
slidingSync.stop();
});
});
describe("custom room subscriptions", () => {
@ -1543,7 +826,7 @@ describe("SlidingSync", () => {
const extPre: Extension<any, any> = {
name: () => preExtName,
onRequest: (initial) => {
onRequest: async (initial) => {
return onPreExtensionRequest(initial);
},
onResponse: (res) => {
@ -1553,7 +836,7 @@ describe("SlidingSync", () => {
};
const extPost: Extension<any, any> = {
name: () => postExtName,
onRequest: (initial) => {
onRequest: async (initial) => {
return onPostExtensionRequest(initial);
},
onResponse: (res) => {
@ -1568,7 +851,7 @@ describe("SlidingSync", () => {
const callbackOrder: string[] = [];
let extensionOnResponseCalled = false;
onPreExtensionRequest = () => {
onPreExtensionRequest = async () => {
return extReq;
};
onPreExtensionResponse = async (resp) => {
@ -1608,7 +891,7 @@ describe("SlidingSync", () => {
});
it("should be able to send nothing in an extension request/response", async () => {
onPreExtensionRequest = () => {
onPreExtensionRequest = async () => {
return undefined;
};
let responseCalled = false;
@ -1643,7 +926,7 @@ describe("SlidingSync", () => {
it("is possible to register extensions after start() has been called", async () => {
slidingSync.registerExtension(extPost);
onPostExtensionRequest = () => {
onPostExtensionRequest = async () => {
return extReq;
};
let responseCalled = false;

View File

@ -1234,6 +1234,16 @@ describe("Room", function () {
expect(room.name).toEqual(nameB);
});
it("supports MSC4186 style heroes", () => {
const nameB = "Bertha Bobbington";
const nameC = "Clarissa Harissa";
addMember(userB, KnownMembership.Join, { name: nameB });
addMember(userC, KnownMembership.Join, { name: nameC });
room.setMSC4186SummaryData([{ user_id: userB }, { user_id: userC }], undefined, undefined);
room.recalculate();
expect(room.name).toEqual(`${nameB} and ${nameC}`);
});
it("reverts to empty room in case of self chat", function () {
room.setSummary({
"m.heroes": [],
@ -4276,4 +4286,9 @@ describe("Room", function () {
expect(filteredEvents[0].getContent().body).toEqual("ev2");
});
});
it("saves and retrieves the bump stamp", () => {
room.setBumpStamp(123456789);
expect(room.getBumpStamp()).toEqual(123456789);
});
});

View File

@ -8194,7 +8194,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
const clientTimeout = req.clientTimeout;
delete req.clientTimeout;
return this.http.authedRequest<MSC3575SlidingSyncResponse>(Method.Post, "/sync", qps, req, {
prefix: "/_matrix/client/unstable/org.matrix.msc3575",
prefix: "/_matrix/client/unstable/org.matrix.simplified_msc3575",
baseUrl: proxyBaseUrl,
localTimeoutMs: clientTimeout,
abortSignal,

View File

@ -138,6 +138,15 @@ export interface SyncCryptoCallbacks {
* @param syncState - information about the completed sync.
*/
onSyncCompleted(syncState: OnSyncCompletedData): void;
/**
* Mark all tracked users' device lists as dirty.
*
* This method will cause additional `/keys/query` requests on the server, so should be used only
* when the client has desynced tracking device list deltas from the server.
* In MSC4186: Simplified Sliding Sync, this can happen when the server expires the connection.
*/
markAllTrackedUsersAsDirty(): Promise<void>;
}
/**

View File

@ -14,9 +14,40 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
/**
* A stripped m.room.member event which contains the key renderable fields from the event,
* sent only in simplified sliding sync (not `/v3/sync`).
* This is very similar to MSC4186Hero from sliding-sync.ts but an internal format with
* camelCase rather than underscores.
*/
export type Hero = {
userId: string;
displayName?: string;
avatarUrl?: string;
/**
* If true, the hero is from an MSC4186 summary, in which case `displayName` and `avatarUrl` will
* have been set by the server if available. If false, the `Hero` has been constructed from a `/v3/sync` response,
* so these fields will always be undefined.
*/
fromMSC4186: boolean;
};
/**
* High level summary information for a room, as returned by `/v3/sync`.
*/
export interface IRoomSummary {
/**
* The room heroes: a selected set of members that can be used when summarising or
* generating a name for a room. List of user IDs.
*/
"m.heroes": string[];
/**
* The number of joined members in the room.
*/
"m.joined_member_count"?: number;
/**
* The number of invited members in the room.
*/
"m.invited_member_count"?: number;
}

View File

@ -35,7 +35,7 @@ import {
} from "./event.ts";
import { EventStatus } from "./event-status.ts";
import { RoomMember } from "./room-member.ts";
import { type IRoomSummary, RoomSummary } from "./room-summary.ts";
import { type IRoomSummary, type Hero, RoomSummary } from "./room-summary.ts";
import { logger } from "../logger.ts";
import { TypedReEmitter } from "../ReEmitter.ts";
import {
@ -77,6 +77,7 @@ import { compareEventOrdering } from "./compare-event-ordering.ts";
import * as utils from "../utils.ts";
import { KnownMembership, type Membership } from "../@types/membership.ts";
import { type Capabilities, type IRoomVersionsCapability, RoomVersionStability } from "../serverCapabilities.ts";
import { type MSC4186Hero } from "../sliding-sync.ts";
// These constants are used as sane defaults when the homeserver doesn't support
// the m.room_versions capability. In practice, KNOWN_SAFE_ROOM_VERSION should be
@ -335,6 +336,7 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
public readonly reEmitter: TypedReEmitter<RoomEmittedEvents, RoomEventHandlerMap>;
private txnToEvent: Map<string, MatrixEvent> = new Map(); // Pending in-flight requests { string: MatrixEvent }
private notificationCounts: NotificationCount = {};
private bumpStamp: number | undefined = undefined;
private readonly threadNotifications = new Map<string, NotificationCount>();
public readonly cachedThreadReadReceipts = new Map<string, CachedReceiptStructure[]>();
// Useful to know at what point the current user has started using threads in this room
@ -361,7 +363,16 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
// read by megolm via getter; boolean value - null indicates "use global value"
private blacklistUnverifiedDevices?: boolean;
private selfMembership?: Membership;
private summaryHeroes: string[] | null = null;
/**
* A `Hero` is a stripped `m.room.member` event which contains the important renderable fields from the event.
*
* It is used in MSC4186 (Simplified Sliding Sync) as a replacement for the old `summary` field.
*
* When we are doing old-style (`/v3/sync`) sync, we simulate the SSS behaviour by constructing
* a `Hero` object based on the user id we get from the summary. Obviously, in that case,
* the `Hero` will lack a `displayName` or `avatarUrl`.
*/
private heroes: Hero[] | null = null;
// flags to stop logspam about missing m.room.create events
private getTypeWarning = false;
private getVersionWarning = false;
@ -879,7 +890,7 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
// fall back to summary information
const memberCount = this.getInvitedAndJoinedMemberCount();
if (memberCount === 2) {
return this.summaryHeroes?.[0];
return this.heroes?.[0]?.userId;
}
}
}
@ -897,8 +908,8 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
}
}
// Remember, we're assuming this room is a DM, so returning the first member we find should be fine
if (Array.isArray(this.summaryHeroes) && this.summaryHeroes.length) {
return this.summaryHeroes[0];
if (Array.isArray(this.heroes) && this.heroes.length) {
return this.heroes[0].userId;
}
const members = this.currentState.getMembers();
const anyMember = members.find((m) => m.userId !== this.myUserId);
@ -940,12 +951,45 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
if (nonFunctionalMemberCount > 2) return;
// Prefer the list of heroes, if present. It should only include the single other user in the DM.
const nonFunctionalHeroes = this.summaryHeroes?.filter((h) => !functionalMembers.includes(h));
const nonFunctionalHeroes = this.heroes?.filter((h) => !functionalMembers.includes(h.userId));
const hasHeroes = Array.isArray(nonFunctionalHeroes) && nonFunctionalHeroes.length;
if (hasHeroes) {
// use first hero that has a display name or avatar url, or whose user ID
// can be looked up as a member of the room
for (const hero of nonFunctionalHeroes) {
// If the hero was from a legacy sync (`/v3/sync`), we will need to look the user ID up in the room
// the display name and avatar URL will not be set.
if (!hero.fromMSC4186) {
// attempt to look up renderable fields from the m.room.member event if it exists
const member = this.getMember(hero.userId);
if (member) {
return member;
}
} else {
// use the Hero supplied values for the room member.
// TODO: It's unfortunate that this function, which clearly only cares about the
// avatar url, returns the entire RoomMember event. We need to fake an event
// to meet this API shape.
const heroMember = new RoomMember(this.roomId, hero.userId);
// set the display name and avatar url
heroMember.setMembershipEvent(
new MatrixEvent({
// ensure it's unique even if we hit the same millisecond
event_id: "$" + this.roomId + hero.userId + new Date().getTime(),
type: EventType.RoomMember,
state_key: hero.userId,
content: {
displayname: hero.displayName,
avatar_url: hero.avatarUrl,
},
}),
);
return heroMember;
}
}
const availableMember = nonFunctionalHeroes
.map((userId) => {
return this.getMember(userId);
.map((hero) => {
return this.getMember(hero.userId);
})
.find((member) => !!member);
if (availableMember) {
@ -970,8 +1014,8 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
// trust and try falling back to a hero, creating a one-off member for it
if (hasHeroes) {
const availableUser = nonFunctionalHeroes
.map((userId) => {
return this.client.getUser(userId);
.map((hero) => {
return this.client.getUser(hero.userId);
})
.find((user) => !!user);
if (availableUser) {
@ -1602,6 +1646,24 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
this.emit(RoomEvent.UnreadNotifications);
}
/**
* Set the bump stamp for this room. This can be used for sorting rooms when the timeline
* entries are unknown. Used in MSC4186: Simplified Sliding Sync.
* @param bumpStamp The bump_stamp value from the server
*/
public setBumpStamp(bumpStamp: number): void {
this.bumpStamp = bumpStamp;
}
/**
* Get the bump stamp for this room. This can be used for sorting rooms when the timeline
* entries are unknown. Used in MSC4186: Simplified Sliding Sync.
* @returns The bump stamp for the room, if it exists.
*/
public getBumpStamp(): number | undefined {
return this.bumpStamp;
}
/**
* Set one of the notification counts for this room
* @param type - The type of notification count to set.
@ -1616,8 +1678,13 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
return this.setUnreadNotificationCount(type, count);
}
/**
* Takes a legacy room summary (/v3/sync as opposed to MSC4186) and updates the room with it.
*
* @param summary - The room summary to update the room with
*/
public setSummary(summary: IRoomSummary): void {
const heroes = summary["m.heroes"];
const heroes = summary["m.heroes"]?.map((h) => ({ userId: h, fromMSC4186: false }));
const joinedCount = summary["m.joined_member_count"];
const invitedCount = summary["m.invited_member_count"];
if (Number.isInteger(joinedCount)) {
@ -1627,17 +1694,53 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
this.currentState.setInvitedMemberCount(invitedCount!);
}
if (Array.isArray(heroes)) {
// be cautious about trusting server values,
// and make sure heroes doesn't contain our own id
// just to be sure
this.summaryHeroes = heroes.filter((userId) => {
return userId !== this.myUserId;
// filter out ourselves just in case
this.heroes = heroes.filter((h) => {
return h.userId != this.myUserId;
});
}
this.emit(RoomEvent.Summary, summary);
}
/**
* Takes information from the MSC4186 room summary and updates the room with it.
*
* @param heroes - The room's hero members
* @param joinedCount - The number of joined members
* @param invitedCount - The number of invited members
*/
public setMSC4186SummaryData(
heroes: MSC4186Hero[] | undefined,
joinedCount: number | undefined,
invitedCount: number | undefined,
): void {
if (heroes) {
this.heroes = heroes
.filter((h) => h.user_id !== this.myUserId)
.map((h) => ({
userId: h.user_id,
displayName: h.displayname,
avatarUrl: h.avatar_url,
fromMSC4186: true,
}));
}
if (joinedCount !== undefined && Number.isInteger(joinedCount)) {
this.currentState.setJoinedMemberCount(joinedCount);
}
if (invitedCount !== undefined && Number.isInteger(invitedCount)) {
this.currentState.setInvitedMemberCount(invitedCount);
}
// Construct a summary object to emit as the event wants the info in a single object
// more like old-style (/v3/sync) summaries.
this.emit(RoomEvent.Summary, {
"m.heroes": this.heroes ? this.heroes.map((h) => h.userId) : [],
"m.joined_member_count": joinedCount,
"m.invited_member_count": invitedCount,
});
}
/**
* Whether to send encrypted messages to devices within this room.
* @param value - true to blacklist unverified devices, null
@ -3459,18 +3562,25 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
// get service members (e.g. helper bots) for exclusion
const excludedUserIds = this.getFunctionalMembers();
// get members that are NOT ourselves and are actually in the room.
// get members from heroes that are NOT ourselves
let otherNames: string[] = [];
if (this.summaryHeroes) {
// if we have a summary, the member state events should be in the room state
this.summaryHeroes.forEach((userId) => {
if (this.heroes) {
// if we have heroes, use those as the names
this.heroes.forEach((hero) => {
// filter service members
if (excludedUserIds.includes(userId)) {
if (excludedUserIds.includes(hero.userId)) {
inviteJoinCount--;
return;
}
const member = this.getMember(userId);
otherNames.push(member ? member.name : userId);
// If the hero has a display name, use that.
// Otherwise, look their user ID up in the membership and use
// the name from there, or the user ID as a last resort.
if (hero.displayName) {
otherNames.push(hero.displayName);
} else {
const member = this.getMember(hero.userId);
otherNames.push(member ? member.name : hero.userId);
}
});
} else {
let otherMembers = this.currentState.getMembers().filter((m) => {

View File

@ -1635,7 +1635,6 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, CryptoEventH
/** called by the sync loop after processing each sync.
*
* TODO: figure out something equivalent for sliding sync.
*
* @param syncState - information on the completed sync.
*/
@ -1647,6 +1646,13 @@ export class RustCrypto extends TypedEventEmitter<RustCryptoEvents, CryptoEventH
});
}
/**
* Implementation of {@link CryptoApi#markAllTrackedUsersAsDirty}.
*/
public async markAllTrackedUsersAsDirty(): Promise<void> {
await this.olmMachine.markAllTrackedUsersAsDirty();
}
/**
* Handle an incoming m.key.verification.request event, received either in-room or in a to-device message.
*

View File

@ -82,9 +82,16 @@ class ExtensionE2EE implements Extension<ExtensionE2EERequest, ExtensionE2EEResp
return ExtensionState.PreProcess;
}
public onRequest(isInitial: boolean): ExtensionE2EERequest | undefined {
if (!isInitial) {
return undefined;
public async onRequest(isInitial: boolean): Promise<ExtensionE2EERequest> {
if (isInitial) {
// In SSS, the `?pos=` contains the stream position for device list updates.
// If we do not have a `?pos=` (e.g because we forgot it, or because the server
// invalidated our connection) then we MUST invlaidate all device lists because
// the server will not tell us the delta. This will then cause UTDs as we will fail
// to encrypt for new devices. This is an expensive call, so we should
// really really remember `?pos=` wherever possible.
logger.log("ExtensionE2EE: invalidating all device lists due to missing 'pos'");
await this.crypto.markAllTrackedUsersAsDirty();
}
return {
enabled: true, // this is sticky so only send it on the initial request
@ -134,15 +141,12 @@ class ExtensionToDevice implements Extension<ExtensionToDeviceRequest, Extension
return ExtensionState.PreProcess;
}
public onRequest(isInitial: boolean): ExtensionToDeviceRequest {
const extReq: ExtensionToDeviceRequest = {
public async onRequest(isInitial: boolean): Promise<ExtensionToDeviceRequest> {
return {
since: this.nextBatch !== null ? this.nextBatch : undefined,
limit: 100,
enabled: true,
};
if (isInitial) {
extReq["limit"] = 100;
extReq["enabled"] = true;
}
return extReq;
}
public async onResponse(data: ExtensionToDeviceResponse): Promise<void> {
@ -216,10 +220,7 @@ class ExtensionAccountData implements Extension<ExtensionAccountDataRequest, Ext
return ExtensionState.PostProcess;
}
public onRequest(isInitial: boolean): ExtensionAccountDataRequest | undefined {
if (!isInitial) {
return undefined;
}
public async onRequest(isInitial: boolean): Promise<ExtensionAccountDataRequest> {
return {
enabled: true,
};
@ -286,10 +287,7 @@ class ExtensionTyping implements Extension<ExtensionTypingRequest, ExtensionTypi
return ExtensionState.PostProcess;
}
public onRequest(isInitial: boolean): ExtensionTypingRequest | undefined {
if (!isInitial) {
return undefined; // don't send a JSON object for subsequent requests, we don't need to.
}
public async onRequest(isInitial: boolean): Promise<ExtensionTypingRequest> {
return {
enabled: true,
};
@ -325,13 +323,10 @@ class ExtensionReceipts implements Extension<ExtensionReceiptsRequest, Extension
return ExtensionState.PostProcess;
}
public onRequest(isInitial: boolean): ExtensionReceiptsRequest | undefined {
if (isInitial) {
return {
enabled: true,
};
}
return undefined; // don't send a JSON object for subsequent requests, we don't need to.
public async onRequest(isInitial: boolean): Promise<ExtensionReceiptsRequest> {
return {
enabled: true,
};
}
public async onResponse(data: ExtensionReceiptsResponse): Promise<void> {
@ -442,6 +437,7 @@ export class SlidingSyncSdk {
}
} else {
this.failCount = 0;
logger.log(`SlidingSyncState.RequestFinished with ${Object.keys(resp?.rooms || []).length} rooms`);
}
break;
}
@ -580,7 +576,7 @@ export class SlidingSyncSdk {
// TODO: handle threaded / beacon events
if (roomData.initial) {
if (roomData.limited || roomData.initial) {
// we should not know about any of these timeline entries if this is a genuinely new room.
// If we do, then we've effectively done scrollback (e.g requesting timeline_limit: 1 for
// this room, then timeline_limit: 50).
@ -637,6 +633,9 @@ export class SlidingSyncSdk {
room.setUnreadNotificationCount(NotificationCountType.Highlight, roomData.highlight_count);
}
}
if (roomData.bump_stamp) {
room.setBumpStamp(roomData.bump_stamp);
}
if (Number.isInteger(roomData.invited_count)) {
room.currentState.setInvitedMemberCount(roomData.invited_count!);
@ -656,11 +655,10 @@ export class SlidingSyncSdk {
inviteStateEvents.forEach((e) => {
this.client.emit(ClientEvent.Event, e);
});
room.updateMyMembership(KnownMembership.Invite);
return;
}
if (roomData.initial) {
if (roomData.limited) {
// set the back-pagination token. Do this *before* adding any
// events so that clients can start back-paginating.
room.getLiveTimeline().setPaginationToken(roomData.prev_batch ?? null, EventTimeline.BACKWARDS);
@ -728,6 +726,8 @@ export class SlidingSyncSdk {
// synchronous execution prior to emitting SlidingSyncState.Complete
room.updateMyMembership(KnownMembership.Join);
room.setMSC4186SummaryData(roomData.heroes, roomData.joined_count, roomData.invited_count);
room.recalculate();
if (roomData.initial) {
client.store.storeRoom(room);

View File

@ -1,5 +1,5 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Copyright 2022-2024 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -18,7 +18,7 @@ import { logger } from "./logger.ts";
import { type MatrixClient } from "./client.ts";
import { type IRoomEvent, type IStateEvent } from "./sync-accumulator.ts";
import { TypedEventEmitter } from "./models/typed-event-emitter.ts";
import { sleep, type IDeferred, defer } from "./utils.ts";
import { sleep } from "./utils.ts";
import { type HTTPError } from "./http-api/index.ts";
// /sync requests allow you to set a timeout= but the request may continue
@ -82,10 +82,23 @@ export interface MSC3575SlidingSyncRequest {
clientTimeout?: number;
}
/**
* New format of hero introduced in MSC4186 with display name and avatar URL
* in addition to just user_id (as it is on the wire, with underscores)
* as opposed to Hero in room-summary.ts which has fields in camelCase
* (and also a flag to note what format the hero came from).
*/
export interface MSC4186Hero {
user_id: string;
displayname?: string;
avatar_url?: string;
}
export interface MSC3575RoomData {
name: string;
required_state: IStateEvent[];
timeline: (IRoomEvent | IStateEvent)[];
heroes?: MSC4186Hero[];
notification_count?: number;
highlight_count?: number;
joined_count?: number;
@ -96,41 +109,13 @@ export interface MSC3575RoomData {
is_dm?: boolean;
prev_batch?: string;
num_live?: number;
bump_stamp?: number;
}
interface ListResponse {
count: number;
ops: Operation[];
}
interface BaseOperation {
op: string;
}
interface DeleteOperation extends BaseOperation {
op: "DELETE";
index: number;
}
interface InsertOperation extends BaseOperation {
op: "INSERT";
index: number;
room_id: string;
}
interface InvalidateOperation extends BaseOperation {
op: "INVALIDATE";
range: [number, number];
}
interface SyncOperation extends BaseOperation {
op: "SYNC";
range: [number, number];
room_ids: string[];
}
type Operation = DeleteOperation | InsertOperation | InvalidateOperation | SyncOperation;
/**
* A complete Sliding Sync response
*/
@ -163,7 +148,6 @@ class SlidingList {
private isModified?: boolean;
// returned data
public roomIndexToRoomId: Record<number, string> = {};
public joinedCount = 0;
/**
@ -204,9 +188,6 @@ class SlidingList {
// reset values as the join count may be very different (if filters changed) including the rooms
// (e.g. sort orders or sliding window ranges changed)
// the constantly changing sliding window ranges. Not an array for performance reasons
// E.g. tracking ranges 0-99, 500-599, we don't want to have a 600 element array
this.roomIndexToRoomId = {};
// the total number of joined rooms according to the server, always >= len(roomIndexToRoomId)
this.joinedCount = 0;
}
@ -226,26 +207,6 @@ class SlidingList {
}
return list;
}
/**
* Check if a given index is within the list range. This is required even though the /sync API
* provides explicit updates with index positions because of the following situation:
* 0 1 2 3 4 5 6 7 8 indexes
* a b c d e f COMMANDS: SYNC 0 2 a b c; SYNC 6 8 d e f;
* a b c d _ f COMMAND: DELETE 7;
* e a b c d f COMMAND: INSERT 0 e;
* c=3 is wrong as we are not tracking it, ergo we need to see if `i` is in range else drop it
* @param i - The index to check
* @returns True if the index is within a sliding window
*/
public isIndexInRange(i: number): boolean {
for (const r of this.list.ranges) {
if (r[0] <= i && i <= r[1]) {
return true;
}
}
return false;
}
}
/**
@ -274,10 +235,10 @@ export interface Extension<Req extends object, Res extends object> {
/**
* A function which is called when the request JSON is being formed.
* Returns the data to insert under this key.
* @param isInitial - True when this is part of the initial request (send sticky params)
* @param isInitial - True when this is part of the initial request.
* @returns The request JSON to send.
*/
onRequest(isInitial: boolean): Req | undefined;
onRequest(isInitial: boolean): Promise<Req>;
/**
* A function which is called when there is response JSON under this extension.
* @param data - The response JSON under the extension name.
@ -295,12 +256,10 @@ export interface Extension<Req extends object, Res extends object> {
* of information when processing sync responses.
* - RoomData: concerns rooms, useful for SlidingSyncSdk to update its knowledge of rooms.
* - Lifecycle: concerns callbacks at various well-defined points in the sync process.
* - List: concerns lists, useful for UI layers to re-render room lists.
* Specifically, the order of event invocation is:
* - Lifecycle (state=RequestFinished)
* - RoomData (N times)
* - Lifecycle (state=Complete)
* - List (at most once per list)
*/
export enum SlidingSyncEvent {
/**
@ -313,16 +272,9 @@ export enum SlidingSyncEvent {
* - SlidingSyncState.RequestFinished: Fires after we receive a valid response but before the
* response has been processed. Perform any pre-process steps here. If there was a problem syncing,
* `err` will be set (e.g network errors).
* - SlidingSyncState.Complete: Fires after all SlidingSyncEvent.RoomData have been fired but before
* SlidingSyncEvent.List.
* - SlidingSyncState.Complete: Fires after the response has been processed.
*/
Lifecycle = "SlidingSync.Lifecycle",
/**
* This event fires whenever there has been a change to this list index. It fires exactly once
* per list, even if there were multiple operations for the list.
* It fires AFTER Lifecycle and RoomData events.
*/
List = "SlidingSync.List",
}
export type SlidingSyncEventHandlerMap = {
@ -332,7 +284,6 @@ export type SlidingSyncEventHandlerMap = {
resp: MSC3575SlidingSyncResponse | null,
err?: Error,
) => void;
[SlidingSyncEvent.List]: (listKey: string, joinedCount: number, roomIndexToRoomId: Record<number, string>) => void;
};
/**
@ -347,11 +298,6 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
private terminated = false;
// flag set when resend() is called because we cannot rely on detecting AbortError in JS SDK :(
private needsResend = false;
// the txn_id to send with the next request.
private txnId: string | null = null;
// a list (in chronological order of when they were sent) of objects containing the txn ID and
// a defer to resolve/reject depending on whether they were successfully sent or not.
private txnIdDefers: (IDeferred<string> & { txnId: string })[] = [];
// map of extension name to req/resp handler
private extensions: Record<string, Extension<any, any>> = {};
@ -426,14 +372,13 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
* @param key - The list key
* @returns The list data which contains the rooms in this list
*/
public getListData(key: string): { joinedCount: number; roomIndexToRoomId: Record<number, string> } | null {
public getListData(key: string): { joinedCount: number } | null {
const data = this.lists.get(key);
if (!data) {
return null;
}
return {
joinedCount: data.joinedCount,
roomIndexToRoomId: Object.assign({}, data.roomIndexToRoomId),
};
}
@ -461,13 +406,13 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
* (or rejects with the transaction ID if the action was not applied e.g the request was cancelled
* immediately after sending, in which case the action will be applied in the subsequent request)
*/
public setListRanges(key: string, ranges: number[][]): Promise<string> {
public setListRanges(key: string, ranges: number[][]): void {
const list = this.lists.get(key);
if (!list) {
return Promise.reject(new Error("no list with key " + key));
throw new Error("no list with key " + key);
}
list.updateListRange(ranges);
return this.resend();
this.resend();
}
/**
@ -479,7 +424,7 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
* (or rejects with the transaction ID if the action was not applied e.g the request was cancelled
* immediately after sending, in which case the action will be applied in the subsequent request)
*/
public setList(key: string, list: MSC3575List): Promise<string> {
public setList(key: string, list: MSC3575List): void {
const existingList = this.lists.get(key);
if (existingList) {
existingList.replaceList(list);
@ -488,7 +433,7 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
this.lists.set(key, new SlidingList(list));
}
this.listModifiedCount += 1;
return this.resend();
this.resend();
}
/**
@ -504,27 +449,21 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
* /sync request to resend new subscriptions. If the /sync stream has not started, this will
* prepare the room subscriptions for when start() is called.
* @param s - The new desired room subscriptions.
* @returns A promise which resolves to the transaction ID when it has been received down sync
* (or rejects with the transaction ID if the action was not applied e.g the request was cancelled
* immediately after sending, in which case the action will be applied in the subsequent request)
*/
public modifyRoomSubscriptions(s: Set<string>): Promise<string> {
public modifyRoomSubscriptions(s: Set<string>): void {
this.desiredRoomSubscriptions = s;
return this.resend();
this.resend();
}
/**
* Modify which events to retrieve for room subscriptions. Invalidates all room subscriptions
* such that they will be sent up afresh.
* @param rs - The new room subscription fields to fetch.
* @returns A promise which resolves to the transaction ID when it has been received down sync
* (or rejects with the transaction ID if the action was not applied e.g the request was cancelled
* immediately after sending, in which case the action will be applied in the subsequent request)
*/
public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): Promise<string> {
public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): void {
this.roomSubscriptionInfo = rs;
this.confirmedRoomSubscriptions = new Set<string>();
return this.resend();
this.resend();
}
/**
@ -538,11 +477,11 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
this.extensions[ext.name()] = ext;
}
private getExtensionRequest(isInitial: boolean): Record<string, object | undefined> {
private async getExtensionRequest(isInitial: boolean): Promise<Record<string, object | undefined>> {
const ext: Record<string, object | undefined> = {};
Object.keys(this.extensions).forEach((extName) => {
ext[extName] = this.extensions[extName].onRequest(isInitial);
});
for (const extName in this.extensions) {
ext[extName] = await this.extensions[extName].onRequest(isInitial);
}
return ext;
}
@ -595,203 +534,13 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
this.emit(SlidingSyncEvent.Lifecycle, state, resp, err);
}
private shiftRight(listKey: string, hi: number, low: number): void {
const list = this.lists.get(listKey);
if (!list) {
return;
}
// l h
// 0,1,2,3,4 <- before
// 0,1,2,2,3 <- after, hi is deleted and low is duplicated
for (let i = hi; i > low; i--) {
if (list.isIndexInRange(i)) {
list.roomIndexToRoomId[i] = list.roomIndexToRoomId[i - 1];
}
}
}
private shiftLeft(listKey: string, hi: number, low: number): void {
const list = this.lists.get(listKey);
if (!list) {
return;
}
// l h
// 0,1,2,3,4 <- before
// 0,1,3,4,4 <- after, low is deleted and hi is duplicated
for (let i = low; i < hi; i++) {
if (list.isIndexInRange(i)) {
list.roomIndexToRoomId[i] = list.roomIndexToRoomId[i + 1];
}
}
}
private removeEntry(listKey: string, index: number): void {
const list = this.lists.get(listKey);
if (!list) {
return;
}
// work out the max index
let max = -1;
for (const n in list.roomIndexToRoomId) {
if (Number(n) > max) {
max = Number(n);
}
}
if (max < 0 || index > max) {
return;
}
// Everything higher than the gap needs to be shifted left.
this.shiftLeft(listKey, max, index);
delete list.roomIndexToRoomId[max];
}
private addEntry(listKey: string, index: number): void {
const list = this.lists.get(listKey);
if (!list) {
return;
}
// work out the max index
let max = -1;
for (const n in list.roomIndexToRoomId) {
if (Number(n) > max) {
max = Number(n);
}
}
if (max < 0 || index > max) {
return;
}
// Everything higher than the gap needs to be shifted right, +1 so we don't delete the highest element
this.shiftRight(listKey, max + 1, index);
}
private processListOps(list: ListResponse, listKey: string): void {
let gapIndex = -1;
const listData = this.lists.get(listKey);
if (!listData) {
return;
}
list.ops.forEach((op: Operation) => {
if (!listData) {
return;
}
switch (op.op) {
case "DELETE": {
logger.debug("DELETE", listKey, op.index, ";");
delete listData.roomIndexToRoomId[op.index];
if (gapIndex !== -1) {
// we already have a DELETE operation to process, so process it.
this.removeEntry(listKey, gapIndex);
}
gapIndex = op.index;
break;
}
case "INSERT": {
logger.debug("INSERT", listKey, op.index, op.room_id, ";");
if (listData.roomIndexToRoomId[op.index]) {
// something is in this space, shift items out of the way
if (gapIndex < 0) {
// we haven't been told where to shift from, so make way for a new room entry.
this.addEntry(listKey, op.index);
} else if (gapIndex > op.index) {
// the gap is further down the list, shift every element to the right
// starting at the gap so we can just shift each element in turn:
// [A,B,C,_] gapIndex=3, op.index=0
// [A,B,C,C] i=3
// [A,B,B,C] i=2
// [A,A,B,C] i=1
// Terminate. We'll assign into op.index next.
this.shiftRight(listKey, gapIndex, op.index);
} else if (gapIndex < op.index) {
// the gap is further up the list, shift every element to the left
// starting at the gap so we can just shift each element in turn
this.shiftLeft(listKey, op.index, gapIndex);
}
}
// forget the gap, we don't need it anymore. This is outside the check for
// a room being present in this index position because INSERTs always universally
// forget the gap, not conditionally based on the presence of a room in the INSERT
// position. Without this, DELETE 0; INSERT 0; would do the wrong thing.
gapIndex = -1;
listData.roomIndexToRoomId[op.index] = op.room_id;
break;
}
case "INVALIDATE": {
const startIndex = op.range[0];
for (let i = startIndex; i <= op.range[1]; i++) {
delete listData.roomIndexToRoomId[i];
}
logger.debug("INVALIDATE", listKey, op.range[0], op.range[1], ";");
break;
}
case "SYNC": {
const startIndex = op.range[0];
for (let i = startIndex; i <= op.range[1]; i++) {
const roomId = op.room_ids[i - startIndex];
if (!roomId) {
break; // we are at the end of list
}
listData.roomIndexToRoomId[i] = roomId;
}
logger.debug("SYNC", listKey, op.range[0], op.range[1], (op.room_ids || []).join(" "), ";");
break;
}
}
});
if (gapIndex !== -1) {
// we already have a DELETE operation to process, so process it
// Everything higher than the gap needs to be shifted left.
this.removeEntry(listKey, gapIndex);
}
}
/**
* Resend a Sliding Sync request. Used when something has changed in the request. Resolves with
* the transaction ID of this request on success. Rejects with the transaction ID of this request
* on failure.
* Resend a Sliding Sync request. Used when something has changed in the request.
*/
public resend(): Promise<string> {
if (this.needsResend && this.txnIdDefers.length > 0) {
// we already have a resend queued, so just return the same promise
return this.txnIdDefers[this.txnIdDefers.length - 1].promise;
}
public resend(): void {
this.needsResend = true;
this.txnId = this.client.makeTxnId();
const d = defer<string>();
this.txnIdDefers.push({
...d,
txnId: this.txnId,
});
this.abortController?.abort();
this.abortController = new AbortController();
return d.promise;
}
private resolveTransactionDefers(txnId?: string): void {
if (!txnId) {
return;
}
// find the matching index
let txnIndex = -1;
for (let i = 0; i < this.txnIdDefers.length; i++) {
if (this.txnIdDefers[i].txnId === txnId) {
txnIndex = i;
break;
}
}
if (txnIndex === -1) {
// this shouldn't happen; we shouldn't be seeing txn_ids for things we don't know about,
// whine about it.
logger.warn(`resolveTransactionDefers: seen ${txnId} but it isn't a pending txn, ignoring.`);
return;
}
// This list is sorted in time, so if the input txnId ACKs in the middle of this array,
// then everything before it that hasn't been ACKed yet never will and we should reject them.
for (let i = 0; i < txnIndex; i++) {
this.txnIdDefers[i].reject(this.txnIdDefers[i].txnId);
}
this.txnIdDefers[txnIndex].resolve(txnId);
// clear out settled promises, including the one we resolved.
this.txnIdDefers = this.txnIdDefers.slice(txnIndex + 1);
}
/**
@ -802,7 +551,6 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
this.abortController?.abort();
// remove all listeners so things can be GC'd
this.removeAllListeners(SlidingSyncEvent.Lifecycle);
this.removeAllListeners(SlidingSyncEvent.List);
this.removeAllListeners(SlidingSyncEvent.RoomData);
}
@ -811,20 +559,13 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
*/
private resetup(): void {
logger.warn("SlidingSync: resetting connection info");
// any pending txn ID defers will be forgotten already by the server, so clear them out
this.txnIdDefers.forEach((d) => {
d.reject(d.txnId);
});
this.txnIdDefers = [];
// resend sticky params and de-confirm all subscriptions
this.lists.forEach((l) => {
l.setModified(true);
});
this.confirmedRoomSubscriptions = new Set<string>(); // leave desired ones alone though!
// reset the connection as we might be wedged
this.needsResend = true;
this.abortController?.abort();
this.abortController = new AbortController();
this.resend();
}
/**
@ -836,20 +577,18 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
let currentPos: string | undefined;
while (!this.terminated) {
this.needsResend = false;
let doNotUpdateList = false;
let resp: MSC3575SlidingSyncResponse | undefined;
try {
const listModifiedCount = this.listModifiedCount;
const reqLists: Record<string, MSC3575List> = {};
this.lists.forEach((l: SlidingList, key: string) => {
reqLists[key] = l.getList(false);
reqLists[key] = l.getList(true);
});
const reqBody: MSC3575SlidingSyncRequest = {
lists: reqLists,
pos: currentPos,
timeout: this.timeoutMS,
clientTimeout: this.timeoutMS + BUFFER_PERIOD_MS,
extensions: this.getExtensionRequest(currentPos === undefined),
extensions: await this.getExtensionRequest(currentPos === undefined),
};
// check if we are (un)subscribing to a room and modify request this one time for it
const newSubscriptions = difference(this.desiredRoomSubscriptions, this.confirmedRoomSubscriptions);
@ -868,10 +607,6 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
reqBody.room_subscriptions[roomId] = sub;
}
}
if (this.txnId) {
reqBody.txn_id = this.txnId;
this.txnId = null;
}
this.pendingReq = this.client.slidingSync(reqBody, this.proxyBaseUrl, this.abortController.signal);
resp = await this.pendingReq;
currentPos = resp.pos;
@ -882,13 +617,6 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
for (const roomId of unsubscriptions) {
this.confirmedRoomSubscriptions.delete(roomId);
}
if (listModifiedCount !== this.listModifiedCount) {
// the lists have been modified whilst we were waiting for 'await' to return, but the abort()
// call did nothing. It is NOT SAFE to modify the list array now. We'll process the response but
// not update list pointers.
logger.debug("list modified during await call, not updating list");
doNotUpdateList = true;
}
// mark all these lists as having been sent as sticky so we don't keep sending sticky params
this.lists.forEach((l) => {
l.setModified(false);
@ -931,27 +659,8 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
await this.invokeRoomDataListeners(roomId, resp!.rooms[roomId]);
}
const listKeysWithUpdates: Set<string> = new Set();
if (!doNotUpdateList) {
for (const [key, list] of Object.entries(resp.lists)) {
list.ops = list.ops ?? [];
if (list.ops.length > 0) {
listKeysWithUpdates.add(key);
}
this.processListOps(list, key);
}
}
this.invokeLifecycleListeners(SlidingSyncState.Complete, resp);
await this.onPostExtensionsResponse(resp.extensions);
listKeysWithUpdates.forEach((listKey: string) => {
const list = this.lists.get(listKey);
if (!list) {
return;
}
this.emit(SlidingSyncEvent.List, listKey, list.joinedCount, Object.assign({}, list.roomIndexToRoomId));
});
this.resolveTransactionDefers(resp.txn_id);
}
}
}