You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-11-25 05:23:13 +03:00
Refactor thread model to be created from the root event (#2142)
This commit is contained in:
@@ -19,6 +19,7 @@ import { IContent, IEvent } from "../models/event";
|
||||
import { Preset, Visibility } from "./partials";
|
||||
import { SearchKey } from "./search";
|
||||
import { IRoomEventFilter } from "../filter";
|
||||
import { Direction } from "../models/event-timeline";
|
||||
|
||||
// allow camelcase as these are things that go onto the wire
|
||||
/* eslint-disable camelcase */
|
||||
@@ -144,6 +145,7 @@ export interface IRelationsRequestOpts {
|
||||
from?: string;
|
||||
to?: string;
|
||||
limit?: number;
|
||||
direction?: Direction;
|
||||
}
|
||||
|
||||
export interface IRelationsResponse {
|
||||
|
||||
@@ -277,7 +277,7 @@ export class MatrixEvent extends EventEmitter {
|
||||
* it to us and the time we're now constructing this event, but that's better
|
||||
* than assuming the local clock is in sync with the origin HS's clock.
|
||||
*/
|
||||
private readonly localTimestamp: number;
|
||||
public readonly localTimestamp: number;
|
||||
|
||||
// XXX: these should be read-only
|
||||
public sender: RoomMember = null;
|
||||
|
||||
@@ -1377,8 +1377,7 @@ export class Room extends EventEmitter {
|
||||
} else {
|
||||
rootEvent.setUnsigned(eventData.unsigned);
|
||||
}
|
||||
events.unshift(rootEvent);
|
||||
thread = this.createThread(events);
|
||||
thread = this.createThread(rootEvent, events);
|
||||
}
|
||||
|
||||
if (event.getUnsigned().transaction_id) {
|
||||
@@ -1393,8 +1392,12 @@ export class Room extends EventEmitter {
|
||||
this.emit(ThreadEvent.Update, thread);
|
||||
}
|
||||
|
||||
public createThread(events: MatrixEvent[]): Thread {
|
||||
const thread = new Thread(events, this, this.client);
|
||||
public createThread(rootEvent: MatrixEvent, events?: MatrixEvent[]): Thread {
|
||||
const thread = new Thread(rootEvent, {
|
||||
initialEvents: events,
|
||||
room: this,
|
||||
client: this.client,
|
||||
});
|
||||
this.threads.set(thread.id, thread);
|
||||
this.reEmitter.reEmit(thread, [
|
||||
ThreadEvent.Update,
|
||||
|
||||
@@ -17,11 +17,13 @@ limitations under the License.
|
||||
import { MatrixClient } from "../matrix";
|
||||
import { ReEmitter } from "../ReEmitter";
|
||||
import { RelationType } from "../@types/event";
|
||||
import { IRelationsRequestOpts } from "../@types/requests";
|
||||
import { MatrixEvent, IThreadBundledRelationship } from "./event";
|
||||
import { EventTimeline } from "./event-timeline";
|
||||
import { Direction, EventTimeline } from "./event-timeline";
|
||||
import { EventTimelineSet } from './event-timeline-set';
|
||||
import { Room } from './room';
|
||||
import { TypedEventEmitter } from "./typed-event-emitter";
|
||||
import { RoomState } from "./room-state";
|
||||
|
||||
export enum ThreadEvent {
|
||||
New = "Thread.new",
|
||||
@@ -31,14 +33,16 @@ export enum ThreadEvent {
|
||||
ViewThread = "Thred.viewThread",
|
||||
}
|
||||
|
||||
interface IThreadOpts {
|
||||
initialEvents?: MatrixEvent[];
|
||||
room: Room;
|
||||
client: MatrixClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* @experimental
|
||||
*/
|
||||
export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
/**
|
||||
* A reference to the event ID at the top of the thread
|
||||
*/
|
||||
private root: string;
|
||||
/**
|
||||
* A reference to all the events ID at the bottom of the threads
|
||||
*/
|
||||
@@ -51,33 +55,37 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
private lastEvent: MatrixEvent;
|
||||
private replyCount = 0;
|
||||
|
||||
public readonly room: Room;
|
||||
public readonly client: MatrixClient;
|
||||
|
||||
public initialEventsFetched = false;
|
||||
|
||||
constructor(
|
||||
events: MatrixEvent[] = [],
|
||||
public readonly room: Room,
|
||||
public readonly client: MatrixClient,
|
||||
public readonly rootEvent: MatrixEvent,
|
||||
opts: IThreadOpts,
|
||||
) {
|
||||
super();
|
||||
if (events.length === 0) {
|
||||
throw new Error("Can't create an empty thread");
|
||||
}
|
||||
|
||||
this.reEmitter = new ReEmitter(this);
|
||||
|
||||
this.room = opts.room;
|
||||
this.client = opts.client;
|
||||
this.timelineSet = new EventTimelineSet(this.room, {
|
||||
unstableClientRelationAggregation: true,
|
||||
timelineSupport: true,
|
||||
pendingEvents: true,
|
||||
});
|
||||
this.reEmitter = new ReEmitter(this);
|
||||
|
||||
this.initialiseThread(this.rootEvent);
|
||||
|
||||
this.reEmitter.reEmit(this.timelineSet, [
|
||||
"Room.timeline",
|
||||
"Room.timelineReset",
|
||||
]);
|
||||
|
||||
events.forEach(event => this.addEvent(event));
|
||||
opts?.initialEvents.forEach(event => this.addEvent(event));
|
||||
|
||||
room.on("Room.localEchoUpdated", this.onEcho);
|
||||
room.on("Room.timeline", this.onEcho);
|
||||
this.room.on("Room.localEchoUpdated", this.onEcho);
|
||||
this.room.on("Room.timeline", this.onEcho);
|
||||
}
|
||||
|
||||
public get hasServerSideSupport(): boolean {
|
||||
@@ -91,6 +99,10 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
}
|
||||
};
|
||||
|
||||
private get roomState(): RoomState {
|
||||
return this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an event to the thread and updates
|
||||
* the tail/root references if needed
|
||||
@@ -98,39 +110,46 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
* @param event The event to add
|
||||
*/
|
||||
public async addEvent(event: MatrixEvent, toStartOfTimeline = false): Promise<void> {
|
||||
if (this.timelineSet.findEventById(event.getId())) {
|
||||
return;
|
||||
// Add all incoming events to the thread's timeline set when there's
|
||||
// no server support
|
||||
if (!this.hasServerSideSupport) {
|
||||
if (this.timelineSet.findEventById(event.getId())) {
|
||||
return;
|
||||
}
|
||||
|
||||
// all the relevant membership info to hydrate events with a sender
|
||||
// is held in the main room timeline
|
||||
// We want to fetch the room state from there and pass it down to this thread
|
||||
// timeline set to let it reconcile an event with its relevant RoomMember
|
||||
|
||||
event.setThread(this);
|
||||
this.timelineSet.addEventToTimeline(
|
||||
event,
|
||||
this.liveTimeline,
|
||||
toStartOfTimeline,
|
||||
false,
|
||||
this.roomState,
|
||||
);
|
||||
|
||||
await this.client.decryptEventIfNeeded(event, {});
|
||||
}
|
||||
|
||||
if (!this.root) {
|
||||
if (event.isThreadRelation) {
|
||||
this.root = event.threadRootId;
|
||||
} else {
|
||||
this.root = event.getId();
|
||||
if (this.hasServerSideSupport && this.initialEventsFetched) {
|
||||
if (event.localTimestamp > this.lastReply().localTimestamp && !this.findEventById(event.getId())) {
|
||||
this.timelineSet.addEventToTimeline(
|
||||
event,
|
||||
this.liveTimeline,
|
||||
false,
|
||||
false,
|
||||
this.roomState,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// all the relevant membership info to hydrate events with a sender
|
||||
// is held in the main room timeline
|
||||
// We want to fetch the room state from there and pass it down to this thread
|
||||
// timeline set to let it reconcile an event with its relevant RoomMember
|
||||
const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
|
||||
|
||||
event.setThread(this);
|
||||
this.timelineSet.addEventToTimeline(
|
||||
event,
|
||||
this.timelineSet.getLiveTimeline(),
|
||||
toStartOfTimeline,
|
||||
false,
|
||||
roomState,
|
||||
);
|
||||
|
||||
if (!this._currentUserParticipated && event.getSender() === this.client.getUserId()) {
|
||||
this._currentUserParticipated = true;
|
||||
}
|
||||
|
||||
await this.client.decryptEventIfNeeded(event, {});
|
||||
|
||||
const isThreadReply = event.getRelation()?.rel_type === RelationType.Thread;
|
||||
// If no thread support exists we want to count all thread relation
|
||||
// added as a reply. We can't rely on the bundled relationships count
|
||||
@@ -138,38 +157,57 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
this.replyCount++;
|
||||
}
|
||||
|
||||
if (!this.lastEvent || (isThreadReply && event.getTs() > this.lastEvent.getTs())) {
|
||||
// There is a risk that the `localTimestamp` approximation will not be accurate
|
||||
// when threads are used over federation. That could results in the reply
|
||||
// count value drifting away from the value returned by the server
|
||||
if (!this.lastEvent || (isThreadReply && event.localTimestamp > this.replyToEvent.localTimestamp)) {
|
||||
this.lastEvent = event;
|
||||
if (this.lastEvent.getId() !== this.root) {
|
||||
if (this.lastEvent.getId() !== this.id) {
|
||||
// This counting only works when server side support is enabled
|
||||
// as we started the counting from the value returned in the
|
||||
// bundled relationship
|
||||
if (this.hasServerSideSupport) {
|
||||
this.replyCount++;
|
||||
}
|
||||
|
||||
this.emit(ThreadEvent.NewReply, this, event);
|
||||
}
|
||||
}
|
||||
|
||||
if (event.getId() === this.root) {
|
||||
const bundledRelationship = event
|
||||
.getServerAggregatedRelation<IThreadBundledRelationship>(RelationType.Thread);
|
||||
this.emit(ThreadEvent.Update, this);
|
||||
}
|
||||
|
||||
if (this.hasServerSideSupport && bundledRelationship) {
|
||||
this.replyCount = bundledRelationship.count;
|
||||
this._currentUserParticipated = bundledRelationship.current_user_participated;
|
||||
private initialiseThread(rootEvent: MatrixEvent): void {
|
||||
const bundledRelationship = rootEvent
|
||||
.getServerAggregatedRelation<IThreadBundledRelationship>(RelationType.Thread);
|
||||
|
||||
const lastReply = this.findEventById(bundledRelationship.latest_event.event_id);
|
||||
if (lastReply) {
|
||||
this.lastEvent = lastReply;
|
||||
} else {
|
||||
const event = new MatrixEvent(bundledRelationship.latest_event);
|
||||
this.lastEvent = event;
|
||||
}
|
||||
}
|
||||
if (this.hasServerSideSupport && bundledRelationship) {
|
||||
this.replyCount = bundledRelationship.count;
|
||||
this._currentUserParticipated = bundledRelationship.current_user_participated;
|
||||
|
||||
const event = new MatrixEvent(bundledRelationship.latest_event);
|
||||
this.setEventMetadata(event);
|
||||
this.lastEvent = event;
|
||||
}
|
||||
|
||||
this.emit(ThreadEvent.Update, this);
|
||||
if (!bundledRelationship) {
|
||||
this.addEvent(rootEvent);
|
||||
}
|
||||
}
|
||||
|
||||
public async fetchInitialEvents(): Promise<boolean> {
|
||||
try {
|
||||
await this.fetchEvents();
|
||||
this.initialEventsFetched = true;
|
||||
return true;
|
||||
} catch (e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private setEventMetadata(event: MatrixEvent): void {
|
||||
EventTimeline.setEventMetadata(event, this.roomState, false);
|
||||
event.setThread(this);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -185,7 +223,7 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
public lastReply(matches: (ev: MatrixEvent) => boolean = () => true): MatrixEvent {
|
||||
for (let i = this.events.length - 1; i >= 0; i--) {
|
||||
const event = this.events[i];
|
||||
if (event.isThreadRelation && matches(event)) {
|
||||
if (matches(event)) {
|
||||
return event;
|
||||
}
|
||||
}
|
||||
@@ -195,14 +233,7 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
* The thread ID, which is the same as the root event ID
|
||||
*/
|
||||
public get id(): string {
|
||||
return this.root;
|
||||
}
|
||||
|
||||
/**
|
||||
* The thread root event
|
||||
*/
|
||||
public get rootEvent(): MatrixEvent {
|
||||
return this.findEventById(this.root);
|
||||
return this.rootEvent.getId();
|
||||
}
|
||||
|
||||
public get roomId(): string {
|
||||
@@ -226,14 +257,7 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
}
|
||||
|
||||
public get events(): MatrixEvent[] {
|
||||
return this.timelineSet.getLiveTimeline().getEvents();
|
||||
}
|
||||
|
||||
public merge(thread: Thread): void {
|
||||
thread.events.forEach(event => {
|
||||
this.addEvent(event);
|
||||
});
|
||||
this.events.forEach(event => event.setThread(this));
|
||||
return this.liveTimeline.getEvents();
|
||||
}
|
||||
|
||||
public has(eventId: string): boolean {
|
||||
@@ -243,4 +267,55 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
public get hasCurrentUserParticipated(): boolean {
|
||||
return this._currentUserParticipated;
|
||||
}
|
||||
|
||||
public get liveTimeline(): EventTimeline {
|
||||
return this.timelineSet.getLiveTimeline();
|
||||
}
|
||||
|
||||
public async fetchEvents(opts: IRelationsRequestOpts = { limit: 20 }): Promise<{
|
||||
originalEvent: MatrixEvent;
|
||||
events: MatrixEvent[];
|
||||
nextBatch?: string;
|
||||
prevBatch?: string;
|
||||
}> {
|
||||
let {
|
||||
originalEvent,
|
||||
events,
|
||||
prevBatch,
|
||||
nextBatch,
|
||||
} = await this.client.relations(
|
||||
this.room.roomId,
|
||||
this.id,
|
||||
RelationType.Thread,
|
||||
null,
|
||||
opts,
|
||||
);
|
||||
|
||||
// When there's no nextBatch returned with a `from` request we have reached
|
||||
// the end of the thread, and therefore want to return an empty one
|
||||
if (!opts.to && !nextBatch) {
|
||||
events = [originalEvent, ...events];
|
||||
}
|
||||
|
||||
for (const event of events) {
|
||||
await this.client.decryptEventIfNeeded(event);
|
||||
this.setEventMetadata(event);
|
||||
}
|
||||
|
||||
const prependEvents = !opts.direction || opts.direction === Direction.Backward;
|
||||
|
||||
this.timelineSet.addEventsToTimeline(
|
||||
events,
|
||||
prependEvents,
|
||||
this.liveTimeline,
|
||||
prependEvents ? nextBatch : prevBatch,
|
||||
);
|
||||
|
||||
return {
|
||||
originalEvent,
|
||||
events,
|
||||
prevBatch,
|
||||
nextBatch,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user