You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-11-25 05:23:13 +03:00
Improved threads reliability with/without server side support (#2132)
This commit is contained in:
@@ -811,7 +811,7 @@ export class MatrixClient extends EventEmitter {
|
||||
// TODO: This should expire: https://github.com/matrix-org/matrix-js-sdk/issues/1020
|
||||
protected serverVersionsPromise: Promise<IServerVersions>;
|
||||
|
||||
protected cachedCapabilities: {
|
||||
public cachedCapabilities: {
|
||||
capabilities: ICapabilities;
|
||||
expiration: number;
|
||||
};
|
||||
@@ -5055,7 +5055,7 @@ export class MatrixClient extends EventEmitter {
|
||||
limit,
|
||||
Direction.Backward,
|
||||
);
|
||||
}).then((res: IMessagesResponse) => {
|
||||
}).then(async (res: IMessagesResponse) => {
|
||||
const matrixEvents = res.chunk.map(this.getEventMapper());
|
||||
if (res.state) {
|
||||
const stateEvents = res.state.map(this.getEventMapper());
|
||||
@@ -5065,7 +5065,7 @@ export class MatrixClient extends EventEmitter {
|
||||
const [timelineEvents, threadedEvents] = this.partitionThreadedEvents(matrixEvents);
|
||||
|
||||
room.addEventsToTimeline(timelineEvents, true, room.getLiveTimeline());
|
||||
this.processThreadEvents(room, threadedEvents);
|
||||
await this.processThreadEvents(room, threadedEvents);
|
||||
|
||||
room.oldState.paginationToken = res.end;
|
||||
if (res.chunk.length === 0) {
|
||||
@@ -5143,7 +5143,7 @@ export class MatrixClient extends EventEmitter {
|
||||
|
||||
// TODO: we should implement a backoff (as per scrollback()) to deal more
|
||||
// nicely with HTTP errors.
|
||||
const promise = this.http.authedRequest<any>(undefined, Method.Get, path, params).then((res) => { // TODO types
|
||||
const promise = this.http.authedRequest<any>(undefined, Method.Get, path, params).then(async (res) => { // TODO types
|
||||
if (!res.event) {
|
||||
throw new Error("'event' not in '/context' result - homeserver too old?");
|
||||
}
|
||||
@@ -5176,7 +5176,7 @@ export class MatrixClient extends EventEmitter {
|
||||
const [timelineEvents, threadedEvents] = this.partitionThreadedEvents(matrixEvents);
|
||||
|
||||
timelineSet.addEventsToTimeline(timelineEvents, true, timeline, res.start);
|
||||
this.processThreadEvents(timelineSet.room, threadedEvents);
|
||||
await this.processThreadEvents(timelineSet.room, threadedEvents);
|
||||
|
||||
// there is no guarantee that the event ended up in "timeline" (we
|
||||
// might have switched to a neighbouring timeline) - so check the
|
||||
@@ -5291,7 +5291,7 @@ export class MatrixClient extends EventEmitter {
|
||||
|
||||
promise = this.http.authedRequest<any>( // TODO types
|
||||
undefined, Method.Get, path, params, undefined,
|
||||
).then((res) => {
|
||||
).then(async (res) => {
|
||||
const token = res.next_token;
|
||||
const matrixEvents = [];
|
||||
|
||||
@@ -5309,7 +5309,7 @@ export class MatrixClient extends EventEmitter {
|
||||
|
||||
const timelineSet = eventTimeline.getTimelineSet();
|
||||
timelineSet.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
|
||||
this.processThreadEvents(timelineSet.room, threadedEvents);
|
||||
await this.processThreadEvents(timelineSet.room, threadedEvents);
|
||||
|
||||
// if we've hit the end of the timeline, we need to stop trying to
|
||||
// paginate. We need to keep the 'forwards' token though, to make sure
|
||||
@@ -5334,7 +5334,7 @@ export class MatrixClient extends EventEmitter {
|
||||
opts.limit,
|
||||
dir,
|
||||
eventTimeline.getFilter(),
|
||||
).then((res) => {
|
||||
).then(async (res) => {
|
||||
if (res.state) {
|
||||
const roomState = eventTimeline.getState(dir);
|
||||
const stateEvents = res.state.map(this.getEventMapper());
|
||||
@@ -5347,7 +5347,7 @@ export class MatrixClient extends EventEmitter {
|
||||
|
||||
eventTimeline.getTimelineSet()
|
||||
.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
|
||||
this.processThreadEvents(room, threadedEvents);
|
||||
await this.processThreadEvents(room, threadedEvents);
|
||||
|
||||
// if we've hit the end of the timeline, we need to stop trying to
|
||||
// paginate. We need to keep the 'forwards' token though, to make sure
|
||||
@@ -9067,7 +9067,10 @@ export class MatrixClient extends EventEmitter {
|
||||
const parentEvent = room?.findEventById(parentEventId) || events.find((mxEv: MatrixEvent) => {
|
||||
return mxEv.getId() === parentEventId;
|
||||
});
|
||||
shouldLiveInThreadTimeline = parentEvent?.isThreadRelation;
|
||||
if (parentEvent?.isThreadRelation) {
|
||||
shouldLiveInThreadTimeline = true;
|
||||
event.setThreadId(parentEvent.threadRootId);
|
||||
}
|
||||
|
||||
// Copy all the reactions and annotations to the root event
|
||||
// to the thread timeline. They will end up living in both
|
||||
@@ -9094,12 +9097,11 @@ export class MatrixClient extends EventEmitter {
|
||||
/**
|
||||
* @experimental
|
||||
*/
|
||||
public processThreadEvents(room: Room, threadedEvents: MatrixEvent[]): void {
|
||||
threadedEvents
|
||||
.sort((a, b) => a.getTs() - b.getTs())
|
||||
.forEach(event => {
|
||||
room.addThreadedEvent(event);
|
||||
});
|
||||
public async processThreadEvents(room: Room, threadedEvents: MatrixEvent[]): Promise<void> {
|
||||
threadedEvents.sort((a, b) => a.getTs() - b.getTs());
|
||||
for (const event of threadedEvents) {
|
||||
await room.addThreadedEvent(event);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -91,6 +91,13 @@ export interface IUnsigned {
|
||||
redacted_because?: IEvent;
|
||||
transaction_id?: string;
|
||||
invite_room_state?: StrippedState[];
|
||||
"m.relations"?: Record<RelationType | string, any>; // No common pattern for aggregated relations
|
||||
}
|
||||
|
||||
export interface IThreadBundledRelationship {
|
||||
latest_event: IEvent;
|
||||
count: number;
|
||||
current_user_participated?: boolean;
|
||||
}
|
||||
|
||||
export interface IEvent {
|
||||
@@ -112,7 +119,7 @@ export interface IEvent {
|
||||
age?: number;
|
||||
}
|
||||
|
||||
interface IAggregatedRelation {
|
||||
export interface IAggregatedRelation {
|
||||
origin_server_ts: number;
|
||||
event_id?: string;
|
||||
sender?: string;
|
||||
@@ -262,6 +269,7 @@ export class MatrixEvent extends EventEmitter {
|
||||
* A reference to the thread this event belongs to
|
||||
*/
|
||||
private thread: Thread = null;
|
||||
private threadId: string;
|
||||
|
||||
/* Set an approximate timestamp for the event relative the local clock.
|
||||
* This will inherently be approximate because it doesn't take into account
|
||||
@@ -499,10 +507,13 @@ export class MatrixEvent extends EventEmitter {
|
||||
* @experimental
|
||||
* Get the event ID of the thread head
|
||||
*/
|
||||
public get threadRootId(): string {
|
||||
public get threadRootId(): string | undefined {
|
||||
const relatesTo = this.getWireContent()?.["m.relates_to"];
|
||||
if (relatesTo?.rel_type === RelationType.Thread) {
|
||||
return relatesTo.event_id;
|
||||
} else {
|
||||
return this.threadId
|
||||
|| this.getThread()?.id;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -510,17 +521,20 @@ export class MatrixEvent extends EventEmitter {
|
||||
* @experimental
|
||||
*/
|
||||
public get isThreadRelation(): boolean {
|
||||
return !!this.threadRootId;
|
||||
return !!this.threadRootId && this.threadId !== this.getId();
|
||||
}
|
||||
|
||||
/**
|
||||
* @experimental
|
||||
*/
|
||||
public get isThreadRoot(): boolean {
|
||||
// TODO, change the inner working of this getter for it to use the
|
||||
// bundled relationship return on the event, view MSC3440
|
||||
const thread = this.getThread();
|
||||
return thread?.id === this.getId();
|
||||
const threadDetails = this
|
||||
.getServerAggregatedRelation<IThreadBundledRelationship>(RelationType.Thread);
|
||||
|
||||
// Bundled relationships only returned when the sync response is limited
|
||||
// hence us having to check both bundled relation and inspect the thread
|
||||
// model
|
||||
return !!threadDetails || (this.getThread()?.id === this.getId());
|
||||
}
|
||||
|
||||
public get parentEventId(): string {
|
||||
@@ -1000,6 +1014,10 @@ export class MatrixEvent extends EventEmitter {
|
||||
return this.event.unsigned || {};
|
||||
}
|
||||
|
||||
public setUnsigned(unsigned: IUnsigned): void {
|
||||
this.event.unsigned = unsigned;
|
||||
}
|
||||
|
||||
public unmarkLocallyRedacted(): boolean {
|
||||
const value = this._localRedactionEvent;
|
||||
this._localRedactionEvent = null;
|
||||
@@ -1340,11 +1358,8 @@ export class MatrixEvent extends EventEmitter {
|
||||
return this.status;
|
||||
}
|
||||
|
||||
public getServerAggregatedRelation(relType: RelationType): IAggregatedRelation {
|
||||
const relations = this.getUnsigned()["m.relations"];
|
||||
if (relations) {
|
||||
return relations[relType];
|
||||
}
|
||||
public getServerAggregatedRelation<T>(relType: RelationType): T | undefined {
|
||||
return this.getUnsigned()["m.relations"]?.[relType];
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1353,7 +1368,7 @@ export class MatrixEvent extends EventEmitter {
|
||||
* @return {string?}
|
||||
*/
|
||||
public replacingEventId(): string | undefined {
|
||||
const replaceRelation = this.getServerAggregatedRelation(RelationType.Replace);
|
||||
const replaceRelation = this.getServerAggregatedRelation<IAggregatedRelation>(RelationType.Replace);
|
||||
if (replaceRelation) {
|
||||
return replaceRelation.event_id;
|
||||
} else if (this._replacingEvent) {
|
||||
@@ -1378,7 +1393,7 @@ export class MatrixEvent extends EventEmitter {
|
||||
* @return {Date?}
|
||||
*/
|
||||
public replacingEventDate(): Date | undefined {
|
||||
const replaceRelation = this.getServerAggregatedRelation(RelationType.Replace);
|
||||
const replaceRelation = this.getServerAggregatedRelation<IAggregatedRelation>(RelationType.Replace);
|
||||
if (replaceRelation) {
|
||||
const ts = replaceRelation.origin_server_ts;
|
||||
if (Number.isFinite(ts)) {
|
||||
@@ -1544,9 +1559,13 @@ export class MatrixEvent extends EventEmitter {
|
||||
/**
|
||||
* @experimental
|
||||
*/
|
||||
public getThread(): Thread {
|
||||
public getThread(): Thread | undefined {
|
||||
return this.thread;
|
||||
}
|
||||
|
||||
public setThreadId(threadId: string): void {
|
||||
this.threadId = threadId;
|
||||
}
|
||||
}
|
||||
|
||||
/* REDACT_KEEP_KEYS gives the keys we keep when an event is redacted
|
||||
|
||||
@@ -16,7 +16,7 @@ limitations under the License.
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
|
||||
import { EventStatus, MatrixEvent } from './event';
|
||||
import { EventStatus, MatrixEvent, IAggregatedRelation } from './event';
|
||||
import { Room } from './room';
|
||||
import { logger } from '../logger';
|
||||
import { RelationType } from "../@types/event";
|
||||
@@ -319,7 +319,7 @@ export class Relations extends EventEmitter {
|
||||
|
||||
// the all-knowning server tells us that the event at some point had
|
||||
// this timestamp for its replacement, so any following replacement should definitely not be less
|
||||
const replaceRelation = this.targetEvent.getServerAggregatedRelation(RelationType.Replace);
|
||||
const replaceRelation = this.targetEvent.getServerAggregatedRelation<IAggregatedRelation>(RelationType.Replace);
|
||||
const minTs = replaceRelation && replaceRelation.origin_server_ts;
|
||||
|
||||
const lastReplacement = this.getRelations().reduce((last, event) => {
|
||||
|
||||
@@ -1371,9 +1371,11 @@ export class Room extends EventEmitter {
|
||||
let rootEvent = this.findEventById(event.threadRootId);
|
||||
// If the rootEvent does not exist in the current sync, then look for
|
||||
// it over the network
|
||||
const eventData = await this.client.fetchRoomEvent(this.roomId, event.threadRootId);
|
||||
if (!rootEvent) {
|
||||
const eventData = await this.client.fetchRoomEvent(this.roomId, event.threadRootId);
|
||||
rootEvent = new MatrixEvent(eventData);
|
||||
} else {
|
||||
rootEvent.setUnsigned(eventData.unsigned);
|
||||
}
|
||||
events.unshift(rootEvent);
|
||||
thread = this.createThread(events);
|
||||
@@ -1563,8 +1565,7 @@ export class Room extends EventEmitter {
|
||||
}
|
||||
} else {
|
||||
if (thread) {
|
||||
thread.timelineSet.addEventToTimeline(event,
|
||||
thread.timelineSet.getLiveTimeline(), false);
|
||||
thread.addEvent(event, false);
|
||||
} else {
|
||||
for (let i = 0; i < this.timelineSets.length; i++) {
|
||||
const timelineSet = this.timelineSets[i];
|
||||
|
||||
@@ -16,7 +16,8 @@ limitations under the License.
|
||||
|
||||
import { MatrixClient } from "../matrix";
|
||||
import { ReEmitter } from "../ReEmitter";
|
||||
import { MatrixEvent } from "./event";
|
||||
import { RelationType } from "../@types/event";
|
||||
import { MatrixEvent, IThreadBundledRelationship } from "./event";
|
||||
import { EventTimeline } from "./event-timeline";
|
||||
import { EventTimelineSet } from './event-timeline-set';
|
||||
import { Room } from './room';
|
||||
@@ -47,6 +48,9 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
|
||||
private reEmitter: ReEmitter;
|
||||
|
||||
private lastEvent: MatrixEvent;
|
||||
private replyCount = 0;
|
||||
|
||||
constructor(
|
||||
events: MatrixEvent[] = [],
|
||||
public readonly room: Room,
|
||||
@@ -76,6 +80,11 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
room.on("Room.timeline", this.onEcho);
|
||||
}
|
||||
|
||||
public get hasServerSideSupport(): boolean {
|
||||
return this.client.cachedCapabilities
|
||||
?.capabilities?.[RelationType.Thread]?.enabled;
|
||||
}
|
||||
|
||||
onEcho = (event: MatrixEvent) => {
|
||||
if (this.timelineSet.eventIdToTimeline(event.getId())) {
|
||||
this.emit(ThreadEvent.Update, this);
|
||||
@@ -89,7 +98,7 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
* @param event The event to add
|
||||
*/
|
||||
public async addEvent(event: MatrixEvent, toStartOfTimeline = false): Promise<void> {
|
||||
if (this.timelineSet.findEventById(event.getId()) || event.status !== null) {
|
||||
if (this.timelineSet.findEventById(event.getId())) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -121,11 +130,46 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
}
|
||||
|
||||
await this.client.decryptEventIfNeeded(event, {});
|
||||
this.emit(ThreadEvent.Update, this);
|
||||
|
||||
if (event.isThreadRelation) {
|
||||
this.emit(ThreadEvent.NewReply, this, event);
|
||||
const isThreadReply = event.getRelation()?.rel_type === RelationType.Thread;
|
||||
// If no thread support exists we want to count all thread relation
|
||||
// added as a reply. We can't rely on the bundled relationships count
|
||||
if (!this.hasServerSideSupport && isThreadReply) {
|
||||
this.replyCount++;
|
||||
}
|
||||
|
||||
if (!this.lastEvent || (isThreadReply && event.getTs() > this.lastEvent.getTs())) {
|
||||
this.lastEvent = event;
|
||||
if (this.lastEvent.getId() !== this.root) {
|
||||
// This counting only works when server side support is enabled
|
||||
// as we started the counting from the value returned in the
|
||||
// bundled relationship
|
||||
if (this.hasServerSideSupport) {
|
||||
this.replyCount++;
|
||||
}
|
||||
this.emit(ThreadEvent.NewReply, this, event);
|
||||
}
|
||||
}
|
||||
|
||||
if (event.getId() === this.root) {
|
||||
const bundledRelationship = event
|
||||
.getServerAggregatedRelation<IThreadBundledRelationship>(RelationType.Thread);
|
||||
|
||||
if (this.hasServerSideSupport && bundledRelationship) {
|
||||
this.replyCount = bundledRelationship.count;
|
||||
this._currentUserParticipated = bundledRelationship.current_user_participated;
|
||||
|
||||
const lastReply = this.findEventById(bundledRelationship.latest_event.event_id);
|
||||
if (lastReply) {
|
||||
this.lastEvent = lastReply;
|
||||
} else {
|
||||
const event = new MatrixEvent(bundledRelationship.latest_event);
|
||||
this.lastEvent = event;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.emit(ThreadEvent.Update, this);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -171,17 +215,14 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
|
||||
* exclude annotations from that number
|
||||
*/
|
||||
public get length(): number {
|
||||
return this.events
|
||||
.filter(event => event.isThreadRelation)
|
||||
.length;
|
||||
return this.replyCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* A getter for the last event added to the thread
|
||||
*/
|
||||
public get replyToEvent(): MatrixEvent {
|
||||
const events = this.events;
|
||||
return events[events.length -1];
|
||||
return this.lastEvent;
|
||||
}
|
||||
|
||||
public get events(): MatrixEvent[] {
|
||||
|
||||
@@ -32,6 +32,7 @@ interface IOpts {
|
||||
export interface IMinimalEvent {
|
||||
content: IContent;
|
||||
type: EventType | string;
|
||||
unsigned?: IUnsigned;
|
||||
}
|
||||
|
||||
export interface IEphemeral {
|
||||
|
||||
12
src/sync.ts
12
src/sync.ts
@@ -284,7 +284,7 @@ export class SyncApi {
|
||||
leaveRooms = this.mapSyncResponseToRoomArray(data.rooms.leave);
|
||||
}
|
||||
const rooms = [];
|
||||
leaveRooms.forEach((leaveObj) => {
|
||||
leaveRooms.forEach(async (leaveObj) => {
|
||||
const room = leaveObj.room;
|
||||
rooms.push(room);
|
||||
if (!leaveObj.isBrandNewRoom) {
|
||||
@@ -310,7 +310,7 @@ export class SyncApi {
|
||||
EventTimeline.BACKWARDS);
|
||||
|
||||
this.processRoomEvents(room, stateEvents, timelineEvents);
|
||||
this.processThreadEvents(room, threadedEvents);
|
||||
await this.processThreadEvents(room, threadedEvents);
|
||||
|
||||
room.recalculate();
|
||||
client.store.storeRoom(room);
|
||||
@@ -1307,7 +1307,7 @@ export class SyncApi {
|
||||
const [timelineEvents, threadedEvents] = this.client.partitionThreadedEvents(events);
|
||||
|
||||
this.processRoomEvents(room, stateEvents, timelineEvents, syncEventData.fromCache);
|
||||
this.processThreadEvents(room, threadedEvents);
|
||||
await this.processThreadEvents(room, threadedEvents);
|
||||
|
||||
// set summary after processing events,
|
||||
// because it will trigger a name calculation
|
||||
@@ -1366,7 +1366,7 @@ export class SyncApi {
|
||||
});
|
||||
|
||||
// Handle leaves (e.g. kicked rooms)
|
||||
leaveRooms.forEach((leaveObj) => {
|
||||
leaveRooms.forEach(async (leaveObj) => {
|
||||
const room = leaveObj.room;
|
||||
const stateEvents = this.mapSyncEventsFormat(leaveObj.state, room);
|
||||
const events = this.mapSyncEventsFormat(leaveObj.timeline, room);
|
||||
@@ -1375,7 +1375,7 @@ export class SyncApi {
|
||||
const [timelineEvents, threadedEvents] = this.client.partitionThreadedEvents(events);
|
||||
|
||||
this.processRoomEvents(room, stateEvents, timelineEvents);
|
||||
this.processThreadEvents(room, threadedEvents);
|
||||
await this.processThreadEvents(room, threadedEvents);
|
||||
room.addAccountData(accountDataEvents);
|
||||
|
||||
room.recalculate();
|
||||
@@ -1720,7 +1720,7 @@ export class SyncApi {
|
||||
/**
|
||||
* @experimental
|
||||
*/
|
||||
private processThreadEvents(room: Room, threadedEvents: MatrixEvent[]): void {
|
||||
private processThreadEvents(room: Room, threadedEvents: MatrixEvent[]): Promise<void> {
|
||||
return this.client.processThreadEvents(room, threadedEvents);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user