1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-07-31 15:24:23 +03:00

Fix race conditions around threads (#2331)

This commit is contained in:
Michael Telatynski
2022-05-03 14:25:17 +01:00
committed by GitHub
parent 274d6a9597
commit ac5fee0a69
7 changed files with 218 additions and 195 deletions

View File

@ -811,9 +811,7 @@ describe("MatrixClient", function() {
} }
}, },
}, },
threads: { getThread: jest.fn(),
get: jest.fn(),
},
addPendingEvent: jest.fn(), addPendingEvent: jest.fn(),
updatePendingEvent: jest.fn(), updatePendingEvent: jest.fn(),
reEmitter: { reEmitter: {

View File

@ -36,7 +36,7 @@ import { RoomState } from "../../src/models/room-state";
import { UNSTABLE_ELEMENT_FUNCTIONAL_USERS } from "../../src/@types/event"; import { UNSTABLE_ELEMENT_FUNCTIONAL_USERS } from "../../src/@types/event";
import { TestClient } from "../TestClient"; import { TestClient } from "../TestClient";
import { emitPromise } from "../test-utils/test-utils"; import { emitPromise } from "../test-utils/test-utils";
import { ThreadEvent } from "../../src/models/thread"; import { Thread, ThreadEvent } from "../../src/models/thread";
describe("Room", function() { describe("Room", function() {
const roomId = "!foo:bar"; const roomId = "!foo:bar";
@ -1914,7 +1914,7 @@ describe("Room", function() {
}, },
}); });
room.createThread(undefined, [eventWithoutARootEvent]); room.createThread("$000", undefined, [eventWithoutARootEvent]);
const rootEvent = new MatrixEvent({ const rootEvent = new MatrixEvent({
event_id: "$666", event_id: "$666",
@ -1932,7 +1932,7 @@ describe("Room", function() {
}, },
}); });
expect(() => room.createThread(rootEvent, [])).not.toThrow(); expect(() => room.createThread(rootEvent.getId(), rootEvent, [])).not.toThrow();
}); });
it("Edits update the lastReply event", async () => { it("Edits update the lastReply event", async () => {
@ -1959,14 +1959,16 @@ describe("Room", function() {
}, },
}); });
let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([randomMessage, threadRoot, threadResponse]); room.addLiveEvents([randomMessage, threadRoot, threadResponse]);
const thread = await emitPromise(room, ThreadEvent.New); const thread = await prom;
expect(thread.replyToEvent).toBe(threadResponse); expect(thread.replyToEvent).toBe(threadResponse);
expect(thread.replyToEvent.getContent().body).toBe(threadResponse.getContent().body); expect(thread.replyToEvent.getContent().body).toBe(threadResponse.getContent().body);
prom = emitPromise(thread, ThreadEvent.Update);
room.addLiveEvents([threadResponseEdit]); room.addLiveEvents([threadResponseEdit]);
await emitPromise(thread, ThreadEvent.Update); await prom;
expect(thread.replyToEvent.getContent().body).toBe(threadResponseEdit.getContent()["m.new_content"].body); expect(thread.replyToEvent.getContent().body).toBe(threadResponseEdit.getContent()["m.new_content"].body);
}); });
@ -1993,15 +1995,17 @@ describe("Room", function() {
}, },
}); });
let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2]); room.addLiveEvents([threadRoot, threadResponse1, threadResponse2]);
const thread = await emitPromise(room, ThreadEvent.New); const thread = await prom;
expect(thread).toHaveLength(2); expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId()); expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
prom = emitPromise(thread, ThreadEvent.Update);
const threadResponse1Redaction = mkRedaction(threadResponse1); const threadResponse1Redaction = mkRedaction(threadResponse1);
room.addLiveEvents([threadResponse1Redaction]); room.addLiveEvents([threadResponse1Redaction]);
await emitPromise(thread, ThreadEvent.Update); await prom;
expect(thread).toHaveLength(1); expect(thread).toHaveLength(1);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId()); expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
}); });
@ -2030,15 +2034,17 @@ describe("Room", function() {
}, },
}); });
let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]); room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]);
const thread = await emitPromise(room, ThreadEvent.New); const thread = await prom;
expect(thread).toHaveLength(2); expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId()); expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
prom = emitPromise(thread, ThreadEvent.Update);
const threadResponse2ReactionRedaction = mkRedaction(threadResponse2Reaction); const threadResponse2ReactionRedaction = mkRedaction(threadResponse2Reaction);
room.addLiveEvents([threadResponse2ReactionRedaction]); room.addLiveEvents([threadResponse2ReactionRedaction]);
await emitPromise(thread, ThreadEvent.Update); await prom;
expect(thread).toHaveLength(2); expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId()); expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
}); });
@ -2067,15 +2073,17 @@ describe("Room", function() {
}, },
}); });
let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]); room.addLiveEvents([threadRoot, threadResponse1, threadResponse2, threadResponse2Reaction]);
const thread = await emitPromise(room, ThreadEvent.New); const thread = await prom;
expect(thread).toHaveLength(2); expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId()); expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
prom = emitPromise(room, ThreadEvent.Update);
const threadRootRedaction = mkRedaction(threadRoot); const threadRootRedaction = mkRedaction(threadRoot);
room.addLiveEvents([threadRootRedaction]); room.addLiveEvents([threadRootRedaction]);
await emitPromise(thread, ThreadEvent.Update); await prom;
expect(thread).toHaveLength(2); expect(thread).toHaveLength(2);
}); });
@ -2102,21 +2110,24 @@ describe("Room", function() {
}, },
}); });
let prom = emitPromise(room, ThreadEvent.New);
room.addLiveEvents([threadRoot, threadResponse1, threadResponse2]); room.addLiveEvents([threadRoot, threadResponse1, threadResponse2]);
const thread = await emitPromise(room, ThreadEvent.New); const thread = await prom;
expect(thread).toHaveLength(2); expect(thread).toHaveLength(2);
expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId()); expect(thread.replyToEvent.getId()).toBe(threadResponse2.getId());
prom = emitPromise(room, ThreadEvent.Update);
const threadResponse2Redaction = mkRedaction(threadResponse2); const threadResponse2Redaction = mkRedaction(threadResponse2);
room.addLiveEvents([threadResponse2Redaction]); room.addLiveEvents([threadResponse2Redaction]);
await emitPromise(thread, ThreadEvent.Update); await prom;
expect(thread).toHaveLength(1); expect(thread).toHaveLength(1);
expect(thread.replyToEvent.getId()).toBe(threadResponse1.getId()); expect(thread.replyToEvent.getId()).toBe(threadResponse1.getId());
prom = emitPromise(room, ThreadEvent.Update);
const threadResponse1Redaction = mkRedaction(threadResponse1); const threadResponse1Redaction = mkRedaction(threadResponse1);
room.addLiveEvents([threadResponse1Redaction]); room.addLiveEvents([threadResponse1Redaction]);
await emitPromise(thread, ThreadEvent.Update); await prom;
expect(thread).toHaveLength(0); expect(thread).toHaveLength(0);
expect(thread.replyToEvent.getId()).toBe(threadRoot.getId()); expect(thread.replyToEvent.getId()).toBe(threadRoot.getId());
}); });
@ -2234,5 +2245,45 @@ describe("Room", function() {
expect(room.eventShouldLiveIn(reply2, events, roots).shouldLiveInRoom).toBeTruthy(); expect(room.eventShouldLiveIn(reply2, events, roots).shouldLiveInRoom).toBeTruthy();
expect(room.eventShouldLiveIn(reply2, events, roots).shouldLiveInThread).toBeFalsy(); expect(room.eventShouldLiveIn(reply2, events, roots).shouldLiveInThread).toBeFalsy();
}); });
it("should aggregate relations in thread event timeline set", () => {
Thread.setServerSideSupport(true, true);
const threadRoot = mkMessage();
const rootReaction = mkReaction(threadRoot);
const threadResponse = mkThreadResponse(threadRoot);
const threadReaction = mkReaction(threadResponse);
const events = [
threadRoot,
rootReaction,
threadResponse,
threadReaction,
];
room.addLiveEvents(events);
const thread = threadRoot.getThread();
expect(thread.rootEvent).toBe(threadRoot);
const rootRelations = thread.timelineSet.getRelationsForEvent(
threadRoot.getId(),
RelationType.Annotation,
EventType.Reaction,
).getSortedAnnotationsByKey();
expect(rootRelations).toHaveLength(1);
expect(rootRelations[0][0]).toEqual(rootReaction.getRelation().key);
expect(rootRelations[0][1].size).toEqual(1);
expect(rootRelations[0][1].has(rootReaction)).toBeTruthy();
const responseRelations = thread.timelineSet.getRelationsForEvent(
threadResponse.getId(),
RelationType.Annotation,
EventType.Reaction,
).getSortedAnnotationsByKey();
expect(responseRelations).toHaveLength(1);
expect(responseRelations[0][0]).toEqual(threadReaction.getRelation().key);
expect(responseRelations[0][1].size).toEqual(1);
expect(responseRelations[0][1].has(threadReaction)).toBeTruthy();
});
}); });
}); });

View File

@ -48,7 +48,9 @@ import { IRoomEncryption, RoomList } from './crypto/RoomList';
import { logger } from './logger'; import { logger } from './logger';
import { SERVICE_TYPES } from './service-types'; import { SERVICE_TYPES } from './service-types';
import { import {
FileType, HttpApiEvent, HttpApiEventHandlerMap, FileType,
HttpApiEvent,
HttpApiEventHandlerMap,
IHttpOpts, IHttpOpts,
IUpload, IUpload,
MatrixError, MatrixError,
@ -3741,7 +3743,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
"rel_type": THREAD_RELATION_TYPE.name, "rel_type": THREAD_RELATION_TYPE.name,
"event_id": threadId, "event_id": threadId,
}; };
const thread = this.getRoom(roomId)?.threads.get(threadId); const thread = this.getRoom(roomId)?.getThread(threadId);
if (thread) { if (thread) {
content["m.relates_to"]["m.in_reply_to"] = { content["m.relates_to"]["m.in_reply_to"] = {
"event_id": thread.lastReply((ev: MatrixEvent) => { "event_id": thread.lastReply((ev: MatrixEvent) => {
@ -3790,7 +3792,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
})); }));
const room = this.getRoom(roomId); const room = this.getRoom(roomId);
const thread = room?.threads.get(threadId); const thread = room?.getThread(threadId);
if (thread) { if (thread) {
localEvent.setThread(thread); localEvent.setThread(thread);
} }
@ -5185,7 +5187,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
limit, limit,
Direction.Backward, Direction.Backward,
); );
}).then(async (res: IMessagesResponse) => { }).then((res: IMessagesResponse) => {
const matrixEvents = res.chunk.map(this.getEventMapper()); const matrixEvents = res.chunk.map(this.getEventMapper());
if (res.state) { if (res.state) {
const stateEvents = res.state.map(this.getEventMapper()); const stateEvents = res.state.map(this.getEventMapper());
@ -5196,7 +5198,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
this.processBeaconEvents(room, timelineEvents); this.processBeaconEvents(room, timelineEvents);
room.addEventsToTimeline(timelineEvents, true, room.getLiveTimeline()); room.addEventsToTimeline(timelineEvents, true, room.getLiveTimeline());
await this.processThreadEvents(room, threadedEvents, true); this.processThreadEvents(room, threadedEvents, true);
room.oldState.paginationToken = res.end; room.oldState.paginationToken = res.end;
if (res.chunk.length === 0) { if (res.chunk.length === 0) {
@ -5299,25 +5301,27 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
event.isRelation(THREAD_RELATION_TYPE.name) event.isRelation(THREAD_RELATION_TYPE.name)
) { ) {
const [, threadedEvents] = timelineSet.room.partitionThreadedEvents(events); const [, threadedEvents] = timelineSet.room.partitionThreadedEvents(events);
const thread = await timelineSet.room.createThreadFetchRoot(event.threadRootId, threadedEvents, true); let thread = timelineSet.room.getThread(event.threadRootId);
if (!thread) {
let nextBatch: string; thread = timelineSet.room.createThread(event.threadRootId, undefined, threadedEvents, true);
const response = await thread.fetchInitialEvents();
if (response?.nextBatch) {
nextBatch = response.nextBatch;
} }
const opts: IRelationsRequestOpts = { const opts: IRelationsRequestOpts = {
direction: Direction.Backward,
limit: 50, limit: 50,
}; };
// Fetch events until we find the one we were asked for await thread.fetchInitialEvents();
let nextBatch = thread.liveTimeline.getPaginationToken(Direction.Backward);
// Fetch events until we find the one we were asked for, or we run out of pages
while (!thread.findEventById(eventId)) { while (!thread.findEventById(eventId)) {
if (nextBatch) { if (nextBatch) {
opts.from = nextBatch; opts.from = nextBatch;
} }
({ nextBatch } = await thread.fetchEvents(opts)); ({ nextBatch } = await thread.fetchEvents(opts));
if (!nextBatch) break;
} }
return thread.liveTimeline; return thread.liveTimeline;
@ -5336,7 +5340,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
const [timelineEvents, threadedEvents] = timelineSet.room.partitionThreadedEvents(events); const [timelineEvents, threadedEvents] = timelineSet.room.partitionThreadedEvents(events);
timelineSet.addEventsToTimeline(timelineEvents, true, timeline, res.start); timelineSet.addEventsToTimeline(timelineEvents, true, timeline, res.start);
// The target event is not in a thread but process the contextual events, so we can show any threads around it. // The target event is not in a thread but process the contextual events, so we can show any threads around it.
await this.processThreadEvents(timelineSet.room, threadedEvents, true); this.processThreadEvents(timelineSet.room, threadedEvents, true);
this.processBeaconEvents(timelineSet.room, timelineEvents); this.processBeaconEvents(timelineSet.room, timelineEvents);
// There is no guarantee that the event ended up in "timeline" (we might have switched to a neighbouring // There is no guarantee that the event ended up in "timeline" (we might have switched to a neighbouring
@ -5493,7 +5497,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
opts.limit, opts.limit,
dir, dir,
eventTimeline.getFilter(), eventTimeline.getFilter(),
).then(async (res) => { ).then((res) => {
if (res.state) { if (res.state) {
const roomState = eventTimeline.getState(dir); const roomState = eventTimeline.getState(dir);
const stateEvents = res.state.map(this.getEventMapper()); const stateEvents = res.state.map(this.getEventMapper());
@ -5506,7 +5510,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
const [timelineEvents, threadedEvents] = timelineSet.room.partitionThreadedEvents(matrixEvents); const [timelineEvents, threadedEvents] = timelineSet.room.partitionThreadedEvents(matrixEvents);
timelineSet.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token); timelineSet.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
this.processBeaconEvents(timelineSet.room, timelineEvents); this.processBeaconEvents(timelineSet.room, timelineEvents);
await this.processThreadEvents(room, threadedEvents, backwards); this.processThreadEvents(room, threadedEvents, backwards);
// if we've hit the end of the timeline, we need to stop trying to // if we've hit the end of the timeline, we need to stop trying to
// paginate. We need to keep the 'forwards' token though, to make sure // paginate. We need to keep the 'forwards' token though, to make sure
@ -6663,7 +6667,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventId: string, eventId: string,
relationType?: RelationType | string | null, relationType?: RelationType | string | null,
eventType?: EventType | string | null, eventType?: EventType | string | null,
opts: IRelationsRequestOpts = {}, opts: IRelationsRequestOpts = { direction: Direction.Backward },
): Promise<{ ): Promise<{
originalEvent: MatrixEvent; originalEvent: MatrixEvent;
events: MatrixEvent[]; events: MatrixEvent[];
@ -7204,7 +7208,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventId: string, eventId: string,
relationType?: RelationType | string | null, relationType?: RelationType | string | null,
eventType?: EventType | string | null, eventType?: EventType | string | null,
opts: IRelationsRequestOpts = {}, opts: IRelationsRequestOpts = { direction: Direction.Backward },
): Promise<IRelationsResponse> { ): Promise<IRelationsResponse> {
const queryString = utils.encodeParams(opts as Record<string, string | number>); const queryString = utils.encodeParams(opts as Record<string, string | number>);
@ -8916,12 +8920,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
/** /**
* @experimental * @experimental
*/ */
public async processThreadEvents( public processThreadEvents(room: Room, threadedEvents: MatrixEvent[], toStartOfTimeline: boolean): void {
room: Room, room.processThreadedEvents(threadedEvents, toStartOfTimeline);
threadedEvents: MatrixEvent[],
toStartOfTimeline: boolean,
): Promise<void> {
await room.processThreadedEvents(threadedEvents, toStartOfTimeline);
} }
public processBeaconEvents( public processBeaconEvents(

View File

@ -852,14 +852,13 @@ export class EventTimelineSet extends TypedEventEmitter<EmittedEvents, EventTime
} }
let relationsWithEventType = relationsWithRelType[eventType]; let relationsWithEventType = relationsWithRelType[eventType];
let relatesToEvent: MatrixEvent;
if (!relationsWithEventType) { if (!relationsWithEventType) {
relationsWithEventType = relationsWithRelType[eventType] = new Relations( relationsWithEventType = relationsWithRelType[eventType] = new Relations(
relationType, relationType,
eventType, eventType,
this.room, this.room,
); );
relatesToEvent = this.findEventById(relatesToEventId) || this.room.getPendingEvent(relatesToEventId); const relatesToEvent = this.findEventById(relatesToEventId) || this.room.getPendingEvent(relatesToEventId);
if (relatesToEvent) { if (relatesToEvent) {
relationsWithEventType.setTargetEvent(relatesToEvent); relationsWithEventType.setTargetEvent(relatesToEvent);
} }

View File

@ -1303,8 +1303,7 @@ export class MatrixEvent extends TypedEventEmitter<EmittedEvents, MatrixEventHan
public isRelation(relType: string = undefined): boolean { public isRelation(relType: string = undefined): boolean {
// Relation info is lifted out of the encrypted content when sent to // Relation info is lifted out of the encrypted content when sent to
// encrypted rooms, so we have to check `getWireContent` for this. // encrypted rooms, so we have to check `getWireContent` for this.
const content = this.getWireContent(); const relation = this.getWireContent()?.["m.relates_to"];
const relation = content && content["m.relates_to"];
return relation && relation.rel_type && relation.event_id && return relation && relation.rel_type && relation.event_id &&
((relType && relation.rel_type === relType) || !relType); ((relType && relation.rel_type === relType) || !relType);
} }

View File

@ -22,7 +22,7 @@ import { EventTimelineSet, DuplicateStrategy } from "./event-timeline-set";
import { Direction, EventTimeline } from "./event-timeline"; import { Direction, EventTimeline } from "./event-timeline";
import { getHttpUriForMxc } from "../content-repo"; import { getHttpUriForMxc } from "../content-repo";
import * as utils from "../utils"; import * as utils from "../utils";
import { defer, normalize } from "../utils"; import { normalize } from "../utils";
import { IEvent, IThreadBundledRelationship, MatrixEvent, MatrixEventEvent, MatrixEventHandlerMap } from "./event"; import { IEvent, IThreadBundledRelationship, MatrixEvent, MatrixEventEvent, MatrixEventHandlerMap } from "./event";
import { EventStatus } from "./event-status"; import { EventStatus } from "./event-status";
import { RoomMember } from "./room-member"; import { RoomMember } from "./room-member";
@ -214,8 +214,6 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
private getTypeWarning = false; private getTypeWarning = false;
private getVersionWarning = false; private getVersionWarning = false;
private membersPromise?: Promise<boolean>; private membersPromise?: Promise<boolean>;
// Map from threadId to pending Thread instance created by createThreadFetchRoot
private threadPromises = new Map<string, Promise<Thread>>();
// XXX: These should be read-only // XXX: These should be read-only
/** /**
@ -266,7 +264,7 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
/** /**
* @experimental * @experimental
*/ */
public threads = new Map<string, Thread>(); private threads = new Map<string, Thread>();
public lastThread: Thread; public lastThread: Thread;
/** /**
@ -1208,9 +1206,7 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
* @experimental * @experimental
*/ */
public getThread(eventId: string): Thread { public getThread(eventId: string): Thread {
return this.getThreads().find(thread => { return this.threads.get(eventId);
return thread.id === eventId;
});
} }
/** /**
@ -1524,7 +1520,7 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
} }
if (!this.getThread(rootEvent.getId())) { if (!this.getThread(rootEvent.getId())) {
this.createThread(rootEvent, [], true); this.createThread(rootEvent.getId(), rootEvent, [], true);
} }
} }
@ -1620,58 +1616,14 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
return threadId ? this.getThread(threadId) : null; return threadId ? this.getThread(threadId) : null;
} }
public async createThreadFetchRoot( private addThreadedEvents(threadId: string, events: MatrixEvent[], toStartOfTimeline = false): void {
threadId: string,
events?: MatrixEvent[],
toStartOfTimeline?: boolean,
): Promise<Thread | null> {
let thread = this.getThread(threadId); let thread = this.getThread(threadId);
if (!thread) {
const deferred = defer<Thread | null>();
this.threadPromises.set(threadId, deferred.promise);
let rootEvent = this.findEventById(threadId);
// If the rootEvent does not exist in the local stores, then fetch it from the server.
try {
const eventData = await this.client.fetchRoomEvent(this.roomId, threadId);
const mapper = this.client.getEventMapper();
rootEvent = mapper(eventData); // will merge with existing event object if such is known
} catch (e) {
logger.error("Failed to fetch thread root to construct thread with", e);
} finally {
this.threadPromises.delete(threadId);
// The root event might be not be visible to the person requesting it.
// If it wasn't fetched successfully the thread will work in "limited" mode and won't
// benefit from all the APIs a homeserver can provide to enhance the thread experience
thread = this.createThread(rootEvent, events, toStartOfTimeline);
if (thread) {
rootEvent?.setThread(thread);
}
deferred.resolve(thread);
}
}
return thread;
}
private async addThreadedEvents(events: MatrixEvent[], threadId: string, toStartOfTimeline = false): Promise<void> {
let thread = this.getThread(threadId);
if (this.threadPromises.has(threadId)) {
thread = await this.threadPromises.get(threadId);
}
events = events.filter(e => e.getId() !== threadId); // filter out any root events
if (thread) { if (thread) {
for (const event of events) { thread.addEvents(events, toStartOfTimeline);
await thread.addEvent(event, toStartOfTimeline);
}
} else { } else {
thread = await this.createThreadFetchRoot(threadId, events, toStartOfTimeline); const rootEvent = this.findEventById(threadId) ?? events.find(e => e.getId() === threadId);
} thread = this.createThread(threadId, rootEvent, events, toStartOfTimeline);
if (thread) {
this.emit(ThreadEvent.Update, thread); this.emit(ThreadEvent.Update, thread);
} }
} }
@ -1680,30 +1632,29 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
* Adds events to a thread's timeline. Will fire "Thread.update" * Adds events to a thread's timeline. Will fire "Thread.update"
* @experimental * @experimental
*/ */
public async processThreadedEvents(events: MatrixEvent[], toStartOfTimeline: boolean): Promise<unknown> { public processThreadedEvents(events: MatrixEvent[], toStartOfTimeline: boolean): void {
events.forEach(this.applyRedaction); events.forEach(this.applyRedaction);
const eventsByThread: { [threadId: string]: MatrixEvent[] } = {}; const eventsByThread: { [threadId: string]: MatrixEvent[] } = {};
for (const event of events) { for (const event of events) {
const { threadId, shouldLiveInThread } = this.eventShouldLiveIn(event); const { threadId, shouldLiveInThread } = this.eventShouldLiveIn(event);
if (shouldLiveInThread) { if (shouldLiveInThread && !eventsByThread[threadId]) {
if (!eventsByThread[threadId]) { eventsByThread[threadId] = [];
eventsByThread[threadId] = [];
}
eventsByThread[threadId].push(event);
} }
eventsByThread[threadId]?.push(event);
} }
return Promise.all(Object.entries(eventsByThread).map(([threadId, events]) => ( Object.entries(eventsByThread).map(([threadId, events]) => (
this.addThreadedEvents(events, threadId, toStartOfTimeline) this.addThreadedEvents(threadId, events, toStartOfTimeline)
))); ));
} }
public createThread( public createThread(
threadId: string,
rootEvent: MatrixEvent | undefined, rootEvent: MatrixEvent | undefined,
events: MatrixEvent[] = [], events: MatrixEvent[] = [],
toStartOfTimeline: boolean, toStartOfTimeline: boolean,
): Thread | undefined { ): Thread {
if (rootEvent) { if (rootEvent) {
const tl = this.getTimelineForEvent(rootEvent.getId()); const tl = this.getTimelineForEvent(rootEvent.getId());
const relatedEvents = tl?.getTimelineSet().getAllRelationsEventForEvent(rootEvent.getId()); const relatedEvents = tl?.getTimelineSet().getAllRelationsEventForEvent(rootEvent.getId());
@ -1712,45 +1663,44 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
} }
} }
const thread = new Thread(rootEvent, { const thread = new Thread(threadId, rootEvent, {
initialEvents: events, initialEvents: events,
room: this, room: this,
client: this.client, client: this.client,
}); });
// If we managed to create a thread and figure out its `id` then we can use it // If we managed to create a thread and figure out its `id` then we can use it
if (thread.id) { this.threads.set(thread.id, thread);
this.threads.set(thread.id, thread); this.reEmitter.reEmit(thread, [
this.reEmitter.reEmit(thread, [ ThreadEvent.Update,
ThreadEvent.Update, ThreadEvent.NewReply,
ThreadEvent.NewReply, RoomEvent.Timeline,
RoomEvent.Timeline, RoomEvent.TimelineReset,
RoomEvent.TimelineReset, ]);
]);
if (!this.lastThread || this.lastThread.rootEvent?.localTimestamp < rootEvent?.localTimestamp) { if (!this.lastThread || this.lastThread.rootEvent?.localTimestamp < rootEvent?.localTimestamp) {
this.lastThread = thread; this.lastThread = thread;
}
this.emit(ThreadEvent.New, thread, toStartOfTimeline);
if (this.threadsReady) {
this.threadsTimelineSets.forEach(timelineSet => {
if (thread.rootEvent) {
if (Thread.hasServerSideSupport) {
timelineSet.addLiveEvent(thread.rootEvent);
} else {
timelineSet.addEventToTimeline(
thread.rootEvent,
timelineSet.getLiveTimeline(),
toStartOfTimeline,
);
}
}
});
}
return thread;
} }
this.emit(ThreadEvent.New, thread, toStartOfTimeline);
if (this.threadsReady) {
this.threadsTimelineSets.forEach(timelineSet => {
if (thread.rootEvent) {
if (Thread.hasServerSideSupport) {
timelineSet.addLiveEvent(thread.rootEvent);
} else {
timelineSet.addEventToTimeline(
thread.rootEvent,
timelineSet.getLiveTimeline(),
toStartOfTimeline,
);
}
}
});
}
return thread;
} }
private applyRedaction = (event: MatrixEvent): void => { private applyRedaction = (event: MatrixEvent): void => {
@ -2191,7 +2141,6 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
} }
const threadRoots = this.findThreadRoots(events); const threadRoots = this.findThreadRoots(events);
const threadInfos = events.map(e => this.eventShouldLiveIn(e, events, threadRoots));
const eventsByThread: { [threadId: string]: MatrixEvent[] } = {}; const eventsByThread: { [threadId: string]: MatrixEvent[] } = {};
for (let i = 0; i < events.length; i++) { for (let i = 0; i < events.length; i++) {
@ -2202,14 +2151,12 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
shouldLiveInRoom, shouldLiveInRoom,
shouldLiveInThread, shouldLiveInThread,
threadId, threadId,
} = threadInfos[i]; } = this.eventShouldLiveIn(events[i], events, threadRoots);
if (shouldLiveInThread) { if (shouldLiveInThread && !eventsByThread[threadId]) {
if (!eventsByThread[threadId]) { eventsByThread[threadId] = [];
eventsByThread[threadId] = [];
}
eventsByThread[threadId].push(events[i]);
} }
eventsByThread[threadId]?.push(events[i]);
if (shouldLiveInRoom) { if (shouldLiveInRoom) {
this.addLiveEvent(events[i], duplicateStrategy, fromCache); this.addLiveEvent(events[i], duplicateStrategy, fromCache);
@ -2217,7 +2164,7 @@ export class Room extends TypedEventEmitter<EmittedEvents, RoomEventHandlerMap>
} }
Object.entries(eventsByThread).forEach(([threadId, threadEvents]) => { Object.entries(eventsByThread).forEach(([threadId, threadEvents]) => {
this.addThreadedEvents(threadEvents, threadId, false); this.addThreadedEvents(threadId, threadEvents, false);
}); });
} }

View File

@ -70,12 +70,11 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
public readonly room: Room; public readonly room: Room;
public readonly client: MatrixClient; public readonly client: MatrixClient;
public initialEventsFetched = false; public initialEventsFetched = !Thread.hasServerSideSupport;
public readonly id: string;
constructor( constructor(
public readonly rootEvent: MatrixEvent | undefined, public readonly id: string,
public rootEvent: MatrixEvent | undefined,
opts: IThreadOpts, opts: IThreadOpts,
) { ) {
super(); super();
@ -99,12 +98,33 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
this.room.on(RoomEvent.LocalEchoUpdated, this.onEcho); this.room.on(RoomEvent.LocalEchoUpdated, this.onEcho);
this.timelineSet.on(RoomEvent.Timeline, this.onEcho); this.timelineSet.on(RoomEvent.Timeline, this.onEcho);
// If we weren't able to find the root event, it's probably missing, if (opts.initialEvents) {
// and we define the thread ID from one of the thread relation this.addEvents(opts.initialEvents, false);
this.id = rootEvent?.getId() ?? opts?.initialEvents?.find(event => event.isThreadRelation)?.relationEventId; }
this.initialiseThread(this.rootEvent); // even if this thread is thought to be originating from this client, we initialise it as we may be in a
// gappy sync and a thread around this event may already exist.
this.initialiseThread();
opts?.initialEvents?.forEach(event => this.addEvent(event, false)); this.rootEvent?.setThread(this);
}
private async fetchRootEvent(): Promise<void> {
this.rootEvent = this.room.findEventById(this.id);
// If the rootEvent does not exist in the local stores, then fetch it from the server.
try {
const eventData = await this.client.fetchRoomEvent(this.roomId, this.id);
const mapper = this.client.getEventMapper();
this.rootEvent = mapper(eventData); // will merge with existing event object if such is known
} catch (e) {
logger.error("Failed to fetch thread root to construct thread with", e);
}
// The root event might be not be visible to the person requesting it.
// If it wasn't fetched successfully the thread will work in "limited" mode and won't
// benefit from all the APIs a homeserver can provide to enhance the thread experience
this.rootEvent?.setThread(this);
this.emit(ThreadEvent.Update, this);
} }
public static setServerSideSupport(hasServerSideSupport: boolean, useStable: boolean): void { public static setServerSideSupport(hasServerSideSupport: boolean, useStable: boolean): void {
@ -180,6 +200,11 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
} }
} }
public addEvents(events: MatrixEvent[], toStartOfTimeline: boolean): void {
events.forEach(ev => this.addEvent(ev, toStartOfTimeline, false));
this.emit(ThreadEvent.Update, this);
}
/** /**
* Add an event to the thread and updates * Add an event to the thread and updates
* the tail/root references if needed * the tail/root references if needed
@ -187,43 +212,59 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
* @param event The event to add * @param event The event to add
* @param {boolean} toStartOfTimeline whether the event is being added * @param {boolean} toStartOfTimeline whether the event is being added
* to the start (and not the end) of the timeline. * to the start (and not the end) of the timeline.
* @param {boolean} emit whether to emit the Update event if the thread was updated or not.
*/ */
public async addEvent(event: MatrixEvent, toStartOfTimeline: boolean): Promise<void> { public addEvent(event: MatrixEvent, toStartOfTimeline: boolean, emit = true): void {
event.setThread(this);
if (!this._currentUserParticipated && event.getSender() === this.client.getUserId()) {
this._currentUserParticipated = true;
}
// Add all annotations and replace relations to the timeline so that the relations are processed accordingly
if ([RelationType.Annotation, RelationType.Replace].includes(event.getRelation()?.rel_type as RelationType)) {
this.addEventToTimeline(event, toStartOfTimeline);
return;
}
// Add all incoming events to the thread's timeline set when there's no server support // Add all incoming events to the thread's timeline set when there's no server support
if (!Thread.hasServerSideSupport) { if (!Thread.hasServerSideSupport) {
// all the relevant membership info to hydrate events with a sender // all the relevant membership info to hydrate events with a sender
// is held in the main room timeline // is held in the main room timeline
// We want to fetch the room state from there and pass it down to this thread // We want to fetch the room state from there and pass it down to this thread
// timeline set to let it reconcile an event with its relevant RoomMember // timeline set to let it reconcile an event with its relevant RoomMember
event.setThread(this);
this.addEventToTimeline(event, toStartOfTimeline); this.addEventToTimeline(event, toStartOfTimeline);
await this.client.decryptEventIfNeeded(event, {}); this.client.decryptEventIfNeeded(event, {});
} else if (!toStartOfTimeline && } else if (!toStartOfTimeline &&
this.initialEventsFetched && this.initialEventsFetched &&
event.localTimestamp > this.lastReply().localTimestamp event.localTimestamp > this.lastReply()?.localTimestamp
) { ) {
await this.fetchEditsWhereNeeded(event); this.fetchEditsWhereNeeded(event);
this.addEventToTimeline(event, false); this.addEventToTimeline(event, false);
} }
if (!this._currentUserParticipated && event.getSender() === this.client.getUserId()) {
this._currentUserParticipated = true;
}
// If no thread support exists we want to count all thread relation // If no thread support exists we want to count all thread relation
// added as a reply. We can't rely on the bundled relationships count // added as a reply. We can't rely on the bundled relationships count
if (!Thread.hasServerSideSupport && event.isRelation(THREAD_RELATION_TYPE.name)) { if ((!Thread.hasServerSideSupport || !this.rootEvent) && event.isRelation(THREAD_RELATION_TYPE.name)) {
this.replyCount++; this.replyCount++;
} }
this.emit(ThreadEvent.Update, this); if (emit) {
this.emit(ThreadEvent.Update, this);
}
} }
private initialiseThread(rootEvent: MatrixEvent | undefined): void { private getRootEventBundledRelationship(rootEvent = this.rootEvent): IThreadBundledRelationship {
const bundledRelationship = rootEvent return rootEvent?.getServerAggregatedRelation<IThreadBundledRelationship>(THREAD_RELATION_TYPE.name);
?.getServerAggregatedRelation<IThreadBundledRelationship>(THREAD_RELATION_TYPE.name); }
private async initialiseThread(): Promise<void> {
let bundledRelationship = this.getRootEventBundledRelationship();
if (Thread.hasServerSideSupport && !bundledRelationship) {
await this.fetchRootEvent();
bundledRelationship = this.getRootEventBundledRelationship();
}
if (Thread.hasServerSideSupport && bundledRelationship) { if (Thread.hasServerSideSupport && bundledRelationship) {
this.replyCount = bundledRelationship.count; this.replyCount = bundledRelationship.count;
@ -236,6 +277,8 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
this.fetchEditsWhereNeeded(event); this.fetchEditsWhereNeeded(event);
} }
this.emit(ThreadEvent.Update, this);
} }
// XXX: Workaround for https://github.com/matrix-org/matrix-spec-proposals/pull/2676/files#r827240084 // XXX: Workaround for https://github.com/matrix-org/matrix-spec-proposals/pull/2676/files#r827240084
@ -253,24 +296,10 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
})); }));
} }
public async fetchInitialEvents(): Promise<{ public async fetchInitialEvents(): Promise<void> {
originalEvent: MatrixEvent; if (this.initialEventsFetched) return;
events: MatrixEvent[]; await this.fetchEvents();
nextBatch?: string; this.initialEventsFetched = true;
prevBatch?: string;
} | null> {
if (!Thread.hasServerSideSupport) {
this.initialEventsFetched = true;
return null;
}
try {
const response = await this.fetchEvents();
this.initialEventsFetched = true;
return response;
} catch (e) {
return null;
}
} }
private setEventMetadata(event: MatrixEvent): void { private setEventMetadata(event: MatrixEvent): void {
@ -319,7 +348,7 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
* A getter for the last event added to the thread * A getter for the last event added to the thread
*/ */
public get replyToEvent(): MatrixEvent { public get replyToEvent(): MatrixEvent {
return this.lastEvent; return this.lastEvent ?? this.lastReply();
} }
public get events(): MatrixEvent[] { public get events(): MatrixEvent[] {
@ -338,7 +367,7 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
return this.timelineSet.getLiveTimeline(); return this.timelineSet.getLiveTimeline();
} }
public async fetchEvents(opts: IRelationsRequestOpts = { limit: 20 }): Promise<{ public async fetchEvents(opts: IRelationsRequestOpts = { limit: 20, direction: Direction.Backward }): Promise<{
originalEvent: MatrixEvent; originalEvent: MatrixEvent;
events: MatrixEvent[]; events: MatrixEvent[];
nextBatch?: string; nextBatch?: string;
@ -370,7 +399,7 @@ export class Thread extends TypedEventEmitter<EmittedEvents, EventHandlerMap> {
return this.client.decryptEventIfNeeded(event); return this.client.decryptEventIfNeeded(event);
})); }));
const prependEvents = !opts.direction || opts.direction === Direction.Backward; const prependEvents = (opts.direction ?? Direction.Backward) === Direction.Backward;
this.timelineSet.addEventsToTimeline( this.timelineSet.addEventsToTimeline(
events, events,