diff --git a/spec/integ/matrix-client-syncing.spec.js b/spec/integ/matrix-client-syncing.spec.ts similarity index 74% rename from spec/integ/matrix-client-syncing.spec.js rename to spec/integ/matrix-client-syncing.spec.ts index 0c571707a..9f3fb9887 100644 --- a/spec/integ/matrix-client-syncing.spec.js +++ b/spec/integ/matrix-client-syncing.spec.ts @@ -14,14 +14,24 @@ See the License for the specific language governing permissions and limitations under the License. */ -import { EventTimeline, MatrixEvent, RoomEvent, RoomStateEvent, RoomMemberEvent } from "../../src"; -import { UNSTABLE_MSC2716_MARKER } from "../../src/@types/event"; +import { Optional } from "matrix-events-sdk/lib/types"; +import HttpBackend from "matrix-mock-request"; + +import { + EventTimeline, + MatrixEvent, + RoomEvent, + RoomStateEvent, + RoomMemberEvent, + UNSTABLE_MSC2716_MARKER, + MatrixClient, +} from "../../src"; import * as utils from "../test-utils/test-utils"; import { TestClient } from "../TestClient"; -describe("MatrixClient syncing", function() { - let client = null; - let httpBackend = null; +describe("MatrixClient syncing", () => { + let client: Optional = null; + let httpBackend: Optional = null; const selfUserId = "@alice:localhost"; const selfAccessToken = "aseukfgwef"; const otherUserId = "@bob:localhost"; @@ -31,48 +41,47 @@ describe("MatrixClient syncing", function() { const roomOne = "!foo:localhost"; const roomTwo = "!bar:localhost"; - beforeEach(function() { + beforeEach(() => { const testClient = new TestClient(selfUserId, "DEVICE", selfAccessToken); httpBackend = testClient.httpBackend; client = testClient.client; - httpBackend.when("GET", "/versions").respond(200, {}); - httpBackend.when("GET", "/pushrules").respond(200, {}); - httpBackend.when("POST", "/filter").respond(200, { filter_id: "a filter id" }); + httpBackend!.when("GET", "/versions").respond(200, {}); + httpBackend!.when("GET", "/pushrules").respond(200, {}); + httpBackend!.when("POST", "/filter").respond(200, { filter_id: "a filter id" }); }); - afterEach(function() { - httpBackend.verifyNoOutstandingExpectation(); - client.stopClient(); - return httpBackend.stop(); + afterEach(() => { + httpBackend!.verifyNoOutstandingExpectation(); + client!.stopClient(); + return httpBackend!.stop(); }); - describe("startClient", function() { + describe("startClient", () => { const syncData = { next_batch: "batch_token", rooms: {}, presence: {}, }; - it("should /sync after /pushrules and /filter.", function(done) { - httpBackend.when("GET", "/sync").respond(200, syncData); + it("should /sync after /pushrules and /filter.", (done) => { + httpBackend!.when("GET", "/sync").respond(200, syncData); - client.startClient(); + client!.startClient(); - httpBackend.flushAllExpected().then(function() { + httpBackend!.flushAllExpected().then(() => { done(); }); }); - it("should pass the 'next_batch' token from /sync to the since= param " + - " of the next /sync", function(done) { - httpBackend.when("GET", "/sync").respond(200, syncData); - httpBackend.when("GET", "/sync").check(function(req) { + it("should pass the 'next_batch' token from /sync to the since= param of the next /sync", (done) => { + httpBackend!.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").check((req) => { expect(req.queryParams.since).toEqual(syncData.next_batch); }).respond(200, syncData); - client.startClient(); + client!.startClient(); - httpBackend.flushAllExpected().then(function() { + httpBackend!.flushAllExpected().then(() => { done(); }); }); @@ -96,14 +105,14 @@ describe("MatrixClient syncing", function() { }, }, }; - httpBackend.when("GET", "/sync").respond(200, { + httpBackend!.when("GET", "/sync").respond(200, { ...syncData, rooms: inviteSyncRoomSection, }); // Second sync: a leave (reject of some kind) - httpBackend.when("POST", "/leave").respond(200, {}); - httpBackend.when("GET", "/sync").respond(200, { + httpBackend!.when("POST", "/leave").respond(200, {}); + httpBackend!.when("GET", "/sync").respond(200, { ...syncData, rooms: { leave: { @@ -143,28 +152,28 @@ describe("MatrixClient syncing", function() { }); // Third sync: another invite - httpBackend.when("GET", "/sync").respond(200, { + httpBackend!.when("GET", "/sync").respond(200, { ...syncData, rooms: inviteSyncRoomSection, }); // First fire: an initial invite let fires = 0; - client.once(RoomEvent.MyMembership, (room, membership, oldMembership) => { // Room, string, string + client!.once(RoomEvent.MyMembership, (room, membership, oldMembership) => { // Room, string, string fires++; expect(room.roomId).toBe(roomId); expect(membership).toBe("invite"); expect(oldMembership).toBeFalsy(); // Second fire: a leave - client.once(RoomEvent.MyMembership, (room, membership, oldMembership) => { + client!.once(RoomEvent.MyMembership, (room, membership, oldMembership) => { fires++; expect(room.roomId).toBe(roomId); expect(membership).toBe("leave"); expect(oldMembership).toBe("invite"); // Third/final fire: a second invite - client.once(RoomEvent.MyMembership, (room, membership, oldMembership) => { + client!.once(RoomEvent.MyMembership, (room, membership, oldMembership) => { fires++; expect(room.roomId).toBe(roomId); expect(membership).toBe("invite"); @@ -173,18 +182,81 @@ describe("MatrixClient syncing", function() { }); // For maximum safety, "leave" the room after we register the handler - client.leave(roomId); + client!.leave(roomId); }); // noinspection ES6MissingAwait - client.startClient(); - await httpBackend.flushAllExpected(); + client!.startClient(); + await httpBackend!.flushAllExpected(); expect(fires).toBe(3); }); + + it("should honour lazyLoadMembers if user is not a guest", () => { + client!.doesServerSupportLazyLoading = jest.fn().mockResolvedValue(true); + + httpBackend!.when("GET", "/sync").check((req) => { + expect(JSON.parse(req.queryParams.filter).room.state.lazy_load_members).toBeTruthy(); + }).respond(200, syncData); + + client!.setGuest(false); + client!.startClient({ lazyLoadMembers: true }); + + return httpBackend!.flushAllExpected(); + }); + + it("should not honour lazyLoadMembers if user is a guest", () => { + httpBackend!.expectedRequests = []; + httpBackend!.when("GET", "/versions").respond(200, {}); + client!.doesServerSupportLazyLoading = jest.fn().mockResolvedValue(true); + + httpBackend!.when("GET", "/sync").check((req) => { + expect(JSON.parse(req.queryParams.filter).room?.state?.lazy_load_members).toBeFalsy(); + }).respond(200, syncData); + + client!.setGuest(true); + client!.startClient({ lazyLoadMembers: true }); + + return httpBackend!.flushAllExpected(); + }); }); - describe("resolving invites to profile info", function() { + describe("initial sync", () => { + const syncData = { + next_batch: "batch_token", + rooms: {}, + presence: {}, + }; + + it("should only apply initialSyncLimit to the initial sync", () => { + // 1st request + httpBackend!.when("GET", "/sync").check((req) => { + expect(JSON.parse(req.queryParams.filter).room.timeline.limit).toEqual(1); + }).respond(200, syncData); + // 2nd request + httpBackend!.when("GET", "/sync").check((req) => { + expect(req.queryParams.filter).toEqual("a filter id"); + }).respond(200, syncData); + + client!.startClient({ initialSyncLimit: 1 }); + + httpBackend!.flushSync(undefined); + return httpBackend!.flushAllExpected(); + }); + + it("should not apply initialSyncLimit to a first sync if we have a stored token", () => { + httpBackend!.when("GET", "/sync").check((req) => { + expect(req.queryParams.filter).toEqual("a filter id"); + }).respond(200, syncData); + + client!.store.getSavedSyncToken = jest.fn().mockResolvedValue("this-is-a-token"); + client!.startClient({ initialSyncLimit: 1 }); + + return httpBackend!.flushAllExpected(); + }); + }); + + describe("resolving invites to profile info", () => { const syncData = { next_batch: "s_5_3", presence: { @@ -197,7 +269,7 @@ describe("MatrixClient syncing", function() { }, }; - beforeEach(function() { + beforeEach(() => { syncData.presence.events = []; syncData.rooms.join[roomOne] = { timeline: { @@ -226,41 +298,43 @@ describe("MatrixClient syncing", function() { }; }); - it("should resolve incoming invites from /sync", function() { + it("should resolve incoming invites from /sync", () => { syncData.rooms.join[roomOne].state.events.push( utils.mkMembership({ room: roomOne, mship: "invite", user: userC, }), ); - httpBackend.when("GET", "/sync").respond(200, syncData); - httpBackend.when("GET", "/profile/" + encodeURIComponent(userC)).respond( + httpBackend!.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/profile/" + encodeURIComponent(userC)).respond( 200, { avatar_url: "mxc://flibble/wibble", displayname: "The Boss", }, ); - client.startClient({ + client!.startClient({ resolveInvitesToProfiles: true, }); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), - ]).then(function() { - const member = client.getRoom(roomOne).getMember(userC); + ]).then(() => { + const member = client!.getRoom(roomOne).getMember(userC); expect(member.name).toEqual("The Boss"); expect( - member.getAvatarUrl("home.server.url", null, null, null, false), + member.getAvatarUrl("home.server.url", null, null, null, false, false), ).toBeTruthy(); }); }); - it("should use cached values from m.presence wherever possible", function() { + it("should use cached values from m.presence wherever possible", () => { syncData.presence.events = [ utils.mkPresence({ - user: userC, presence: "online", name: "The Ghost", + user: userC, + presence: "online", + name: "The Ghost", }), ]; syncData.rooms.join[roomOne].state.events.push( @@ -269,25 +343,27 @@ describe("MatrixClient syncing", function() { }), ); - httpBackend.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, syncData); - client.startClient({ + client!.startClient({ resolveInvitesToProfiles: true, }); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), - ]).then(function() { - const member = client.getRoom(roomOne).getMember(userC); + ]).then(() => { + const member = client!.getRoom(roomOne).getMember(userC); expect(member.name).toEqual("The Ghost"); }); }); - it("should result in events on the room member firing", function() { + it("should result in events on the room member firing", () => { syncData.presence.events = [ utils.mkPresence({ - user: userC, presence: "online", name: "The Ghost", + user: userC, + presence: "online", + name: "The Ghost", }), ]; syncData.rooms.join[roomOne].state.events.push( @@ -296,83 +372,84 @@ describe("MatrixClient syncing", function() { }), ); - httpBackend.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, syncData); let latestFiredName = null; - client.on(RoomMemberEvent.Name, function(event, m) { + client!.on(RoomMemberEvent.Name, (event, m) => { if (m.userId === userC && m.roomId === roomOne) { latestFiredName = m.name; } }); - client.startClient({ + client!.startClient({ resolveInvitesToProfiles: true, }); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), - ]).then(function() { + ]).then(() => { expect(latestFiredName).toEqual("The Ghost"); }); }); - it("should no-op if resolveInvitesToProfiles is not set", function() { + it("should no-op if resolveInvitesToProfiles is not set", () => { syncData.rooms.join[roomOne].state.events.push( utils.mkMembership({ room: roomOne, mship: "invite", user: userC, }), ); - httpBackend.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, syncData); - client.startClient(); + client!.startClient(); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), - ]).then(function() { - const member = client.getRoom(roomOne).getMember(userC); + ]).then(() => { + const member = client!.getRoom(roomOne).getMember(userC); expect(member.name).toEqual(userC); expect( - member.getAvatarUrl("home.server.url", null, null, null, false), + member.getAvatarUrl("home.server.url", null, null, null, false, false), ).toBe(null); }); }); }); - describe("users", function() { + describe("users", () => { const syncData = { next_batch: "nb", presence: { events: [ utils.mkPresence({ - user: userA, presence: "online", + user: userA, + presence: "online", }), utils.mkPresence({ - user: userB, presence: "unavailable", + user: userB, + presence: "unavailable", }), ], }, }; - it("should create users for presence events from /sync", - function() { - httpBackend.when("GET", "/sync").respond(200, syncData); + it("should create users for presence events from /sync", () => { + httpBackend!.when("GET", "/sync").respond(200, syncData); - client.startClient(); + client!.startClient(); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), - ]).then(function() { - expect(client.getUser(userA).presence).toEqual("online"); - expect(client.getUser(userB).presence).toEqual("unavailable"); + ]).then(() => { + expect(client!.getUser(userA).presence).toEqual("online"); + expect(client!.getUser(userB).presence).toEqual("unavailable"); }); }); }); - describe("room state", function() { + describe("room state", () => { const msgText = "some text here"; const otherDisplayName = "Bob Smith"; @@ -478,17 +555,17 @@ describe("MatrixClient syncing", function() { }, }; - it("should continually recalculate the right room name.", function() { - httpBackend.when("GET", "/sync").respond(200, syncData); - httpBackend.when("GET", "/sync").respond(200, nextSyncData); + it("should continually recalculate the right room name.", () => { + httpBackend!.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, nextSyncData); - client.startClient(); + client!.startClient(); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(2), - ]).then(function() { - const room = client.getRoom(roomOne); + ]).then(() => { + const room = client!.getRoom(roomOne); // should have clobbered the name to the one from /events expect(room.name).toEqual( nextSyncData.rooms.join[roomOne].state.events[0].content.name, @@ -496,49 +573,49 @@ describe("MatrixClient syncing", function() { }); }); - it("should store the right events in the timeline.", function() { - httpBackend.when("GET", "/sync").respond(200, syncData); - httpBackend.when("GET", "/sync").respond(200, nextSyncData); + it("should store the right events in the timeline.", () => { + httpBackend!.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, nextSyncData); - client.startClient(); + client!.startClient(); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(2), - ]).then(function() { - const room = client.getRoom(roomTwo); + ]).then(() => { + const room = client!.getRoom(roomTwo); // should have added the message from /events expect(room.timeline.length).toEqual(2); expect(room.timeline[1].getContent().body).toEqual(msgText); }); }); - it("should set the right room name.", function() { - httpBackend.when("GET", "/sync").respond(200, syncData); - httpBackend.when("GET", "/sync").respond(200, nextSyncData); + it("should set the right room name.", () => { + httpBackend!.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, nextSyncData); - client.startClient(); + client!.startClient(); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(2), - ]).then(function() { - const room = client.getRoom(roomTwo); + ]).then(() => { + const room = client!.getRoom(roomTwo); // should use the display name of the other person. expect(room.name).toEqual(otherDisplayName); }); }); - it("should set the right user's typing flag.", function() { - httpBackend.when("GET", "/sync").respond(200, syncData); - httpBackend.when("GET", "/sync").respond(200, nextSyncData); + it("should set the right user's typing flag.", () => { + httpBackend!.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, nextSyncData); - client.startClient(); + client!.startClient(); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(2), - ]).then(function() { - const room = client.getRoom(roomTwo); + ]).then(() => { + const room = client!.getRoom(roomTwo); let member = room.getMember(otherUserId); expect(member).toBeTruthy(); expect(member.typing).toEqual(true); @@ -552,16 +629,16 @@ describe("MatrixClient syncing", function() { // events that arrive in the incremental sync as if they preceeded the // timeline events, however this breaks peeking, so it's disabled // (see sync.js) - xit("should correctly interpret state in incremental sync.", function() { - httpBackend.when("GET", "/sync").respond(200, syncData); - httpBackend.when("GET", "/sync").respond(200, nextSyncData); + xit("should correctly interpret state in incremental sync.", () => { + httpBackend!.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, nextSyncData); - client.startClient(); + client!.startClient(); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(2), - ]).then(function() { - const room = client.getRoom(roomOne); + ]).then(() => { + const room = client!.getRoom(roomOne); const stateAtStart = room.getLiveTimeline().getState( EventTimeline.BACKWARDS, ); @@ -576,11 +653,11 @@ describe("MatrixClient syncing", function() { }); }); - xit("should update power levels for users in a room", function() { + xit("should update power levels for users in a room", () => { }); - xit("should update the room topic", function() { + xit("should update the room topic", () => { }); @@ -650,16 +727,16 @@ describe("MatrixClient syncing", function() { expect(markerEvent.sender).toBeDefined(); expect(markerEvent.sender).not.toEqual(roomCreateEvent.sender); - httpBackend.when("GET", "/sync").respond(200, normalFirstSync); - httpBackend.when("GET", "/sync").respond(200, nextSyncData); + httpBackend!.when("GET", "/sync").respond(200, normalFirstSync); + httpBackend!.when("GET", "/sync").respond(200, nextSyncData); - client.startClient(); + client!.startClient(); await Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(2), ]); - const room = client.getRoom(roomOne); + const room = client!.getRoom(roomOne); expect(room.getTimelineNeedsRefresh()).toEqual(false); }); @@ -721,15 +798,15 @@ describe("MatrixClient syncing", function() { }, }; - httpBackend.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, syncData); - client.startClient(); + client!.startClient(); await Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), ]); - const room = client.getRoom(roomOne); + const room = client!.getRoom(roomOne); expect(room.getTimelineNeedsRefresh()).toEqual(false); }); @@ -751,15 +828,15 @@ describe("MatrixClient syncing", function() { }, }; - httpBackend.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, syncData); - client.startClient(); + client!.startClient(); await Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), ]); - const room = client.getRoom(roomOne); + const room = client!.getRoom(roomOne); expect(room.getTimelineNeedsRefresh()).toEqual(false); }); @@ -784,15 +861,15 @@ describe("MatrixClient syncing", function() { }, }; - httpBackend.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, syncData); - client.startClient(); + client!.startClient(); await Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), ]); - const room = client.getRoom(roomOne); + const room = client!.getRoom(roomOne); expect(room.getTimelineNeedsRefresh()).toEqual(false); }); @@ -818,27 +895,27 @@ describe("MatrixClient syncing", function() { const markerEventId = nextSyncData.rooms.join[roomOne].timeline.events[0].event_id; // Only do the first sync - httpBackend.when("GET", "/sync").respond(200, normalFirstSync); - client.startClient(); + httpBackend!.when("GET", "/sync").respond(200, normalFirstSync); + client!.startClient(); await Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), ]); // Get the room after the first sync so the room is created - const room = client.getRoom(roomOne); + const room = client!.getRoom(roomOne); let emitCount = 0; - room.on(RoomEvent.HistoryImportedWithinTimeline, function(markerEvent, room) { + room.on(RoomEvent.HistoryImportedWithinTimeline, (markerEvent, room) => { expect(markerEvent.getId()).toEqual(markerEventId); expect(room.roomId).toEqual(roomOne); emitCount += 1; }); // Now do a subsequent sync with the marker event - httpBackend.when("GET", "/sync").respond(200, nextSyncData); + httpBackend!.when("GET", "/sync").respond(200, nextSyncData); await Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), ]); @@ -873,16 +950,16 @@ describe("MatrixClient syncing", function() { }, }; - httpBackend.when("GET", "/sync").respond(200, normalFirstSync); - httpBackend.when("GET", "/sync").respond(200, nextSyncData); + httpBackend!.when("GET", "/sync").respond(200, normalFirstSync); + httpBackend!.when("GET", "/sync").respond(200, nextSyncData); - client.startClient(); + client!.startClient(); await Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(2), ]); - const room = client.getRoom(roomOne); + const room = client!.getRoom(roomOne); expect(room.getTimelineNeedsRefresh()).toEqual(true); }); }); @@ -929,19 +1006,19 @@ describe("MatrixClient syncing", function() { it("should be able to listen to state events even after " + "the timeline is reset during `limited` sync response", async () => { // Create a room from the sync - httpBackend.when("GET", "/sync").respond(200, syncData); - client.startClient(); + httpBackend!.when("GET", "/sync").respond(200, syncData); + client!.startClient(); await Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), ]); // Get the room after the first sync so the room is created - const room = client.getRoom(roomOne); + const room = client!.getRoom(roomOne); expect(room).toBeTruthy(); let stateEventEmitCount = 0; - client.on(RoomStateEvent.Update, () => { + client!.on(RoomStateEvent.Update, () => { stateEventEmitCount += 1; }); @@ -969,10 +1046,10 @@ describe("MatrixClient syncing", function() { prev_batch: "newerTok", }, }; - httpBackend.when("GET", "/sync").respond(200, limitedSyncData); + httpBackend!.when("GET", "/sync").respond(200, limitedSyncData); await Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), ]); @@ -997,25 +1074,25 @@ describe("MatrixClient syncing", function() { { timelineSupport: true }, ); httpBackend = testClientWithTimelineSupport.httpBackend; - httpBackend.when("GET", "/versions").respond(200, {}); - httpBackend.when("GET", "/pushrules").respond(200, {}); - httpBackend.when("POST", "/filter").respond(200, { filter_id: "a filter id" }); + httpBackend!.when("GET", "/versions").respond(200, {}); + httpBackend!.when("GET", "/pushrules").respond(200, {}); + httpBackend!.when("POST", "/filter").respond(200, { filter_id: "a filter id" }); client = testClientWithTimelineSupport.client; // Create a room from the sync - httpBackend.when("GET", "/sync").respond(200, syncData); - client.startClient(); + httpBackend!.when("GET", "/sync").respond(200, syncData); + client!.startClient(); await Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), ]); // Get the room after the first sync so the room is created - const room = client.getRoom(roomOne); + const room = client!.getRoom(roomOne); expect(room).toBeTruthy(); let stateEventEmitCount = 0; - client.on(RoomStateEvent.Update, () => { + client!.on(RoomStateEvent.Update, () => { stateEventEmitCount += 1; }); @@ -1027,8 +1104,8 @@ describe("MatrixClient syncing", function() { const eventsInRoom = syncData.rooms.join[roomOne].timeline.events; const contextUrl = `/rooms/${encodeURIComponent(roomOne)}/context/` + `${encodeURIComponent(eventsInRoom[0].event_id)}`; - httpBackend.when("GET", contextUrl) - .respond(200, function() { + httpBackend!.when("GET", contextUrl) + .respond(200, () => { return { start: "start_token", events_before: [EVENTS[1], EVENTS[0]], @@ -1045,7 +1122,7 @@ describe("MatrixClient syncing", function() { // reference to change await Promise.all([ room.refreshLiveTimeline(), - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), ]); // Cause `RoomStateEvent.Update` to be fired @@ -1056,8 +1133,8 @@ describe("MatrixClient syncing", function() { }); }); - describe("timeline", function() { - beforeEach(function() { + describe("timeline", () => { + beforeEach(() => { const syncData = { next_batch: "batch_token", rooms: { @@ -1075,16 +1152,16 @@ describe("MatrixClient syncing", function() { }, }; - httpBackend.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, syncData); - client.startClient(); + client!.startClient(); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), ]); }); - it("should set the back-pagination token on new rooms", function() { + it("should set the back-pagination token on new rooms", () => { const syncData = { next_batch: "batch_token", rooms: { @@ -1102,13 +1179,13 @@ describe("MatrixClient syncing", function() { }, }; - httpBackend.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, syncData); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), - ]).then(function() { - const room = client.getRoom(roomTwo); + ]).then(() => { + const room = client!.getRoom(roomTwo); expect(room).toBeTruthy(); const tok = room.getLiveTimeline() .getPaginationToken(EventTimeline.BACKWARDS); @@ -1116,7 +1193,7 @@ describe("MatrixClient syncing", function() { }); }); - it("should set the back-pagination token on gappy syncs", function() { + it("should set the back-pagination token on gappy syncs", () => { const syncData = { next_batch: "batch_token", rooms: { @@ -1134,11 +1211,11 @@ describe("MatrixClient syncing", function() { prev_batch: "newerTok", }, }; - httpBackend.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, syncData); let resetCallCount = 0; // the token should be set *before* timelineReset is emitted - client.on(RoomEvent.TimelineReset, function(room) { + client!.on(RoomEvent.TimelineReset, (room) => { resetCallCount++; const tl = room.getLiveTimeline(); @@ -1148,10 +1225,10 @@ describe("MatrixClient syncing", function() { }); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), - ]).then(function() { - const room = client.getRoom(roomOne); + ]).then(() => { + const room = client!.getRoom(roomOne); const tl = room.getLiveTimeline(); expect(tl.getEvents().length).toEqual(1); expect(resetCallCount).toEqual(1); @@ -1159,7 +1236,7 @@ describe("MatrixClient syncing", function() { }); }); - describe("receipts", function() { + describe("receipts", () => { const syncData = { rooms: { join: { @@ -1202,13 +1279,13 @@ describe("MatrixClient syncing", function() { }, }; - beforeEach(function() { + beforeEach(() => { syncData.rooms.join[roomOne].ephemeral = { events: [], }; }); - it("should sync receipts from /sync.", function() { + it("should sync receipts from /sync.", () => { const ackEvent = syncData.rooms.join[roomOne].timeline.events[0]; const receipt = {}; receipt[ackEvent.event_id] = { @@ -1222,15 +1299,15 @@ describe("MatrixClient syncing", function() { room_id: roomOne, type: "m.receipt", }]; - httpBackend.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, syncData); - client.startClient(); + client!.startClient(); return Promise.all([ - httpBackend.flushAllExpected(), + httpBackend!.flushAllExpected(), awaitSyncEvent(), - ]).then(function() { - const room = client.getRoom(roomOne); + ]).then(() => { + const room = client!.getRoom(roomOne); expect(room.getReceiptsForEvent(new MatrixEvent(ackEvent))).toEqual([{ type: "m.read", userId: userC, @@ -1242,59 +1319,62 @@ describe("MatrixClient syncing", function() { }); }); - describe("of a room", function() { + describe("of a room", () => { xit("should sync when a join event (which changes state) for the user" + - " arrives down the event stream (e.g. join from another device)", function() { + " arrives down the event stream (e.g. join from another device)", () => { }); - xit("should sync when the user explicitly calls joinRoom", function() { + xit("should sync when the user explicitly calls joinRoom", () => { }); }); - describe("syncLeftRooms", function() { - beforeEach(function(done) { - client.startClient(); + describe("syncLeftRooms", () => { + beforeEach((done) => { + client!.startClient(); - httpBackend.flushAllExpected().then(function() { + httpBackend!.flushAllExpected().then(() => { // the /sync call from syncLeftRooms ends up in the request // queue behind the call from the running client; add a response // to flush the client's one out. - httpBackend.when("GET", "/sync").respond(200, {}); + httpBackend!.when("GET", "/sync").respond(200, {}); done(); }); }); - it("should create and use an appropriate filter", function() { - httpBackend.when("POST", "/filter").check(function(req) { + it("should create and use an appropriate filter", () => { + httpBackend!.when("POST", "/filter").check((req) => { expect(req.data).toEqual({ - room: { timeline: { limit: 1 }, - include_leave: true } }); + room: { + timeline: { limit: 1 }, + include_leave: true, + }, + }); }).respond(200, { filter_id: "another_id" }); - const prom = new Promise((resolve) => { - httpBackend.when("GET", "/sync").check(function(req) { + const prom = new Promise((resolve) => { + httpBackend!.when("GET", "/sync").check((req) => { expect(req.queryParams.filter).toEqual("another_id"); resolve(); }).respond(200, {}); }); - client.syncLeftRooms(); + client!.syncLeftRooms(); // first flush the filter request; this will make syncLeftRooms // make its /sync call return Promise.all([ - httpBackend.flush("/filter").then(function() { + httpBackend!.flush("/filter").then(() => { // flush the syncs - return httpBackend.flushAllExpected(); + return httpBackend!.flushAllExpected(); }), prom, ]); }); - it("should set the back-pagination token on left rooms", function() { + it("should set the back-pagination token on left rooms", () => { const syncData = { next_batch: "batch_token", rooms: { @@ -1313,15 +1393,15 @@ describe("MatrixClient syncing", function() { }, }; - httpBackend.when("POST", "/filter").respond(200, { + httpBackend!.when("POST", "/filter").respond(200, { filter_id: "another_id", }); - httpBackend.when("GET", "/sync").respond(200, syncData); + httpBackend!.when("GET", "/sync").respond(200, syncData); return Promise.all([ - client.syncLeftRooms().then(function() { - const room = client.getRoom(roomTwo); + client!.syncLeftRooms().then(() => { + const room = client!.getRoom(roomTwo); const tok = room.getLiveTimeline().getPaginationToken( EventTimeline.BACKWARDS); @@ -1329,8 +1409,8 @@ describe("MatrixClient syncing", function() { }), // first flush the filter request; this will make syncLeftRooms make its /sync call - httpBackend.flush("/filter").then(function() { - return httpBackend.flushAllExpected(); + httpBackend!.flush("/filter").then(() => { + return httpBackend!.flushAllExpected(); }), ]); }); @@ -1342,7 +1422,7 @@ describe("MatrixClient syncing", function() { * @param {Number?} numSyncs number of syncs to wait for * @returns {Promise} promise which resolves after the sync events have happened */ - function awaitSyncEvent(numSyncs) { + function awaitSyncEvent(numSyncs?: number) { return utils.syncPromise(client, numSyncs); } }); diff --git a/spec/test-utils/test-utils.ts b/spec/test-utils/test-utils.ts index abb328e0c..16d1cb565 100644 --- a/spec/test-utils/test-utils.ts +++ b/spec/test-utils/test-utils.ts @@ -147,9 +147,9 @@ export function mkEventCustom(base: T): T & GeneratedMetadata { interface IPresenceOpts { user?: string; sender?: string; - url: string; - name: string; - ago: number; + url?: string; + name?: string; + ago?: number; presence?: string; event?: boolean; } diff --git a/src/client.ts b/src/client.ts index acda99b70..d8fc75d56 100644 --- a/src/client.ts +++ b/src/client.ts @@ -396,8 +396,7 @@ export interface IStartClientOpts { pollTimeout?: number; /** - * The filter to apply to /sync calls. This will override the opts.initialSyncLimit, which would - * normally result in a timeline limit filter. + * The filter to apply to /sync calls. */ filter?: Filter; diff --git a/src/sync.ts b/src/sync.ts index 0a84c19a7..430e02602 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -23,6 +23,8 @@ limitations under the License. * for HTTP and WS at some point. */ +import { Optional } from "matrix-events-sdk"; + import { User, UserEvent } from "./models/user"; import { NotificationCountType, Room, RoomEvent } from "./models/room"; import * as utils from "./utils"; @@ -100,18 +102,16 @@ const MSC2716_ROOM_VERSIONS = [ function getFilterName(userId: string, suffix?: string): string { // scope this on the user ID because people may login on many accounts // and they all need to be stored! - return "FILTER_SYNC_" + userId + (suffix ? "_" + suffix : ""); + return `FILTER_SYNC_${userId}` + suffix ? "_" + suffix : ""; } function debuglog(...params) { - if (!DEBUG) { - return; - } + if (!DEBUG) return; logger.log(...params); } interface ISyncOptions { - filterId?: string; + filter?: string; hasSyncedBefore?: boolean; } @@ -161,14 +161,14 @@ type WrappedRoom = T & { * updating presence. */ export class SyncApi { - private _peekRoom: Room = null; - private currentSyncRequest: IAbortablePromise = null; - private syncState: SyncState = null; - private syncStateData: ISyncStateData = null; // additional data (eg. error object for failed sync) + private _peekRoom: Optional = null; + private currentSyncRequest: Optional> = null; + private syncState: Optional = null; + private syncStateData: Optional = null; // additional data (eg. error object for failed sync) private catchingUp = false; private running = false; - private keepAliveTimer: ReturnType = null; - private connectionReturnedDefer: IDeferred = null; + private keepAliveTimer: Optional> = null; + private connectionReturnedDefer: Optional> = null; private notifEvents: MatrixEvent[] = []; // accumulator of sync events in the current sync response private failedSyncCount = 0; // Number of consecutive failed /sync requests private storeIsInvalid = false; // flag set if the store needs to be cleared before we can start @@ -214,7 +214,7 @@ export class SyncApi { * historical messages are shown when we paginate `/messages` again. * @param {Room} room The room where the marker event was sent * @param {MatrixEvent} markerEvent The new marker event - * @param {ISetStateOptions} setStateOptions When `timelineWasEmpty` is set + * @param {IMarkerFoundOptions} setStateOptions When `timelineWasEmpty` is set * as `true`, the given marker event will be ignored */ private onMarkerStateEvent( @@ -367,7 +367,7 @@ export class SyncApi { // XXX: copypasted from /sync until we kill off this minging v1 API stuff) // handle presence events (User objects) - if (response.presence && Array.isArray(response.presence)) { + if (Array.isArray(response.presence)) { response.presence.map(client.getEventMapper()).forEach( function(presenceEvent) { let user = client.store.getUser(presenceEvent.getContent().user_id); @@ -542,20 +542,135 @@ export class SyncApi { return false; } + private getPushRules = async () => { + try { + debuglog("Getting push rules..."); + const result = await this.client.getPushRules(); + debuglog("Got push rules"); + + this.client.pushRules = result; + } catch (err) { + logger.error("Getting push rules failed", err); + if (this.shouldAbortSync(err)) return; + // wait for saved sync to complete before doing anything else, + // otherwise the sync state will end up being incorrect + debuglog("Waiting for saved sync before retrying push rules..."); + await this.recoverFromSyncStartupError(this.savedSyncPromise, err); + return this.getPushRules(); // try again + } + }; + + private buildDefaultFilter = () => { + return new Filter(this.client.credentials.userId); + }; + + private checkLazyLoadStatus = async () => { + debuglog("Checking lazy load status..."); + if (this.opts.lazyLoadMembers && this.client.isGuest()) { + this.opts.lazyLoadMembers = false; + } + if (this.opts.lazyLoadMembers) { + debuglog("Checking server lazy load support..."); + const supported = await this.client.doesServerSupportLazyLoading(); + if (supported) { + debuglog("Enabling lazy load on sync filter..."); + if (!this.opts.filter) { + this.opts.filter = this.buildDefaultFilter(); + } + this.opts.filter.setLazyLoadMembers(true); + } else { + debuglog("LL: lazy loading requested but not supported " + + "by server, so disabling"); + this.opts.lazyLoadMembers = false; + } + } + // need to vape the store when enabling LL and wasn't enabled before + debuglog("Checking whether lazy loading has changed in store..."); + const shouldClear = await this.wasLazyLoadingToggled(this.opts.lazyLoadMembers); + if (shouldClear) { + this.storeIsInvalid = true; + const reason = InvalidStoreError.TOGGLED_LAZY_LOADING; + const error = new InvalidStoreError(reason, !!this.opts.lazyLoadMembers); + this.updateSyncState(SyncState.Error, { error }); + // bail out of the sync loop now: the app needs to respond to this error. + // we leave the state as 'ERROR' which isn't great since this normally means + // we're retrying. The client must be stopped before clearing the stores anyway + // so the app should stop the client, clear the store and start it again. + logger.warn("InvalidStoreError: store is not usable: stopping sync."); + return; + } + if (this.opts.lazyLoadMembers) { + this.opts.crypto?.enableLazyLoading(); + } + try { + debuglog("Storing client options..."); + await this.client.storeClientOptions(); + debuglog("Stored client options"); + } catch (err) { + logger.error("Storing client options failed", err); + throw err; + } + }; + + private getFilter = async (): Promise<{ + filterId?: string; + filter?: Filter; + }> => { + debuglog("Getting filter..."); + let filter: Filter; + if (this.opts.filter) { + filter = this.opts.filter; + } else { + filter = this.buildDefaultFilter(); + } + + let filterId: string; + try { + filterId = await this.client.getOrCreateFilter(getFilterName(this.client.credentials.userId), filter); + } catch (err) { + logger.error("Getting filter failed", err); + if (this.shouldAbortSync(err)) return {}; + // wait for saved sync to complete before doing anything else, + // otherwise the sync state will end up being incorrect + debuglog("Waiting for saved sync before retrying filter..."); + await this.recoverFromSyncStartupError(this.savedSyncPromise, err); + return this.getFilter(); // try again + } + return { filter, filterId }; + }; + + private savedSyncPromise: Promise; + /** * Main entry point */ - public sync(): void { - const client = this.client; - + public async sync(): Promise { this.running = true; - if (global.window && global.window.addEventListener) { - global.window.addEventListener("online", this.onOnline, false); + global.window?.addEventListener?.("online", this.onOnline, false); + + if (this.client.isGuest()) { + // no push rules for guests, no access to POST filter for guests. + return this.doSync({}); } - let savedSyncPromise = Promise.resolve(); - let savedSyncToken = null; + // Pull the saved sync token out first, before the worker starts sending + // all the sync data which could take a while. This will let us send our + // first incremental sync request before we've processed our saved data. + debuglog("Getting saved sync token..."); + const savedSyncTokenPromise = this.client.store.getSavedSyncToken().then(tok => { + debuglog("Got saved sync token"); + return tok; + }); + + this.savedSyncPromise = this.client.store.getSavedSync().then((savedSync) => { + debuglog(`Got reply from saved sync, exists? ${!!savedSync}`); + if (savedSync) { + return this.syncFromCache(savedSync); + } + }).catch(err => { + logger.error("Getting saved sync failed", err); + }); // We need to do one-off checks before we can begin the /sync loop. // These are: @@ -565,149 +680,45 @@ export class SyncApi { // 3) We need to check the lazy loading option matches what was used in the // stored sync. If it doesn't, we can't use the stored sync. - const getPushRules = async () => { - try { - debuglog("Getting push rules..."); - const result = await client.getPushRules(); - debuglog("Got push rules"); + // Now start the first incremental sync request: this can also + // take a while so if we set it going now, we can wait for it + // to finish while we process our saved sync data. + await this.getPushRules(); + await this.checkLazyLoadStatus(); + const { filterId, filter } = await this.getFilter(); + if (!filter) return; // bail, getFilter failed - client.pushRules = result; - } catch (err) { - logger.error("Getting push rules failed", err); - if (this.shouldAbortSync(err)) return; - // wait for saved sync to complete before doing anything else, - // otherwise the sync state will end up being incorrect - debuglog("Waiting for saved sync before retrying push rules..."); - await this.recoverFromSyncStartupError(savedSyncPromise, err); - getPushRules(); - return; - } - checkLazyLoadStatus(); // advance to the next stage - }; + // reset the notifications timeline to prepare it to paginate from + // the current point in time. + // The right solution would be to tie /sync pagination tokens into + // /notifications API somehow. + this.client.resetNotifTimelineSet(); - const buildDefaultFilter = () => { - const filter = new Filter(client.credentials.userId); - filter.setTimelineLimit(this.opts.initialSyncLimit); - return filter; - }; + if (this.currentSyncRequest === null) { + let firstSyncFilter = filterId; + const savedSyncToken = await savedSyncTokenPromise; - const checkLazyLoadStatus = async () => { - debuglog("Checking lazy load status..."); - if (this.opts.lazyLoadMembers && client.isGuest()) { - this.opts.lazyLoadMembers = false; - } - if (this.opts.lazyLoadMembers) { - debuglog("Checking server lazy load support..."); - const supported = await client.doesServerSupportLazyLoading(); - if (supported) { - debuglog("Enabling lazy load on sync filter..."); - if (!this.opts.filter) { - this.opts.filter = buildDefaultFilter(); - } - this.opts.filter.setLazyLoadMembers(true); - } else { - debuglog("LL: lazy loading requested but not supported " + - "by server, so disabling"); - this.opts.lazyLoadMembers = false; - } - } - // need to vape the store when enabling LL and wasn't enabled before - debuglog("Checking whether lazy loading has changed in store..."); - const shouldClear = await this.wasLazyLoadingToggled(this.opts.lazyLoadMembers); - if (shouldClear) { - this.storeIsInvalid = true; - const reason = InvalidStoreError.TOGGLED_LAZY_LOADING; - const error = new InvalidStoreError(reason, !!this.opts.lazyLoadMembers); - this.updateSyncState(SyncState.Error, { error }); - // bail out of the sync loop now: the app needs to respond to this error. - // we leave the state as 'ERROR' which isn't great since this normally means - // we're retrying. The client must be stopped before clearing the stores anyway - // so the app should stop the client, clear the store and start it again. - logger.warn("InvalidStoreError: store is not usable: stopping sync."); - return; - } - if (this.opts.lazyLoadMembers && this.opts.crypto) { - this.opts.crypto.enableLazyLoading(); - } - try { - debuglog("Storing client options..."); - await this.client.storeClientOptions(); - debuglog("Stored client options"); - } catch (err) { - logger.error("Storing client options failed", err); - throw err; - } - - getFilter(); // Now get the filter and start syncing - }; - - const getFilter = async () => { - debuglog("Getting filter..."); - let filter; - if (this.opts.filter) { - filter = this.opts.filter; - } else { - filter = buildDefaultFilter(); - } - - let filterId; - try { - filterId = await client.getOrCreateFilter(getFilterName(client.credentials.userId), filter); - } catch (err) { - logger.error("Getting filter failed", err); - if (this.shouldAbortSync(err)) return; - // wait for saved sync to complete before doing anything else, - // otherwise the sync state will end up being incorrect - debuglog("Waiting for saved sync before retrying filter..."); - await this.recoverFromSyncStartupError(savedSyncPromise, err); - getFilter(); - return; - } - // reset the notifications timeline to prepare it to paginate from - // the current point in time. - // The right solution would be to tie /sync pagination tokens into - // /notifications API somehow. - client.resetNotifTimelineSet(); - - if (this.currentSyncRequest === null) { - // Send this first sync request here so we can then wait for the saved - // sync data to finish processing before we process the results of this one. + if (savedSyncToken) { debuglog("Sending first sync request..."); - this.currentSyncRequest = this.doSyncRequest({ filterId }, savedSyncToken); + } else { + debuglog("Sending initial sync request..."); + const initialFilter = this.buildDefaultFilter(); + initialFilter.setDefinition(filter.getDefinition()); + initialFilter.setTimelineLimit(this.opts.initialSyncLimit); + // Use an inline filter, no point uploading it for a single usage + firstSyncFilter = JSON.stringify(initialFilter.getDefinition()); } - // Now wait for the saved sync to finish... - debuglog("Waiting for saved sync before starting sync processing..."); - await savedSyncPromise; - this.doSync({ filterId }); - }; - - if (client.isGuest()) { - // no push rules for guests, no access to POST filter for guests. - this.doSync({}); - } else { - // Pull the saved sync token out first, before the worker starts sending - // all the sync data which could take a while. This will let us send our - // first incremental sync request before we've processed our saved data. - debuglog("Getting saved sync token..."); - savedSyncPromise = client.store.getSavedSyncToken().then((tok) => { - debuglog("Got saved sync token"); - savedSyncToken = tok; - debuglog("Getting saved sync..."); - return client.store.getSavedSync(); - }).then((savedSync) => { - debuglog(`Got reply from saved sync, exists? ${!!savedSync}`); - if (savedSync) { - return this.syncFromCache(savedSync); - } - }).catch(err => { - logger.error("Getting saved sync failed", err); - }); - // Now start the first incremental sync request: this can also - // take a while so if we set it going now, we can wait for it - // to finish while we process our saved sync data. - getPushRules(); + // Send this first sync request here so we can then wait for the saved + // sync data to finish processing before we process the results of this one. + this.currentSyncRequest = this.doSyncRequest({ filter: firstSyncFilter }, savedSyncToken); } + + // Now wait for the saved sync to finish... + debuglog("Waiting for saved sync before starting sync processing..."); + await this.savedSyncPromise; + // process the first sync request and continue syncing with the normal filterId + return this.doSync({ filter: filterId }); } /** @@ -719,9 +730,7 @@ export class SyncApi { // global.window AND global.window.removeEventListener. // Some platforms (e.g. React Native) register global.window, // but do not have global.window.removeEventListener. - if (global.window && global.window.removeEventListener) { - global.window.removeEventListener("online", this.onOnline, false); - } + global.window?.removeEventListener?.("online", this.onOnline, false); this.running = false; this.currentSyncRequest?.abort(); if (this.keepAliveTimer) { @@ -756,8 +765,7 @@ export class SyncApi { this.client.store.setSyncToken(nextSyncToken); // No previous sync, set old token to null - const syncEventData = { - oldSyncToken: null, + const syncEventData: ISyncStateData = { nextSyncToken, catchingUp: false, fromCache: true, @@ -792,7 +800,91 @@ export class SyncApi { * @param {boolean} syncOptions.hasSyncedBefore */ private async doSync(syncOptions: ISyncOptions): Promise { - const client = this.client; + while (this.running) { + const syncToken = this.client.store.getSyncToken(); + + let data: ISyncResponse; + try { + //debuglog('Starting sync since=' + syncToken); + if (this.currentSyncRequest === null) { + this.currentSyncRequest = this.doSyncRequest(syncOptions, syncToken); + } + data = await this.currentSyncRequest; + } catch (e) { + const abort = await this.onSyncError(e); + if (abort) return; + continue; + } finally { + this.currentSyncRequest = null; + } + + //debuglog('Completed sync, next_batch=' + data.next_batch); + + // set the sync token NOW *before* processing the events. We do this so + // if something barfs on an event we can skip it rather than constantly + // polling with the same token. + this.client.store.setSyncToken(data.next_batch); + + // Reset after a successful sync + this.failedSyncCount = 0; + + await this.client.store.setSyncData(data); + + const syncEventData = { + oldSyncToken: syncToken, + nextSyncToken: data.next_batch, + catchingUp: this.catchingUp, + }; + + if (this.opts.crypto) { + // tell the crypto module we're about to process a sync + // response + await this.opts.crypto.onSyncWillProcess(syncEventData); + } + + try { + await this.processSyncResponse(syncEventData, data); + } catch (e) { + // log the exception with stack if we have it, else fall back + // to the plain description + logger.error("Caught /sync error", e); + + // Emit the exception for client handling + this.client.emit(ClientEvent.SyncUnexpectedError, e); + } + + // update this as it may have changed + syncEventData.catchingUp = this.catchingUp; + + // emit synced events + if (!syncOptions.hasSyncedBefore) { + this.updateSyncState(SyncState.Prepared, syncEventData); + syncOptions.hasSyncedBefore = true; + } + + // tell the crypto module to do its processing. It may block (to do a + // /keys/changes request). + if (this.opts.crypto) { + await this.opts.crypto.onSyncCompleted(syncEventData); + } + + // keep emitting SYNCING -> SYNCING for clients who want to do bulk updates + this.updateSyncState(SyncState.Syncing, syncEventData); + + if (this.client.store.wantsSave()) { + // We always save the device list (if it's dirty) before saving the sync data: + // this means we know the saved device list data is at least as fresh as the + // stored sync data which means we don't have to worry that we may have missed + // device changes. We can also skip the delay since we're not calling this very + // frequently (and we don't really want to delay the sync for it). + if (this.opts.crypto) { + await this.opts.crypto.saveDeviceList(0); + } + + // tell databases that everything is now in a consistent state and can be saved. + this.client.store.save(); + } + } if (!this.running) { debuglog("Sync no longer running: exiting."); @@ -801,94 +893,7 @@ export class SyncApi { this.connectionReturnedDefer = null; } this.updateSyncState(SyncState.Stopped); - return; } - - const syncToken = client.store.getSyncToken(); - - let data; - try { - //debuglog('Starting sync since=' + syncToken); - if (this.currentSyncRequest === null) { - this.currentSyncRequest = this.doSyncRequest(syncOptions, syncToken); - } - data = await this.currentSyncRequest; - } catch (e) { - this.onSyncError(e, syncOptions); - return; - } finally { - this.currentSyncRequest = null; - } - - //debuglog('Completed sync, next_batch=' + data.next_batch); - - // set the sync token NOW *before* processing the events. We do this so - // if something barfs on an event we can skip it rather than constantly - // polling with the same token. - client.store.setSyncToken(data.next_batch); - - // Reset after a successful sync - this.failedSyncCount = 0; - - await client.store.setSyncData(data); - - const syncEventData = { - oldSyncToken: syncToken, - nextSyncToken: data.next_batch, - catchingUp: this.catchingUp, - }; - - if (this.opts.crypto) { - // tell the crypto module we're about to process a sync - // response - await this.opts.crypto.onSyncWillProcess(syncEventData); - } - - try { - await this.processSyncResponse(syncEventData, data); - } catch (e) { - // log the exception with stack if we have it, else fall back - // to the plain description - logger.error("Caught /sync error", e); - - // Emit the exception for client handling - this.client.emit(ClientEvent.SyncUnexpectedError, e); - } - - // update this as it may have changed - syncEventData.catchingUp = this.catchingUp; - - // emit synced events - if (!syncOptions.hasSyncedBefore) { - this.updateSyncState(SyncState.Prepared, syncEventData); - syncOptions.hasSyncedBefore = true; - } - - // tell the crypto module to do its processing. It may block (to do a - // /keys/changes request). - if (this.opts.crypto) { - await this.opts.crypto.onSyncCompleted(syncEventData); - } - - // keep emitting SYNCING -> SYNCING for clients who want to do bulk updates - this.updateSyncState(SyncState.Syncing, syncEventData); - - if (client.store.wantsSave()) { - // We always save the device list (if it's dirty) before saving the sync data: - // this means we know the saved device list data is at least as fresh as the - // stored sync data which means we don't have to worry that we may have missed - // device changes. We can also skip the delay since we're not calling this very - // frequently (and we don't really want to delay the sync for it). - if (this.opts.crypto) { - await this.opts.crypto.saveDeviceList(0); - } - - // tell databases that everything is now in a consistent state and can be saved. - client.store.save(); - } - - // Begin next sync - this.doSync(syncOptions); } private doSyncRequest(syncOptions: ISyncOptions, syncToken: string): IAbortablePromise { @@ -902,7 +907,7 @@ export class SyncApi { private getSyncParams(syncOptions: ISyncOptions, syncToken: string): ISyncParams { let pollTimeout = this.opts.pollTimeout; - if (this.getSyncState() !== 'SYNCING' || this.catchingUp) { + if (this.getSyncState() !== SyncState.Syncing || this.catchingUp) { // unless we are happily syncing already, we want the server to return // as quickly as possible, even if there are no events queued. This // serves two purposes: @@ -918,13 +923,13 @@ export class SyncApi { pollTimeout = 0; } - let filterId = syncOptions.filterId; - if (this.client.isGuest() && !filterId) { - filterId = this.getGuestFilter(); + let filter = syncOptions.filter; + if (this.client.isGuest() && !filter) { + filter = this.getGuestFilter(); } const qps: ISyncParams = { - filter: filterId, + filter, timeout: pollTimeout, }; @@ -941,7 +946,7 @@ export class SyncApi { qps._cacheBuster = Date.now(); } - if (this.getSyncState() == 'ERROR' || this.getSyncState() == 'RECONNECTING') { + if ([SyncState.Reconnecting, SyncState.Error].includes(this.getSyncState())) { // we think the connection is dead. If it comes back up, we won't know // about it till /sync returns. If the timeout= is high, this could // be a long time. Set it to 0 when doing retries so we don't have to wait @@ -952,7 +957,7 @@ export class SyncApi { return qps; } - private onSyncError(err: MatrixError, syncOptions: ISyncOptions): void { + private async onSyncError(err: MatrixError): Promise { if (!this.running) { debuglog("Sync no longer running: exiting"); if (this.connectionReturnedDefer) { @@ -960,14 +965,13 @@ export class SyncApi { this.connectionReturnedDefer = null; } this.updateSyncState(SyncState.Stopped); - return; + return true; // abort } logger.error("/sync error %s", err); - logger.error(err); if (this.shouldAbortSync(err)) { - return; + return true; // abort } this.failedSyncCount++; @@ -981,20 +985,7 @@ export class SyncApi { // erroneous. We set the state to 'reconnecting' // instead, so that clients can observe this state // if they wish. - this.startKeepAlives().then((connDidFail) => { - // Only emit CATCHUP if we detected a connectivity error: if we didn't, - // it's quite likely the sync will fail again for the same reason and we - // want to stay in ERROR rather than keep flip-flopping between ERROR - // and CATCHUP. - if (connDidFail && this.getSyncState() === SyncState.Error) { - this.updateSyncState(SyncState.Catchup, { - oldSyncToken: null, - nextSyncToken: null, - catchingUp: true, - }); - } - this.doSync(syncOptions); - }); + const keepAlivePromise = this.startKeepAlives(); this.currentSyncRequest = null; // Transition from RECONNECTING to ERROR after a given number of failed syncs @@ -1003,6 +994,19 @@ export class SyncApi { SyncState.Error : SyncState.Reconnecting, { error: err }, ); + + const connDidFail = await keepAlivePromise; + + // Only emit CATCHUP if we detected a connectivity error: if we didn't, + // it's quite likely the sync will fail again for the same reason and we + // want to stay in ERROR rather than keep flip-flopping between ERROR + // and CATCHUP. + if (connDidFail && this.getSyncState() === SyncState.Error) { + this.updateSyncState(SyncState.Catchup, { + catchingUp: true, + }); + } + return false; } /** @@ -1061,7 +1065,7 @@ export class SyncApi { // - The isBrandNewRoom boilerplate is boilerplatey. // handle presence events (User objects) - if (data.presence && Array.isArray(data.presence.events)) { + if (Array.isArray(data.presence?.events)) { data.presence.events.map(client.getEventMapper()).forEach( function(presenceEvent) { let user = client.store.getUser(presenceEvent.getSender()); @@ -1077,7 +1081,7 @@ export class SyncApi { } // handle non-room account_data - if (data.account_data && Array.isArray(data.account_data.events)) { + if (Array.isArray(data.account_data?.events)) { const events = data.account_data.events.map(client.getEventMapper()); const prevEventsMap = events.reduce((m, c) => { m[c.getId()] = client.store.getAccountData(c.getType()); @@ -1218,8 +1222,7 @@ export class SyncApi { // bother setting it here. We trust our calculations better than the // server's for this case, and therefore will assume that our non-zero // count is accurate. - if (!encrypted - || (encrypted && room.getUnreadNotificationCount(NotificationCountType.Highlight) <= 0)) { + if (!encrypted || room.getUnreadNotificationCount(NotificationCountType.Highlight) <= 0) { room.setUnreadNotificationCount( NotificationCountType.Highlight, joinObj.unread_notifications.highlight_count, @@ -1232,8 +1235,7 @@ export class SyncApi { if (joinObj.isBrandNewRoom) { // set the back-pagination token. Do this *before* adding any // events so that clients can start back-paginating. - room.getLiveTimeline().setPaginationToken( - joinObj.timeline.prev_batch, EventTimeline.BACKWARDS); + room.getLiveTimeline().setPaginationToken(joinObj.timeline.prev_batch, EventTimeline.BACKWARDS); } else if (joinObj.timeline.limited) { let limited = true;