You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-11-23 17:02:25 +03:00
* test(thread): add regression tests for edit-race; ensure reaction aggregation idempotence - Edit race: add failing test when `Replace` aggregated pre-init - Reaction: ensure pre-init aggregation and dedup on replay - Strengthen assertions for ordering and content * fix(thread): apply edits after init; keep reactions pre-init; remove redundant aggregation - Defer Replace aggregation until thread initialised and event is in timeline - Aggregate Annotation pre-init to preserve reaction summaries - Rely on EventTimelineSet to aggregate post-insert - Fixes: element-hq/element-web#30617 * style: run prettier; docs: clarify reaction pre-init comment about counts
1041 lines
45 KiB
TypeScript
1041 lines
45 KiB
TypeScript
/*
|
|
Copyright 2022 - 2023 The Matrix.org Foundation C.I.C.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
import { mocked } from "jest-mock";
|
|
|
|
import { MatrixClient, PendingEventOrdering } from "../../../src/client";
|
|
import { Room, RoomEvent } from "../../../src/models/room";
|
|
import { FeatureSupport, Thread, THREAD_RELATION_TYPE, ThreadEvent } from "../../../src/models/thread";
|
|
import { makeThreadEvent, mkThread, populateThread } from "../../test-utils/thread";
|
|
import { TestClient } from "../../TestClient";
|
|
import { emitPromise, mkEdit, mkMessage, mkReaction, mock } from "../../test-utils/test-utils";
|
|
import { Direction, EventStatus, EventType, MatrixEvent, RelationType } from "../../../src";
|
|
import { ReceiptType } from "../../../src/@types/read_receipts";
|
|
import { getMockClientWithEventEmitter, mockClientMethodsUser } from "../../test-utils/client";
|
|
import { ReEmitter } from "../../../src/ReEmitter";
|
|
import { Feature, ServerSupport } from "../../../src/feature";
|
|
import { eventMapperFor } from "../../../src/event-mapper";
|
|
|
|
describe("Thread", () => {
|
|
describe("constructor", () => {
|
|
it("should explode for element-web#22141 logging", () => {
|
|
// Logging/debugging for https://github.com/vector-im/element-web/issues/22141
|
|
expect(() => {
|
|
new Thread("$event", undefined, {} as any); // deliberate cast to test error case
|
|
}).toThrow("element-web#22141: A thread requires a room in order to function");
|
|
});
|
|
});
|
|
|
|
it("includes pending events in replyCount", async () => {
|
|
const myUserId = "@bob:example.org";
|
|
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, { timelineSupport: false });
|
|
const client = testClient.client;
|
|
client.supportsThreads = jest.fn().mockReturnValue(true);
|
|
const room = new Room("123", client, myUserId, {
|
|
pendingEventOrdering: PendingEventOrdering.Detached,
|
|
});
|
|
|
|
jest.spyOn(client, "getRoom").mockReturnValue(room);
|
|
|
|
const { thread } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: myUserId,
|
|
participantUserIds: ["@alice:example.org"],
|
|
length: 3,
|
|
});
|
|
await emitPromise(thread, ThreadEvent.Update);
|
|
expect(thread.length).toBe(2);
|
|
|
|
const event = mkMessage({
|
|
room: room.roomId,
|
|
user: myUserId,
|
|
msg: "thread reply",
|
|
relatesTo: {
|
|
rel_type: THREAD_RELATION_TYPE.name,
|
|
event_id: thread.id,
|
|
},
|
|
event: true,
|
|
});
|
|
await thread.processEvent(event);
|
|
event.setStatus(EventStatus.SENDING);
|
|
room.addPendingEvent(event, "txn01");
|
|
|
|
await emitPromise(thread, ThreadEvent.Update);
|
|
expect(thread.length).toBe(3);
|
|
});
|
|
|
|
describe("hasUserReadEvent", () => {
|
|
let myUserId: string;
|
|
let client: MatrixClient;
|
|
let room: Room;
|
|
|
|
beforeEach(() => {
|
|
client = getMockClientWithEventEmitter({
|
|
...mockClientMethodsUser(),
|
|
isInitialSyncComplete: jest.fn().mockReturnValue(false),
|
|
getRoom: jest.fn().mockImplementation(() => room),
|
|
decryptEventIfNeeded: jest.fn().mockResolvedValue(void 0),
|
|
supportsThreads: jest.fn().mockReturnValue(true),
|
|
});
|
|
client.reEmitter = mock(ReEmitter, "ReEmitter");
|
|
client.canSupport = new Map();
|
|
Object.keys(Feature).forEach((feature) => {
|
|
client.canSupport.set(feature as Feature, ServerSupport.Stable);
|
|
});
|
|
|
|
myUserId = client.getUserId()!;
|
|
|
|
room = new Room("123", client, myUserId);
|
|
|
|
const receipt = new MatrixEvent({
|
|
type: "m.receipt",
|
|
room_id: "!foo:bar",
|
|
content: {
|
|
// first threaded receipt
|
|
"$event0:localhost": {
|
|
[ReceiptType.Read]: {
|
|
[client.getUserId()!]: { ts: 100, thread_id: "$threadId:localhost" },
|
|
},
|
|
},
|
|
// last unthreaded receipt
|
|
"$event1:localhost": {
|
|
[ReceiptType.Read]: {
|
|
[client.getUserId()!]: { ts: 200 },
|
|
["@alice:example.org"]: { ts: 200 },
|
|
},
|
|
},
|
|
// last threaded receipt
|
|
"$event2:localhost": {
|
|
[ReceiptType.Read]: {
|
|
[client.getUserId()!]: { ts: 300, thread_id: "$threadId" },
|
|
},
|
|
},
|
|
},
|
|
});
|
|
room.addReceipt(receipt);
|
|
|
|
jest.spyOn(client, "getRoom").mockReturnValue(room);
|
|
});
|
|
|
|
afterAll(() => {
|
|
jest.resetAllMocks();
|
|
});
|
|
|
|
it("considers own events with no RR as read", () => {
|
|
const { thread, events } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: myUserId,
|
|
participantUserIds: [myUserId],
|
|
length: 2,
|
|
});
|
|
|
|
// The event is automatically considered read as the current user is the sender
|
|
expect(thread.hasUserReadEvent(myUserId, events.at(-1)!.getId() ?? "")).toBeTruthy();
|
|
});
|
|
|
|
it("considers other events with no RR as unread", () => {
|
|
// Given a long thread exists
|
|
const { thread, events } = populateThread({
|
|
room,
|
|
client,
|
|
authorId: "@other:foo.com",
|
|
participantUserIds: ["@other:foo.com"],
|
|
length: 25,
|
|
ts: 190,
|
|
});
|
|
|
|
const event1 = events.at(1)!;
|
|
const event2 = events.at(2)!;
|
|
const event24 = events.at(24)!;
|
|
|
|
// And we have read the second message in it with an unthreaded receipt
|
|
const receipt = new MatrixEvent({
|
|
type: "m.receipt",
|
|
room_id: room.roomId,
|
|
content: {
|
|
// unthreaded receipt for the second message in the thread
|
|
[event2.getId()!]: {
|
|
[ReceiptType.Read]: {
|
|
[myUserId]: { ts: 200 },
|
|
},
|
|
},
|
|
},
|
|
});
|
|
room.addReceipt(receipt);
|
|
|
|
// Then we have read the first message in the thread, and not the last
|
|
expect(thread.hasUserReadEvent(myUserId, event1.getId()!)).toBe(true);
|
|
expect(thread.hasUserReadEvent(myUserId, event24.getId()!)).toBe(false);
|
|
});
|
|
|
|
it("considers event as read if there's a more recent unthreaded receipt", () => {
|
|
const { thread, events } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: myUserId,
|
|
participantUserIds: ["@alice:example.org"],
|
|
length: 2,
|
|
ts: 150, // before the latest unthreaded receipt
|
|
});
|
|
expect(thread.hasUserReadEvent(client.getUserId()!, events.at(-1)!.getId() ?? "")).toBe(true);
|
|
});
|
|
|
|
it("considers event as unread if there's no more recent unthreaded receipt", () => {
|
|
const { thread, events } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: myUserId,
|
|
participantUserIds: ["@alice:example.org"],
|
|
length: 2,
|
|
ts: 1000,
|
|
});
|
|
expect(thread.hasUserReadEvent(client.getUserId()!, events.at(-1)!.getId() ?? "")).toBe(false);
|
|
});
|
|
});
|
|
|
|
describe("getEventReadUpTo", () => {
|
|
let myUserId: string;
|
|
let client: MatrixClient;
|
|
let room: Room;
|
|
|
|
beforeEach(() => {
|
|
client = getMockClientWithEventEmitter({
|
|
...mockClientMethodsUser(),
|
|
isInitialSyncComplete: jest.fn().mockReturnValue(false),
|
|
getRoom: jest.fn().mockImplementation(() => room),
|
|
decryptEventIfNeeded: jest.fn().mockResolvedValue(void 0),
|
|
supportsThreads: jest.fn().mockReturnValue(true),
|
|
});
|
|
client.reEmitter = mock(ReEmitter, "ReEmitter");
|
|
client.canSupport = new Map();
|
|
Object.keys(Feature).forEach((feature) => {
|
|
client.canSupport.set(feature as Feature, ServerSupport.Stable);
|
|
});
|
|
|
|
myUserId = client.getUserId()!;
|
|
|
|
room = new Room("123", client, myUserId);
|
|
|
|
jest.spyOn(client, "getRoom").mockReturnValue(room);
|
|
});
|
|
|
|
afterAll(() => {
|
|
jest.resetAllMocks();
|
|
});
|
|
|
|
it("uses unthreaded receipt to figure out read up to", () => {
|
|
const receipt = new MatrixEvent({
|
|
type: "m.receipt",
|
|
room_id: "!foo:bar",
|
|
content: {
|
|
// last unthreaded receipt
|
|
"$event1:localhost": {
|
|
[ReceiptType.Read]: {
|
|
["@alice:example.org"]: { ts: 200 },
|
|
},
|
|
},
|
|
},
|
|
});
|
|
room.addReceipt(receipt);
|
|
|
|
const { thread, events } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: myUserId,
|
|
participantUserIds: [myUserId],
|
|
length: 25,
|
|
ts: 190,
|
|
});
|
|
|
|
// The 10th event has been read, as alice's last unthreaded receipt is at ts 200
|
|
// and `mkThread` increment every thread response by 1ms.
|
|
expect(thread.getEventReadUpTo("@alice:example.org")).toBe(events.at(9)!.getId());
|
|
});
|
|
|
|
it("considers thread created before the first threaded receipt to be read", () => {
|
|
const receipt = new MatrixEvent({
|
|
type: "m.receipt",
|
|
room_id: "!foo:bar",
|
|
content: {
|
|
// last unthreaded receipt
|
|
"$event1:localhost": {
|
|
[ReceiptType.Read]: {
|
|
[myUserId]: { ts: 200, thread_id: "$threadId" },
|
|
},
|
|
},
|
|
},
|
|
});
|
|
room.addReceipt(receipt);
|
|
|
|
const { thread, events } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: "@alice:example.org",
|
|
participantUserIds: ["@alice:example.org"],
|
|
length: 2,
|
|
ts: 10,
|
|
});
|
|
|
|
// This is marked as read as it is before alice's first threaded receipt...
|
|
expect(thread.getEventReadUpTo(myUserId)).toBe(events.at(-1)!.getId());
|
|
|
|
const { thread: thread2 } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: "@alice:example.org",
|
|
participantUserIds: ["@alice:example.org"],
|
|
length: 2,
|
|
ts: 1000,
|
|
});
|
|
|
|
// Nothing has been read, this thread is after the first threaded receipt...
|
|
expect(thread2.getEventReadUpTo(myUserId)).toBe(null);
|
|
});
|
|
});
|
|
|
|
describe("resetLiveTimeline", () => {
|
|
// ResetLiveTimeline is used when we have missing messages between the current live timeline's end and newly
|
|
// received messages. In that case, we want to replace the existing live timeline. To ensure pagination
|
|
// continues working correctly, new pagination tokens need to be set on both the old live timeline (which is
|
|
// now a regular timeline) and the new live timeline.
|
|
it("replaces the live timeline and correctly sets pagination tokens", async () => {
|
|
const myUserId = "@bob:example.org";
|
|
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
|
|
timelineSupport: false,
|
|
});
|
|
const client = testClient.client;
|
|
client.supportsThreads = jest.fn().mockReturnValue(true);
|
|
const room = new Room("123", client, myUserId, {
|
|
pendingEventOrdering: PendingEventOrdering.Detached,
|
|
});
|
|
|
|
jest.spyOn(client, "getRoom").mockReturnValue(room);
|
|
|
|
const { thread } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: myUserId,
|
|
participantUserIds: ["@alice:example.org"],
|
|
length: 3,
|
|
});
|
|
await emitPromise(thread, ThreadEvent.Update);
|
|
expect(thread.length).toBe(2);
|
|
|
|
jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) =>
|
|
Promise.resolve({
|
|
chunk: [],
|
|
start: `${token}-new`,
|
|
end: `${token}-new`,
|
|
}),
|
|
);
|
|
|
|
function timelines(): [string | null, string | null][] {
|
|
return thread.timelineSet
|
|
.getTimelines()
|
|
.map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]);
|
|
}
|
|
|
|
expect(timelines()).toEqual([[null, null]]);
|
|
const promise = thread.resetLiveTimeline("b1", "f1");
|
|
expect(timelines()).toEqual([
|
|
[null, "f1"],
|
|
["b1", null],
|
|
]);
|
|
await promise;
|
|
expect(timelines()).toEqual([
|
|
[null, "f1-new"],
|
|
["b1-new", null],
|
|
]);
|
|
});
|
|
|
|
// As the pagination tokens cannot be used right now, resetLiveTimeline needs to replace them before they can
|
|
// be used. But if in the future the bug in synapse is fixed, and they can actually be used, we can get into a
|
|
// state where the client has paginated (and changed the tokens) while resetLiveTimeline tries to set the
|
|
// corrected tokens. To prevent such a race condition, we make sure that resetLiveTimeline respects any
|
|
// changes done to the pagination tokens.
|
|
it("replaces the live timeline but does not replace changed pagination tokens", async () => {
|
|
const myUserId = "@bob:example.org";
|
|
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
|
|
timelineSupport: false,
|
|
});
|
|
const client = testClient.client;
|
|
client.supportsThreads = jest.fn().mockReturnValue(true);
|
|
const room = new Room("123", client, myUserId, {
|
|
pendingEventOrdering: PendingEventOrdering.Detached,
|
|
});
|
|
|
|
jest.spyOn(client, "getRoom").mockReturnValue(room);
|
|
|
|
const { thread } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: myUserId,
|
|
participantUserIds: ["@alice:example.org"],
|
|
length: 3,
|
|
});
|
|
await emitPromise(thread, ThreadEvent.Update);
|
|
expect(thread.length).toBe(2);
|
|
|
|
jest.spyOn(client, "createMessagesRequest").mockImplementation((_, token) =>
|
|
Promise.resolve({
|
|
chunk: [],
|
|
start: `${token}-new`,
|
|
end: `${token}-new`,
|
|
}),
|
|
);
|
|
|
|
function timelines(): [string | null, string | null][] {
|
|
return thread.timelineSet
|
|
.getTimelines()
|
|
.map((it) => [it.getPaginationToken(Direction.Backward), it.getPaginationToken(Direction.Forward)]);
|
|
}
|
|
|
|
expect(timelines()).toEqual([[null, null]]);
|
|
const promise = thread.resetLiveTimeline("b1", "f1");
|
|
expect(timelines()).toEqual([
|
|
[null, "f1"],
|
|
["b1", null],
|
|
]);
|
|
thread.timelineSet.getTimelines()[0].setPaginationToken("f2", Direction.Forward);
|
|
thread.timelineSet.getTimelines()[1].setPaginationToken("b2", Direction.Backward);
|
|
await promise;
|
|
expect(timelines()).toEqual([
|
|
[null, "f2"],
|
|
["b2", null],
|
|
]);
|
|
});
|
|
|
|
it("is correctly called by the room", async () => {
|
|
const myUserId = "@bob:example.org";
|
|
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
|
|
timelineSupport: false,
|
|
});
|
|
const client = testClient.client;
|
|
client.supportsThreads = jest.fn().mockReturnValue(true);
|
|
const room = new Room("123", client, myUserId, {
|
|
pendingEventOrdering: PendingEventOrdering.Detached,
|
|
});
|
|
|
|
jest.spyOn(client, "getRoom").mockReturnValue(room);
|
|
|
|
const { thread } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: myUserId,
|
|
participantUserIds: ["@alice:example.org"],
|
|
length: 3,
|
|
});
|
|
await emitPromise(thread, ThreadEvent.Update);
|
|
expect(thread.length).toBe(2);
|
|
const mock = jest.spyOn(thread, "resetLiveTimeline");
|
|
mock.mockReturnValue(Promise.resolve());
|
|
|
|
room.resetLiveTimeline("b1", "f1");
|
|
expect(mock).toHaveBeenCalledWith("b1", "f1");
|
|
});
|
|
});
|
|
|
|
describe("insertEventIntoTimeline", () => {
|
|
it("Inserts a reaction in timestamp order", () => {
|
|
// Assumption: no server side support because if we have it, events
|
|
// can only be added to the timeline after the thread has been
|
|
// initialised, and we are not properly initialising it here.
|
|
expect(Thread.hasServerSideSupport).toBe(FeatureSupport.None);
|
|
|
|
const client = createClientWithEventMapper();
|
|
const userId = "user1";
|
|
const room = new Room("room1", client, userId);
|
|
|
|
// Given a thread with a root plus 5 messages
|
|
const { thread, events } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: userId,
|
|
participantUserIds: ["@bob:hs", "@chia:hs", "@dv:hs"],
|
|
length: 6,
|
|
ts: 100, // Events will be at ts 100, 101, 102, 103, 104 and 105
|
|
});
|
|
|
|
// When we insert a reaction to the second thread message
|
|
const replyEvent = mkReaction(events[2], client, userId, room.roomId, 104);
|
|
thread.insertEventIntoTimeline(replyEvent);
|
|
|
|
// Then the reaction is inserted based on its timestamp
|
|
expect(thread.events.map((ev) => ev.getId())).toEqual([
|
|
events[0].getId(),
|
|
events[1].getId(),
|
|
events[2].getId(),
|
|
events[3].getId(),
|
|
events[4].getId(),
|
|
replyEvent.getId(),
|
|
events[5].getId(),
|
|
]);
|
|
});
|
|
|
|
describe("Without relations recursion support", () => {
|
|
it("Creates a local echo receipt for new events", async () => {
|
|
// Assumption: no server side support because if we have it, events
|
|
// can only be added to the timeline after the thread has been
|
|
// initialised, and we are not properly initialising it here.
|
|
expect(Thread.hasServerSideSupport).toBe(FeatureSupport.None);
|
|
|
|
// Given a client without relations recursion support
|
|
const client = createClientWithEventMapper();
|
|
|
|
// And a thread with an added event (with later timestamp)
|
|
const userId = "user1";
|
|
const { thread, message2 } = await createThreadAnd2Events(client, 1, 100, 200, userId);
|
|
|
|
// Then a receipt was added to the thread
|
|
const receipt = thread.getReadReceiptForUserId(userId);
|
|
expect(receipt).toBeTruthy();
|
|
expect(receipt?.eventId).toEqual(message2.getId());
|
|
expect(receipt?.data.ts).toEqual(200);
|
|
expect(receipt?.data.thread_id).toEqual(thread.id);
|
|
|
|
// (And the receipt was synthetic)
|
|
expect(thread.getReadReceiptForUserId(userId, true)).toBeNull();
|
|
});
|
|
|
|
it("Doesn't create a local echo receipt for events before an existing receipt", async () => {
|
|
// Assumption: no server side support because if we have it, events
|
|
// can only be added to the timeline after the thread has been
|
|
// initialised, and we are not properly initialising it here.
|
|
expect(Thread.hasServerSideSupport).toBe(FeatureSupport.None);
|
|
|
|
// Given a client without relations recursion support
|
|
const client = createClientWithEventMapper();
|
|
|
|
// And a thread with an added event with a lower timestamp than its other events
|
|
const userId = "user1";
|
|
const { thread, message1 } = await createThreadAnd2Events(client, 300, 200, 100, userId);
|
|
|
|
// Then the receipt is for the first message, because its
|
|
// timestamp is later. This happens because since we have no
|
|
// recursive relations support, we know that sometimes events
|
|
// appear out of order, so we have to check their timestamps as
|
|
// a guess of the correct order.
|
|
expect(thread.getReadReceiptForUserId(userId)?.eventId).toEqual(message1.getId());
|
|
});
|
|
});
|
|
|
|
describe("With relations recursion support", () => {
|
|
it("Creates a local echo receipt for new events", async () => {
|
|
// Assumption: no server side support because if we have it, events
|
|
// can only be added to the timeline after the thread has been
|
|
// initialised, and we are not properly initialising it here.
|
|
expect(Thread.hasServerSideSupport).toBe(FeatureSupport.None);
|
|
|
|
// Given a client WITH relations recursion support
|
|
const client = createClientWithEventMapper(
|
|
new Map([[Feature.RelationsRecursion, ServerSupport.Stable]]),
|
|
);
|
|
|
|
// And a thread with an added event (with later timestamp)
|
|
const userId = "user1";
|
|
const { thread, message2 } = await createThreadAnd2Events(client, 1, 100, 200, userId);
|
|
|
|
// Then a receipt was added to the thread
|
|
const receipt = thread.getReadReceiptForUserId(userId);
|
|
expect(receipt?.eventId).toEqual(message2.getId());
|
|
});
|
|
|
|
it("Creates a local echo receipt even for events BEFORE an existing receipt", async () => {
|
|
// Assumption: no server side support because if we have it, events
|
|
// can only be added to the timeline after the thread has been
|
|
// initialised, and we are not properly initialising it here.
|
|
expect(Thread.hasServerSideSupport).toBe(FeatureSupport.None);
|
|
|
|
// Given a client WITH relations recursion support
|
|
const client = createClientWithEventMapper(
|
|
new Map([[Feature.RelationsRecursion, ServerSupport.Stable]]),
|
|
);
|
|
|
|
// And a thread with an added event with a lower timestamp than its other events
|
|
const userId = "user1";
|
|
const { thread, message2 } = await createThreadAnd2Events(client, 300, 200, 100, userId);
|
|
|
|
// Then a receipt was added for the last message, even though it
|
|
// has lower ts, because relations recursion is available, so we
|
|
// trust the server to have provided us with events in the right
|
|
// order.
|
|
const receipt = thread.getReadReceiptForUserId(userId);
|
|
expect(receipt?.eventId).toEqual(message2.getId());
|
|
});
|
|
});
|
|
|
|
async function createThreadAnd2Events(
|
|
client: MatrixClient,
|
|
rootTs: number,
|
|
message1Ts: number,
|
|
message2Ts: number,
|
|
userId: string,
|
|
): Promise<{ thread: Thread; message1: MatrixEvent; message2: MatrixEvent }> {
|
|
const room = new Room("room1", client, userId);
|
|
|
|
// Given a thread
|
|
const { thread } = mkThread({
|
|
room,
|
|
client,
|
|
authorId: userId,
|
|
participantUserIds: [],
|
|
ts: rootTs,
|
|
});
|
|
// Sanity: there is no read receipt on the thread yet because the
|
|
// thread events don't get properly added to the room by mkThread.
|
|
expect(thread.getReadReceiptForUserId(userId)).toBeNull();
|
|
|
|
const awaitTimelineEvent = new Promise<void>((res) => thread.on(RoomEvent.Timeline, () => res()));
|
|
|
|
// Add a message with ts message1Ts
|
|
const message1 = makeThreadEvent({
|
|
event: true,
|
|
rootEventId: thread.id,
|
|
replyToEventId: thread.id,
|
|
user: userId,
|
|
room: room.roomId,
|
|
ts: message1Ts,
|
|
});
|
|
await thread.addEvent(message1, false, true);
|
|
await awaitTimelineEvent;
|
|
|
|
// Sanity: the thread now has a properly-added event, so this event
|
|
// has a synthetic receipt.
|
|
expect(thread.getReadReceiptForUserId(userId)?.eventId).toEqual(message1.getId());
|
|
|
|
// Add a message with ts message2Ts
|
|
const message2 = makeThreadEvent({
|
|
event: true,
|
|
rootEventId: thread.id,
|
|
replyToEventId: thread.id,
|
|
user: userId,
|
|
room: room.roomId,
|
|
ts: message2Ts,
|
|
});
|
|
await thread.addEvent(message2, false, true);
|
|
await awaitTimelineEvent;
|
|
|
|
return { thread, message1, message2 };
|
|
}
|
|
|
|
function createClientWithEventMapper(canSupport: Map<Feature, ServerSupport> = new Map()): MatrixClient {
|
|
const client = mock(MatrixClient, "MatrixClient");
|
|
client.reEmitter = mock(ReEmitter, "ReEmitter");
|
|
client.canSupport = canSupport;
|
|
jest.spyOn(client, "getEventMapper").mockReturnValue(eventMapperFor(client, {}));
|
|
mocked(client.supportsThreads).mockReturnValue(true);
|
|
return client;
|
|
}
|
|
});
|
|
|
|
describe("Editing events", () => {
|
|
describe("Given server support for threads", () => {
|
|
let previousThreadHasServerSideSupport: FeatureSupport;
|
|
|
|
beforeAll(() => {
|
|
previousThreadHasServerSideSupport = Thread.hasServerSideSupport;
|
|
Thread.hasServerSideSupport = FeatureSupport.Stable;
|
|
});
|
|
|
|
afterAll(() => {
|
|
Thread.hasServerSideSupport = previousThreadHasServerSideSupport;
|
|
});
|
|
|
|
it("Adds edits from sync to the thread timeline and applies them", async () => {
|
|
// Given a thread
|
|
const client = createClient();
|
|
const user = "@alice:matrix.org";
|
|
const room = "!room:z";
|
|
const thread = await createThread(client, user, room);
|
|
|
|
// When a message and an edit are added to the thread
|
|
const messageToEdit = createThreadMessage(thread.id, user, room, "Thread reply");
|
|
const editEvent = mkEdit(messageToEdit, client, user, room, "edit");
|
|
await thread.addEvent(messageToEdit, false);
|
|
await thread.addEvent(editEvent, false);
|
|
|
|
// Then both events end up in the timeline
|
|
const lastEvent = thread.timeline.at(-1)!;
|
|
const secondLastEvent = thread.timeline.at(-2)!;
|
|
expect(lastEvent).toBe(editEvent);
|
|
expect(secondLastEvent).toBe(messageToEdit);
|
|
|
|
// And the first message has been edited
|
|
expect(secondLastEvent.getContent().body).toEqual("edit");
|
|
});
|
|
|
|
it("Adds edits fetched on demand to the thread timeline and applies them", async () => {
|
|
// Given we don't support recursive relations
|
|
const client = createClient(new Map([[Feature.RelationsRecursion, ServerSupport.Unsupported]]));
|
|
// And we have a thread
|
|
const user = "@alice:matrix.org";
|
|
const room = "!room:z";
|
|
const thread = await createThread(client, user, room);
|
|
|
|
// When a message is added to the thread, and an edit to it is provided on demand
|
|
const messageToEdit = createThreadMessage(thread.id, user, room, "Thread reply");
|
|
// (fetchEditsWhereNeeded only applies to encrypted messages for some reason)
|
|
messageToEdit.event.type = EventType.RoomMessageEncrypted;
|
|
const editEvent = mkEdit(messageToEdit, client, user, room, "edit");
|
|
mocked(client.relations).mockImplementation(async (_roomId, eventId) => {
|
|
if (eventId === messageToEdit.getId()) {
|
|
return { events: [editEvent] };
|
|
} else {
|
|
return { events: [] };
|
|
}
|
|
});
|
|
await thread.addEvent(messageToEdit, false);
|
|
|
|
// Then both events end up in the timeline
|
|
const lastEvent = thread.timeline.at(-1)!;
|
|
const secondLastEvent = thread.timeline.at(-2)!;
|
|
expect(lastEvent).toBe(editEvent);
|
|
expect(secondLastEvent).toBe(messageToEdit);
|
|
|
|
// And the first message has been edited
|
|
expect(secondLastEvent.getContent().body).toEqual("edit");
|
|
});
|
|
});
|
|
});
|
|
|
|
describe("addEvent", () => {
|
|
describe("Given server support for threads", () => {
|
|
let previousThreadHasServerSideSupport: FeatureSupport;
|
|
|
|
beforeAll(() => {
|
|
previousThreadHasServerSideSupport = Thread.hasServerSideSupport;
|
|
Thread.hasServerSideSupport = FeatureSupport.Stable;
|
|
});
|
|
|
|
afterAll(() => {
|
|
Thread.hasServerSideSupport = previousThreadHasServerSideSupport;
|
|
});
|
|
|
|
it("Adds events even if they appear out of order", async () => {
|
|
// Given a thread exists
|
|
const client = createClient();
|
|
const user = "@alice:matrix.org";
|
|
const room = "!room:z";
|
|
const thread = await createThread(client, user, room);
|
|
const prevNumEvents = thread.timeline.length;
|
|
|
|
// When two messages come in but the later one has an older timestamp
|
|
const message1 = createThreadMessage(thread.id, user, room, "message1");
|
|
const message2 = createThreadMessage(thread.id, user, room, "message2");
|
|
message2.localTimestamp -= 10000;
|
|
|
|
await thread.addEvent(message1, false);
|
|
await thread.addEvent(message2, false);
|
|
|
|
// Then both events end up in the timeline
|
|
expect(thread.timeline.length - prevNumEvents).toEqual(2);
|
|
const lastEvent = thread.timeline.at(-1)!;
|
|
const secondLastEvent = thread.timeline.at(-2)!;
|
|
expect(lastEvent).toBe(message2);
|
|
expect(secondLastEvent).toBe(message1);
|
|
});
|
|
|
|
it("Adds events to start even if they appear out of order", async () => {
|
|
// Given a thread exists
|
|
const client = createClient();
|
|
const user = "@alice:matrix.org";
|
|
const room = "!room:z";
|
|
const thread = await createThread(client, user, room);
|
|
const prevNumEvents = thread.timeline.length;
|
|
|
|
// When two messages come in but the later one has an older timestamp
|
|
const message1 = createThreadMessage(thread.id, user, room, "message1");
|
|
const message2 = createThreadMessage(thread.id, user, room, "message2");
|
|
message2.localTimestamp -= 10000;
|
|
|
|
await thread.addEvent(message1, false);
|
|
await thread.addEvent(message2, true);
|
|
|
|
// Then both events end up in the timeline
|
|
expect(thread.timeline.length - prevNumEvents).toEqual(2);
|
|
const lastEvent = thread.timeline.at(-1)!;
|
|
const firstEvent = thread.timeline.at(0)!;
|
|
expect(lastEvent).toBe(message1);
|
|
expect(firstEvent).toBe(message2);
|
|
});
|
|
|
|
it("Edit events are properly aggregated in threads with server-side support", async () => {
|
|
// This test reproduces the race condition bug from https://github.com/element-hq/element-web/issues/30617
|
|
// The bug occurs when edits arrive while the thread is not initialized,
|
|
// causing aggregation to fail because the target event isn't in the timeline yet
|
|
|
|
// Given a thread exists with server-side support enabled
|
|
const myUserId = "@alice:example.org";
|
|
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
|
|
timelineSupport: false,
|
|
});
|
|
const client = testClient.client;
|
|
client.supportsThreads = jest.fn().mockReturnValue(true);
|
|
|
|
const room = new Room("!room:z", client, myUserId, {
|
|
pendingEventOrdering: PendingEventOrdering.Detached,
|
|
});
|
|
jest.spyOn(client, "getRoom").mockReturnValue(room);
|
|
|
|
// Create a root event
|
|
const rootEvent = mkMessage({
|
|
room: room.roomId,
|
|
user: myUserId,
|
|
msg: "Root message",
|
|
event: true,
|
|
});
|
|
|
|
// Create thread manually - starts with initialEventsFetched = false
|
|
const thread = new Thread(rootEvent.getId()!, rootEvent, {
|
|
room: room,
|
|
client: client,
|
|
pendingEventOrdering: PendingEventOrdering.Detached,
|
|
});
|
|
|
|
// The thread is NOT initialized - this is the key to reproducing the bug!
|
|
expect(thread.initialEventsFetched).toBe(false);
|
|
|
|
// Create a message that will be edited
|
|
const originalMessage = mkMessage({
|
|
room: room.roomId,
|
|
user: myUserId,
|
|
msg: "Original message in thread",
|
|
relatesTo: {
|
|
"rel_type": THREAD_RELATION_TYPE.name,
|
|
"event_id": thread.id,
|
|
"m.in_reply_to": {
|
|
event_id: thread.id,
|
|
},
|
|
},
|
|
event: true,
|
|
});
|
|
|
|
// Create edit events BEFORE the original message is in the timeline
|
|
const edit1 = mkEdit(originalMessage, client, myUserId, room.roomId, "Edit 1");
|
|
const edit2 = mkEdit(originalMessage, client, myUserId, room.roomId, "Edit 2");
|
|
const edit3 = mkEdit(originalMessage, client, myUserId, room.roomId, "Final edit");
|
|
|
|
// CRITICAL: Add edits while thread is NOT initialized
|
|
// They will be queued in replayEvents and aggregation will be attempted but fail
|
|
await thread.addEvent(edit1, false);
|
|
|
|
// Check the aggregation state after adding first edit
|
|
// With our fix: edits should NOT be aggregated yet (thread not initialized)
|
|
// Without fix: edits would be aggregated but fail to link to target
|
|
const relationsAfterFirstEdit = thread.timelineSet.relations?.getChildEventsForEvent(
|
|
originalMessage.getId()!,
|
|
RelationType.Replace,
|
|
EventType.RoomMessage,
|
|
);
|
|
|
|
// With the fix, no aggregation happens yet (which is correct)
|
|
// Without the fix, aggregation would happen but fail silently
|
|
expect(relationsAfterFirstEdit).toBeUndefined();
|
|
|
|
// Add remaining edits
|
|
await thread.addEvent(edit2, false);
|
|
await thread.addEvent(edit3, false);
|
|
|
|
// Check that edits went to replayEvents
|
|
expect(thread.replayEvents).toHaveLength(3);
|
|
|
|
// Now initialize the thread and add the original message
|
|
thread.initialEventsFetched = true;
|
|
|
|
// Clear replayEvents and add the original message
|
|
const replayEvents = [...(thread.replayEvents || [])];
|
|
thread.replayEvents = [];
|
|
|
|
// Add original message first
|
|
await thread.addEvent(originalMessage, false);
|
|
|
|
// At this point, the original message should NOT have the edits aggregated yet
|
|
// because they were attempted when the target wasn't in timeline
|
|
const replacingEventBeforeReplay = originalMessage.replacingEvent();
|
|
// With the fix, edits should not be aggregated yet (pre-init)
|
|
expect(replacingEventBeforeReplay).toBeNull();
|
|
|
|
// Then replay the edits
|
|
for (const event of replayEvents) {
|
|
await thread.addEvent(event, false);
|
|
}
|
|
|
|
// After replay, check aggregation
|
|
const replacingEvent = originalMessage.replacingEvent();
|
|
|
|
// This should now work because edits were re-aggregated when replayed
|
|
expect(replacingEvent).toBe(edit3);
|
|
|
|
// The content should also be updated
|
|
expect(originalMessage.getContent().body).toBe("Final edit");
|
|
|
|
// Relations for replaces should now exist and include all edits in order
|
|
const replaceRels = thread.timelineSet.relations!.getChildEventsForEvent(
|
|
originalMessage.getId()!,
|
|
RelationType.Replace,
|
|
EventType.RoomMessage,
|
|
)!;
|
|
const replaceIds = replaceRels.getRelations().map((e) => e.getId());
|
|
expect(replaceIds).toHaveLength(3);
|
|
expect(replaceIds[0]).toBe(edit1.getId());
|
|
expect(replaceIds[1]).toBe(edit2.getId());
|
|
expect(replaceIds[2]).toBe(edit3.getId());
|
|
});
|
|
|
|
it("Reactions aggregate pre-init and remain idempotent on replay", async () => {
|
|
const myUserId = "@alice:example.org";
|
|
const testClient = new TestClient(myUserId, "DEVICE", "ACCESS_TOKEN", undefined, {
|
|
timelineSupport: false,
|
|
});
|
|
const client = testClient.client;
|
|
client.supportsThreads = jest.fn().mockReturnValue(true);
|
|
|
|
// Force server-side support so threads start uninitialised
|
|
const prevSupport = Thread.hasServerSideSupport;
|
|
Thread.setServerSideSupport(FeatureSupport.Stable);
|
|
|
|
try {
|
|
const room = new Room("!room:z", client, myUserId, {
|
|
pendingEventOrdering: PendingEventOrdering.Detached,
|
|
});
|
|
jest.spyOn(client, "getRoom").mockReturnValue(room);
|
|
|
|
// Create a root event and thread
|
|
const rootEvent = mkMessage({ room: room.roomId, user: myUserId, msg: "Root", event: true });
|
|
const thread = new Thread(rootEvent.getId()!, rootEvent, {
|
|
room,
|
|
client,
|
|
pendingEventOrdering: PendingEventOrdering.Detached,
|
|
});
|
|
|
|
expect(thread.initialEventsFetched).toBe(false);
|
|
|
|
// A message inside the thread to react to
|
|
const originalMessage = mkMessage({
|
|
room: room.roomId,
|
|
user: myUserId,
|
|
msg: "Thread message",
|
|
relatesTo: {
|
|
"rel_type": THREAD_RELATION_TYPE.name,
|
|
"event_id": thread.id,
|
|
"m.in_reply_to": { event_id: thread.id },
|
|
},
|
|
event: true,
|
|
});
|
|
|
|
// Create 2 reactions before the message is in the timeline (pre-init)
|
|
const reaction1 = mkReaction(originalMessage, client, myUserId, room.roomId);
|
|
const reaction2 = mkReaction(originalMessage, client, myUserId, room.roomId);
|
|
|
|
// Add reactions while thread is NOT initialised
|
|
thread.addEvent(reaction1, false);
|
|
thread.addEvent(reaction2, false);
|
|
|
|
// Relations should already include the reactions pre-init
|
|
const relsBefore = thread.timelineSet.relations!.getChildEventsForEvent(
|
|
originalMessage.getId()!,
|
|
RelationType.Annotation,
|
|
EventType.Reaction,
|
|
)!;
|
|
expect(relsBefore).toBeTruthy();
|
|
const beforeIds = new Set(relsBefore.getRelations().map((e) => e.getId()));
|
|
expect(beforeIds.size).toBe(2);
|
|
|
|
// Now initialise and replay
|
|
// Ensure reactions are queued for replay as well
|
|
expect(thread.replayEvents).toHaveLength(2);
|
|
const replay = [...(thread.replayEvents || [])];
|
|
thread.replayEvents = [];
|
|
thread.initialEventsFetched = true;
|
|
|
|
// Add the original message first so it becomes findable
|
|
thread.addEvent(originalMessage, false);
|
|
// Replay reactions
|
|
for (const ev of replay) thread.addEvent(ev, false);
|
|
|
|
// Ensure no duplicates after replay (idempotent aggregation)
|
|
const relsAfter = thread.timelineSet.relations!.getChildEventsForEvent(
|
|
originalMessage.getId()!,
|
|
RelationType.Annotation,
|
|
EventType.Reaction,
|
|
)!;
|
|
const afterIds = new Set(relsAfter.getRelations().map((e) => e.getId()));
|
|
expect(afterIds.size).toBe(2);
|
|
expect(afterIds).toEqual(beforeIds);
|
|
} finally {
|
|
// restore
|
|
Thread.setServerSideSupport(prevSupport);
|
|
}
|
|
});
|
|
});
|
|
});
|
|
});
|
|
|
|
/**
|
|
* Create a message event that lives in a thread
|
|
*/
|
|
function createThreadMessage(threadId: string, user: string, room: string, msg: string): MatrixEvent {
|
|
return makeThreadEvent({
|
|
event: true,
|
|
user,
|
|
room,
|
|
msg,
|
|
rootEventId: threadId,
|
|
replyToEventId: threadId,
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Create a thread and wait for it to be properly initialised (so you can safely
|
|
* add events to it and expect them to appear in the timeline.
|
|
*/
|
|
async function createThread(client: MatrixClient, user: string, roomId: string): Promise<Thread> {
|
|
const root = mkMessage({ event: true, user, room: roomId, msg: "Thread root" });
|
|
const room = new Room(roomId, client, "@roomcreator:x");
|
|
|
|
// Ensure the root is in the room timeline
|
|
root.setThreadId(root.getId());
|
|
await room.addLiveEvents([root], { addToState: false });
|
|
|
|
// Create the thread and wait for it to be initialised
|
|
const thread = room.createThread(root.getId()!, root, [], false);
|
|
await new Promise<void>((res) => thread.once(RoomEvent.TimelineReset, () => res()));
|
|
|
|
return thread;
|
|
}
|
|
|
|
/**
|
|
* Create a MatrixClient that supports threads and has all the methods used when
|
|
* creating a thread that call out to HTTP endpoints mocked out.
|
|
*/
|
|
function createClient(canSupport = new Map()): MatrixClient {
|
|
const client = mock(MatrixClient, "MatrixClient");
|
|
client.reEmitter = mock(ReEmitter, "ReEmitter");
|
|
client.canSupport = canSupport;
|
|
|
|
jest.spyOn(client, "supportsThreads").mockReturnValue(true);
|
|
jest.spyOn(client, "getEventMapper").mockReturnValue(eventMapperFor(client, {}));
|
|
|
|
// Mock methods that call out to HTTP endpoints
|
|
jest.spyOn(client, "paginateEventTimeline").mockResolvedValue(true);
|
|
jest.spyOn(client, "relations").mockResolvedValue({ events: [] });
|
|
jest.spyOn(client, "fetchRoomEvent").mockResolvedValue({});
|
|
|
|
return client;
|
|
}
|