You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-08-06 12:02:40 +03:00
Correctly handle limited sync responses by resetting the thread timeline (#3056)
* Reset thread livetimelines when desynced * Implement workaround for https://github.com/matrix-org/synapse/issues/14830
This commit is contained in:
committed by
GitHub
parent
7b10fa367d
commit
a34d06c7c2
@@ -20,7 +20,7 @@ import { Thread, THREAD_RELATION_TYPE, ThreadEvent } from "../../../src/models/t
|
|||||||
import { mkThread } from "../../test-utils/thread";
|
import { mkThread } from "../../test-utils/thread";
|
||||||
import { TestClient } from "../../TestClient";
|
import { TestClient } from "../../TestClient";
|
||||||
import { emitPromise, mkMessage, mock } from "../../test-utils/test-utils";
|
import { emitPromise, mkMessage, mock } from "../../test-utils/test-utils";
|
||||||
import { EventStatus, MatrixEvent } from "../../../src";
|
import { Direction, EventStatus, MatrixEvent } from "../../../src";
|
||||||
import { ReceiptType } from "../../../src/@types/read_receipts";
|
import { ReceiptType } from "../../../src/@types/read_receipts";
|
||||||
import { getMockClientWithEventEmitter, mockClientMethodsUser } from "../../test-utils/client";
|
import { getMockClientWithEventEmitter, mockClientMethodsUser } from "../../test-utils/client";
|
||||||
import { ReEmitter } from "../../../src/ReEmitter";
|
import { ReEmitter } from "../../../src/ReEmitter";
|
||||||
@@ -283,4 +283,143 @@ describe("Thread", () => {
|
|||||||
expect(thread2.getEventReadUpTo(myUserId)).toBe(null);
|
expect(thread2.getEventReadUpTo(myUserId)).toBe(null);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("resetLiveTimeline", () => {
|
||||||
|
// ResetLiveTimeline is used when we have missing messages between the current live timeline's end and newly
|
||||||
|
// received messages. In that case, we want to replace the existing live timeline. To ensure pagination
|
||||||
|
// continues working correctly, new pagination tokens need to be set on both the old live timeline (which is
|
||||||
|
// now a regular timeline) and the new live timeline.
|
||||||
|
it("replaces the live timeline and correctly sets pagination tokens", async () => {
|
||||||
|
const myUserId = "@bob:example.org";
|
||||||
|
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
|
||||||
|
timelineSupport: false,
|
||||||
|
});
|
||||||
|
const client = testClient.client;
|
||||||
|
const room = new Room("123", client, myUserId, {
|
||||||
|
pendingEventOrdering: PendingEventOrdering.Detached,
|
||||||
|
});
|
||||||
|
|
||||||
|
jest.spyOn(client, "getRoom").mockReturnValue(room);
|
||||||
|
|
||||||
|
const { thread } = mkThread({
|
||||||
|
room,
|
||||||
|
client,
|
||||||
|
authorId: myUserId,
|
||||||
|
participantUserIds: ["@alice:example.org"],
|
||||||
|
length: 3,
|
||||||
|
});
|
||||||
|
await emitPromise(thread, ThreadEvent.Update);
|
||||||
|
expect(thread.length).toBe(2);
|
||||||
|
|
||||||
|
jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) =>
|
||||||
|
Promise.resolve({
|
||||||
|
chunk: [],
|
||||||
|
start: `${token}-new`,
|
||||||
|
end: `${token}-new`,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
function timelines(): [string | null, string | null][] {
|
||||||
|
return thread.timelineSet
|
||||||
|
.getTimelines()
|
||||||
|
.map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(timelines()).toEqual([[null, null]]);
|
||||||
|
const promise = thread.resetLiveTimeline("b1", "f1");
|
||||||
|
expect(timelines()).toEqual([
|
||||||
|
[null, "f1"],
|
||||||
|
["b1", null],
|
||||||
|
]);
|
||||||
|
await promise;
|
||||||
|
expect(timelines()).toEqual([
|
||||||
|
[null, "f1-new"],
|
||||||
|
["b1-new", null],
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
// As the pagination tokens cannot be used right now, resetLiveTimeline needs to replace them before they can
|
||||||
|
// be used. But if in the future the bug in synapse is fixed, and they can actually be used, we can get into a
|
||||||
|
// state where the client has paginated (and changed the tokens) while resetLiveTimeline tries to set the
|
||||||
|
// corrected tokens. To prevent such a race condition, we make sure that resetLiveTimeline respects any
|
||||||
|
// changes done to the pagination tokens.
|
||||||
|
it("replaces the live timeline but does not replace changed pagination tokens", async () => {
|
||||||
|
const myUserId = "@bob:example.org";
|
||||||
|
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
|
||||||
|
timelineSupport: false,
|
||||||
|
});
|
||||||
|
const client = testClient.client;
|
||||||
|
const room = new Room("123", client, myUserId, {
|
||||||
|
pendingEventOrdering: PendingEventOrdering.Detached,
|
||||||
|
});
|
||||||
|
|
||||||
|
jest.spyOn(client, "getRoom").mockReturnValue(room);
|
||||||
|
|
||||||
|
const { thread } = mkThread({
|
||||||
|
room,
|
||||||
|
client,
|
||||||
|
authorId: myUserId,
|
||||||
|
participantUserIds: ["@alice:example.org"],
|
||||||
|
length: 3,
|
||||||
|
});
|
||||||
|
await emitPromise(thread, ThreadEvent.Update);
|
||||||
|
expect(thread.length).toBe(2);
|
||||||
|
|
||||||
|
jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) =>
|
||||||
|
Promise.resolve({
|
||||||
|
chunk: [],
|
||||||
|
start: `${token}-new`,
|
||||||
|
end: `${token}-new`,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
function timelines(): [string | null, string | null][] {
|
||||||
|
return thread.timelineSet
|
||||||
|
.getTimelines()
|
||||||
|
.map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(timelines()).toEqual([[null, null]]);
|
||||||
|
const promise = thread.resetLiveTimeline("b1", "f1");
|
||||||
|
expect(timelines()).toEqual([
|
||||||
|
[null, "f1"],
|
||||||
|
["b1", null],
|
||||||
|
]);
|
||||||
|
thread.timelineSet.getTimelines()[0].setPaginationToken("f2", Direction.Forward);
|
||||||
|
thread.timelineSet.getTimelines()[1].setPaginationToken("b2", Direction.Backward);
|
||||||
|
await promise;
|
||||||
|
expect(timelines()).toEqual([
|
||||||
|
[null, "f2"],
|
||||||
|
["b2", null],
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("is correctly called by the room", async () => {
|
||||||
|
const myUserId = "@bob:example.org";
|
||||||
|
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
|
||||||
|
timelineSupport: false,
|
||||||
|
});
|
||||||
|
const client = testClient.client;
|
||||||
|
const room = new Room("123", client, myUserId, {
|
||||||
|
pendingEventOrdering: PendingEventOrdering.Detached,
|
||||||
|
});
|
||||||
|
|
||||||
|
jest.spyOn(client, "getRoom").mockReturnValue(room);
|
||||||
|
|
||||||
|
const { thread } = mkThread({
|
||||||
|
room,
|
||||||
|
client,
|
||||||
|
authorId: myUserId,
|
||||||
|
participantUserIds: ["@alice:example.org"],
|
||||||
|
length: 3,
|
||||||
|
});
|
||||||
|
await emitPromise(thread, ThreadEvent.Update);
|
||||||
|
expect(thread.length).toBe(2);
|
||||||
|
const mock = jest.spyOn(thread, "resetLiveTimeline");
|
||||||
|
mock.mockReturnValue(Promise.resolve());
|
||||||
|
|
||||||
|
room.resetLiveTimeline("b1", "f1");
|
||||||
|
expect(mock).toHaveBeenCalledWith("b1", "f1");
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
@@ -1114,6 +1114,9 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
|
|||||||
for (const timelineSet of this.timelineSets) {
|
for (const timelineSet of this.timelineSets) {
|
||||||
timelineSet.resetLiveTimeline(backPaginationToken ?? undefined, forwardPaginationToken ?? undefined);
|
timelineSet.resetLiveTimeline(backPaginationToken ?? undefined, forwardPaginationToken ?? undefined);
|
||||||
}
|
}
|
||||||
|
for (const thread of this.threads.values()) {
|
||||||
|
thread.resetLiveTimeline(backPaginationToken, forwardPaginationToken);
|
||||||
|
}
|
||||||
|
|
||||||
this.fixUpLegacyTimelineFields();
|
this.fixUpLegacyTimelineFields();
|
||||||
}
|
}
|
||||||
@@ -1223,7 +1226,7 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
|
|||||||
const event = this.findEventById(eventId);
|
const event = this.findEventById(eventId);
|
||||||
const thread = this.findThreadForEvent(event);
|
const thread = this.findThreadForEvent(event);
|
||||||
if (thread) {
|
if (thread) {
|
||||||
return thread.timelineSet.getLiveTimeline();
|
return thread.timelineSet.getTimelineForEvent(eventId);
|
||||||
} else {
|
} else {
|
||||||
return this.getUnfilteredTimelineSet().getTimelineForEvent(eventId);
|
return this.getUnfilteredTimelineSet().getTimelineForEvent(eventId);
|
||||||
}
|
}
|
||||||
|
@@ -256,7 +256,7 @@ export class Thread extends ReadReceipt<EmittedEvents, EventHandlerMap> {
|
|||||||
this.setEventMetadata(event);
|
this.setEventMetadata(event);
|
||||||
|
|
||||||
const lastReply = this.lastReply();
|
const lastReply = this.lastReply();
|
||||||
const isNewestReply = !lastReply || event.localTimestamp > lastReply!.localTimestamp;
|
const isNewestReply = !lastReply || event.localTimestamp >= lastReply!.localTimestamp;
|
||||||
|
|
||||||
// Add all incoming events to the thread's timeline set when there's no server support
|
// Add all incoming events to the thread's timeline set when there's no server support
|
||||||
if (!Thread.hasServerSideSupport) {
|
if (!Thread.hasServerSideSupport) {
|
||||||
@@ -358,6 +358,63 @@ export class Thread extends ReadReceipt<EmittedEvents, EventHandlerMap> {
|
|||||||
this.pendingReplyCount = pendingEvents.length;
|
this.pendingReplyCount = pendingEvents.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset the live timeline of all timelineSets, and start new ones.
|
||||||
|
*
|
||||||
|
* <p>This is used when /sync returns a 'limited' timeline. 'Limited' means that there's a gap between the messages
|
||||||
|
* /sync returned, and the last known message in our timeline. In such a case, our live timeline isn't live anymore
|
||||||
|
* and has to be replaced by a new one. To make sure we can continue paginating our timelines correctly, we have to
|
||||||
|
* set new pagination tokens on the old and the new timeline.
|
||||||
|
*
|
||||||
|
* @param backPaginationToken - token for back-paginating the new timeline
|
||||||
|
* @param forwardPaginationToken - token for forward-paginating the old live timeline,
|
||||||
|
* if absent or null, all timelines are reset, removing old ones (including the previous live
|
||||||
|
* timeline which would otherwise be unable to paginate forwards without this token).
|
||||||
|
* Removing just the old live timeline whilst preserving previous ones is not supported.
|
||||||
|
*/
|
||||||
|
public async resetLiveTimeline(
|
||||||
|
backPaginationToken?: string | null,
|
||||||
|
forwardPaginationToken?: string | null,
|
||||||
|
): Promise<void> {
|
||||||
|
const oldLive = this.liveTimeline;
|
||||||
|
this.timelineSet.resetLiveTimeline(backPaginationToken ?? undefined, forwardPaginationToken ?? undefined);
|
||||||
|
const newLive = this.liveTimeline;
|
||||||
|
|
||||||
|
// FIXME: Remove the following as soon as https://github.com/matrix-org/synapse/issues/14830 is resolved.
|
||||||
|
//
|
||||||
|
// The pagination API for thread timelines currently can't handle the type of pagination tokens returned by sync
|
||||||
|
//
|
||||||
|
// To make this work anyway, we'll have to transform them into one of the types that the API can handle.
|
||||||
|
// One option is passing the tokens to /messages, which can handle sync tokens, and returns the right format.
|
||||||
|
// /messages does not return new tokens on requests with a limit of 0.
|
||||||
|
// This means our timelines might overlap a slight bit, but that's not an issue, as we deduplicate messages
|
||||||
|
// anyway.
|
||||||
|
|
||||||
|
let newBackward: string | undefined;
|
||||||
|
let oldForward: string | undefined;
|
||||||
|
if (backPaginationToken) {
|
||||||
|
const res = await this.client.createMessagesRequest(this.roomId, backPaginationToken, 1, Direction.Forward);
|
||||||
|
newBackward = res.end;
|
||||||
|
}
|
||||||
|
if (forwardPaginationToken) {
|
||||||
|
const res = await this.client.createMessagesRequest(
|
||||||
|
this.roomId,
|
||||||
|
forwardPaginationToken,
|
||||||
|
1,
|
||||||
|
Direction.Backward,
|
||||||
|
);
|
||||||
|
oldForward = res.start;
|
||||||
|
}
|
||||||
|
// Only replace the token if we don't have paginated away from this position already. This situation doesn't
|
||||||
|
// occur today, but if the above issue is resolved, we'd have to go down this path.
|
||||||
|
if (forwardPaginationToken && oldLive.getPaginationToken(Direction.Forward) === forwardPaginationToken) {
|
||||||
|
oldLive.setPaginationToken(oldForward ?? null, Direction.Forward);
|
||||||
|
}
|
||||||
|
if (backPaginationToken && newLive.getPaginationToken(Direction.Backward) === backPaginationToken) {
|
||||||
|
newLive.setPaginationToken(newBackward ?? null, Direction.Backward);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private async updateThreadMetadata(): Promise<void> {
|
private async updateThreadMetadata(): Promise<void> {
|
||||||
this.updatePendingReplyCount();
|
this.updatePendingReplyCount();
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user