From 687d08dc9d9c618c628ef59031fee2f8f81554cd Mon Sep 17 00:00:00 2001 From: Andrew Ferrazzutti Date: Tue, 30 Jul 2024 08:43:25 -0400 Subject: [PATCH] Support MSC4140: Delayed events (#4294) and use them for more reliable MatrixRTC session membership events. Also implement "parent" delayed events, which were in a previous version of the MSC and may be reintroduced or be part of a new MSC later. NOTE: Still missing is support for sending encrypted delayed events. --- spec/unit/matrix-client.spec.ts | 327 ++++++++++++++++++- spec/unit/matrixrtc/MatrixRTCSession.spec.ts | 86 ++++- src/@types/requests.ts | 50 +++ src/client.ts | 244 ++++++++++++-- src/embedded.ts | 17 +- src/matrixrtc/MatrixRTCSession.ts | 49 ++- 6 files changed, 715 insertions(+), 58 deletions(-) diff --git a/spec/unit/matrix-client.spec.ts b/spec/unit/matrix-client.spec.ts index fa9cd7769..3babb4574 100644 --- a/spec/unit/matrix-client.spec.ts +++ b/spec/unit/matrix-client.spec.ts @@ -57,6 +57,7 @@ import { Room, RuleId, TweakName, + UpdateDelayedEventAction, } from "../../src"; import { supportsMatrixCall } from "../../src/webrtc/call"; import { makeBeaconEvent } from "../test-utils/beacon"; @@ -97,7 +98,7 @@ type HttpLookup = { method: string; path: string; prefix?: string; - data?: Record; + data?: Record | Record[]; error?: object; expectBody?: Record; expectQueryParams?: QueryDict; @@ -704,6 +705,328 @@ describe("MatrixClient", function () { }); }); + describe("_unstable_sendDelayedEvent", () => { + const unstableMSC4140Prefix = `${ClientPrefix.Unstable}/org.matrix.msc4140`; + + const roomId = "!room:example.org"; + const body = "This is the body"; + const content = { body, msgtype: MsgType.Text } satisfies RoomMessageEventContent; + const timeoutDelayOpts = { delay: 2000 }; + const realTimeoutDelayOpts = { "org.matrix.msc4140.delay": 2000 }; + + beforeEach(() => { + unstableFeatures["org.matrix.msc4140"] = true; + }); + + it("throws when unsupported by server", async () => { + unstableFeatures["org.matrix.msc4140"] = false; + const errorMessage = "Server does not support"; + + await expect( + client._unstable_sendDelayedEvent( + roomId, + timeoutDelayOpts, + null, + EventType.RoomMessage, + { ...content }, + client.makeTxnId(), + ), + ).rejects.toThrow(errorMessage); + + await expect( + client._unstable_sendDelayedStateEvent(roomId, timeoutDelayOpts, EventType.RoomTopic, { + topic: "topic", + }), + ).rejects.toThrow(errorMessage); + + await expect(client._unstable_getDelayedEvents()).rejects.toThrow(errorMessage); + + await expect( + client._unstable_updateDelayedEvent("anyDelayId", UpdateDelayedEventAction.Send), + ).rejects.toThrow(errorMessage); + }); + + it("works with null threadId", async () => { + httpLookups = []; + + const timeoutDelayTxnId = client.makeTxnId(); + httpLookups.push({ + method: "PUT", + path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${timeoutDelayTxnId}`, + expectQueryParams: realTimeoutDelayOpts, + data: { delay_id: "id1" }, + expectBody: content, + }); + + const { delay_id: timeoutDelayId } = await client._unstable_sendDelayedEvent( + roomId, + timeoutDelayOpts, + null, + EventType.RoomMessage, + { ...content }, + timeoutDelayTxnId, + ); + + const actionDelayTxnId = client.makeTxnId(); + httpLookups.push({ + method: "PUT", + path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${actionDelayTxnId}`, + expectQueryParams: { "org.matrix.msc4140.parent_delay_id": timeoutDelayId }, + data: { delay_id: "id2" }, + expectBody: content, + }); + + await client._unstable_sendDelayedEvent( + roomId, + { parent_delay_id: timeoutDelayId }, + null, + EventType.RoomMessage, + { ...content }, + actionDelayTxnId, + ); + }); + + it("works with non-null threadId", async () => { + httpLookups = []; + const threadId = "$threadId:server"; + const expectBody = { + ...content, + "m.relates_to": { + event_id: threadId, + is_falling_back: true, + rel_type: "m.thread", + }, + }; + + const timeoutDelayTxnId = client.makeTxnId(); + httpLookups.push({ + method: "PUT", + path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${timeoutDelayTxnId}`, + expectQueryParams: realTimeoutDelayOpts, + data: { delay_id: "id1" }, + expectBody, + }); + + const { delay_id: timeoutDelayId } = await client._unstable_sendDelayedEvent( + roomId, + timeoutDelayOpts, + threadId, + EventType.RoomMessage, + { ...content }, + timeoutDelayTxnId, + ); + + const actionDelayTxnId = client.makeTxnId(); + httpLookups.push({ + method: "PUT", + path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${actionDelayTxnId}`, + expectQueryParams: { "org.matrix.msc4140.parent_delay_id": timeoutDelayId }, + data: { delay_id: "id2" }, + expectBody, + }); + + await client._unstable_sendDelayedEvent( + roomId, + { parent_delay_id: timeoutDelayId }, + threadId, + EventType.RoomMessage, + { ...content }, + actionDelayTxnId, + ); + }); + + it("should add thread relation if threadId is passed and the relation is missing", async () => { + httpLookups = []; + const threadId = "$threadId:server"; + const expectBody = { + ...content, + "m.relates_to": { + "m.in_reply_to": { + event_id: threadId, + }, + "event_id": threadId, + "is_falling_back": true, + "rel_type": "m.thread", + }, + }; + + const room = new Room(roomId, client, userId); + mocked(store.getRoom).mockReturnValue(room); + + const rootEvent = new MatrixEvent({ event_id: threadId }); + room.createThread(threadId, rootEvent, [rootEvent], false); + + const timeoutDelayTxnId = client.makeTxnId(); + httpLookups.push({ + method: "PUT", + path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${timeoutDelayTxnId}`, + expectQueryParams: realTimeoutDelayOpts, + data: { delay_id: "id1" }, + expectBody, + }); + + const { delay_id: timeoutDelayId } = await client._unstable_sendDelayedEvent( + roomId, + timeoutDelayOpts, + threadId, + EventType.RoomMessage, + { ...content }, + timeoutDelayTxnId, + ); + + const actionDelayTxnId = client.makeTxnId(); + httpLookups.push({ + method: "PUT", + path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${actionDelayTxnId}`, + expectQueryParams: { "org.matrix.msc4140.parent_delay_id": timeoutDelayId }, + data: { delay_id: "id2" }, + expectBody, + }); + + await client._unstable_sendDelayedEvent( + roomId, + { parent_delay_id: timeoutDelayId }, + threadId, + EventType.RoomMessage, + { ...content }, + actionDelayTxnId, + ); + }); + + it("should add thread relation if threadId is passed and the relation is missing with reply", async () => { + httpLookups = []; + const threadId = "$threadId:server"; + + const content = { + body, + "msgtype": MsgType.Text, + "m.relates_to": { + "m.in_reply_to": { + event_id: "$other:event", + }, + }, + } satisfies RoomMessageEventContent; + const expectBody = { + ...content, + "m.relates_to": { + "m.in_reply_to": { + event_id: "$other:event", + }, + "event_id": threadId, + "is_falling_back": false, + "rel_type": "m.thread", + }, + }; + + const room = new Room(roomId, client, userId); + mocked(store.getRoom).mockReturnValue(room); + + const rootEvent = new MatrixEvent({ event_id: threadId }); + room.createThread(threadId, rootEvent, [rootEvent], false); + + const timeoutDelayTxnId = client.makeTxnId(); + httpLookups.push({ + method: "PUT", + path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${timeoutDelayTxnId}`, + expectQueryParams: realTimeoutDelayOpts, + data: { delay_id: "id1" }, + expectBody, + }); + + const { delay_id: timeoutDelayId } = await client._unstable_sendDelayedEvent( + roomId, + timeoutDelayOpts, + threadId, + EventType.RoomMessage, + { ...content }, + timeoutDelayTxnId, + ); + + const actionDelayTxnId = client.makeTxnId(); + httpLookups.push({ + method: "PUT", + path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${actionDelayTxnId}`, + expectQueryParams: { "org.matrix.msc4140.parent_delay_id": timeoutDelayId }, + data: { delay_id: "id2" }, + expectBody, + }); + + await client._unstable_sendDelayedEvent( + roomId, + { parent_delay_id: timeoutDelayId }, + threadId, + EventType.RoomMessage, + { ...content }, + actionDelayTxnId, + ); + }); + + it("can send a delayed state event", async () => { + httpLookups = []; + const content = { topic: "The year 2000" }; + + httpLookups.push({ + method: "PUT", + path: `/rooms/${encodeURIComponent(roomId)}/state/m.room.topic/`, + expectQueryParams: realTimeoutDelayOpts, + data: { delay_id: "id1" }, + expectBody: content, + }); + + const { delay_id: timeoutDelayId } = await client._unstable_sendDelayedStateEvent( + roomId, + timeoutDelayOpts, + EventType.RoomTopic, + { ...content }, + ); + + httpLookups.push({ + method: "PUT", + path: `/rooms/${encodeURIComponent(roomId)}/state/m.room.topic/`, + expectQueryParams: { "org.matrix.msc4140.parent_delay_id": timeoutDelayId }, + data: { delay_id: "id2" }, + expectBody: content, + }); + + await client._unstable_sendDelayedStateEvent( + roomId, + { parent_delay_id: timeoutDelayId }, + EventType.RoomTopic, + { ...content }, + ); + }); + + it("can look up delayed events", async () => { + httpLookups = [ + { + method: "GET", + prefix: unstableMSC4140Prefix, + path: "/delayed_events", + data: [], + }, + ]; + + await client._unstable_getDelayedEvents(); + }); + + it("can update delayed events", async () => { + const delayId = "id"; + const action = UpdateDelayedEventAction.Restart; + httpLookups = [ + { + method: "POST", + prefix: unstableMSC4140Prefix, + path: `/delayed_events/${encodeURIComponent(delayId)}`, + data: { + action, + }, + }, + ]; + + await client._unstable_updateDelayedEvent(delayId, action); + }); + }); + it("should create (unstable) file trees", async () => { const userId = "@test:example.org"; const roomId = "!room:example.org"; @@ -963,7 +1286,7 @@ describe("MatrixClient", function () { const filter = new Filter(client.credentials.userId); const filterId = await client.getOrCreateFilter(filterName, filter); - expect(filterId).toEqual(FILTER_RESPONSE.data?.filter_id); + expect(filterId).toEqual(!Array.isArray(FILTER_RESPONSE.data) && FILTER_RESPONSE.data?.filter_id); }); }); diff --git a/spec/unit/matrixrtc/MatrixRTCSession.spec.ts b/spec/unit/matrixrtc/MatrixRTCSession.spec.ts index 2b3916614..5d8b1d306 100644 --- a/spec/unit/matrixrtc/MatrixRTCSession.spec.ts +++ b/spec/unit/matrixrtc/MatrixRTCSession.spec.ts @@ -46,6 +46,9 @@ describe("MatrixRTCSession", () => { client = new MatrixClient({ baseUrl: "base_url" }); client.getUserId = jest.fn().mockReturnValue("@alice:example.org"); client.getDeviceId = jest.fn().mockReturnValue("AAAAAAA"); + client.doesServerSupportUnstableFeature = jest.fn((feature) => + Promise.resolve(feature === "org.matrix.msc4140"), + ); }); afterEach(() => { @@ -241,35 +244,61 @@ describe("MatrixRTCSession", () => { foci_preferred: [mockFocus], }; - function testSession( + let sendStateEventMock: jest.Mock; + let sendDelayedStateMock: jest.Mock; + + let sentStateEvent: Promise; + let sentDelayedState: Promise; + + beforeEach(() => { + sentStateEvent = new Promise((resolve) => { + sendStateEventMock = jest.fn(resolve); + }); + sentDelayedState = new Promise((resolve) => { + sendDelayedStateMock = jest.fn(() => { + resolve(); + return { + delay_id: "id", + }; + }); + }); + client.sendStateEvent = sendStateEventMock; + client._unstable_sendDelayedStateEvent = sendDelayedStateMock; + }); + + async function testSession( membershipData: CallMembershipData[] | SessionMembershipData, shouldUseLegacy: boolean, - ): void { + ): Promise { sess = MatrixRTCSession.roomSessionForRoom(client, makeMockRoom(membershipData)); const makeNewLegacyMembershipsMock = jest.spyOn(sess as any, "makeNewLegacyMemberships"); const makeNewMembershipMock = jest.spyOn(sess as any, "makeNewMembership"); sess.joinRoomSession([mockFocus], mockFocus, joinSessionConfig); + await Promise.race([sentStateEvent, new Promise((resolve) => setTimeout(resolve, 500))]); expect(makeNewLegacyMembershipsMock).toHaveBeenCalledTimes(shouldUseLegacy ? 1 : 0); expect(makeNewMembershipMock).toHaveBeenCalledTimes(shouldUseLegacy ? 0 : 1); + + await Promise.race([sentDelayedState, new Promise((resolve) => setTimeout(resolve, 500))]); + expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(shouldUseLegacy ? 0 : 1); } - it("uses legacy events if there are any active legacy calls", () => { - testSession([expiredLegacyMembershipData, legacyMembershipData, sessionMembershipData], true); + it("uses legacy events if there are any active legacy calls", async () => { + await testSession([expiredLegacyMembershipData, legacyMembershipData, sessionMembershipData], true); }); - it('uses legacy events if a non-legacy call is in a "memberships" array', () => { - testSession([sessionMembershipData], true); + it('uses legacy events if a non-legacy call is in a "memberships" array', async () => { + await testSession([sessionMembershipData], true); }); - it("uses non-legacy events if all legacy calls are expired", () => { - testSession([expiredLegacyMembershipData], false); + it("uses non-legacy events if all legacy calls are expired", async () => { + await testSession([expiredLegacyMembershipData], false); }); - it("uses non-legacy events if there are only non-legacy calls", () => { - testSession(sessionMembershipData, false); + it("uses non-legacy events if there are only non-legacy calls", async () => { + await testSession(sessionMembershipData, false); }); }); @@ -347,12 +376,27 @@ describe("MatrixRTCSession", () => { describe("joining", () => { let mockRoom: Room; let sendStateEventMock: jest.Mock; + let sendDelayedStateMock: jest.Mock; let sendEventMock: jest.Mock; + let sentStateEvent: Promise; + let sentDelayedState: Promise; + beforeEach(() => { - sendStateEventMock = jest.fn(); + sentStateEvent = new Promise((resolve) => { + sendStateEventMock = jest.fn(resolve); + }); + sentDelayedState = new Promise((resolve) => { + sendDelayedStateMock = jest.fn(() => { + resolve(); + return { + delay_id: "id", + }; + }); + }); sendEventMock = jest.fn(); client.sendStateEvent = sendStateEventMock; + client._unstable_sendDelayedStateEvent = sendDelayedStateMock; client.sendEvent = sendEventMock; mockRoom = makeMockRoom([]); @@ -373,9 +417,11 @@ describe("MatrixRTCSession", () => { expect(sess!.isJoined()).toEqual(true); }); - it("sends a membership event when joining a call", () => { + it("sends a membership event when joining a call", async () => { + const realSetTimeout = setTimeout; jest.useFakeTimers(); sess!.joinRoomSession([mockFocus], mockFocus); + await Promise.race([sentStateEvent, new Promise((resolve) => realSetTimeout(resolve, 500))]); expect(client.sendStateEvent).toHaveBeenCalledWith( mockRoom!.roomId, EventType.GroupCallMemberPrefix, @@ -396,6 +442,8 @@ describe("MatrixRTCSession", () => { }, "@alice:example.org", ); + await Promise.race([sentDelayedState, new Promise((resolve) => realSetTimeout(resolve, 500))]); + expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(0); jest.useRealTimers(); }); @@ -403,13 +451,15 @@ describe("MatrixRTCSession", () => { const activeFocusConfig = { type: "livekit", livekit_service_url: "https://active.url" }; const activeFocus = { type: "livekit", focus_selection: "oldest_membership" }; - function testJoin(useOwnedStateEvents: boolean): void { + async function testJoin(useOwnedStateEvents: boolean): Promise { + const realSetTimeout = setTimeout; if (useOwnedStateEvents) { mockRoom.getVersion = jest.fn().mockReturnValue("org.matrix.msc3779.default"); } jest.useFakeTimers(); sess!.joinRoomSession([activeFocusConfig], activeFocus, { useLegacyMemberEvents: false }); + await Promise.race([sentStateEvent, new Promise((resolve) => realSetTimeout(resolve, 500))]); expect(client.sendStateEvent).toHaveBeenCalledWith( mockRoom!.roomId, EventType.GroupCallMemberPrefix, @@ -423,15 +473,17 @@ describe("MatrixRTCSession", () => { } satisfies SessionMembershipData, `${!useOwnedStateEvents ? "_" : ""}@alice:example.org_AAAAAAA`, ); + await Promise.race([sentDelayedState, new Promise((resolve) => realSetTimeout(resolve, 500))]); + expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1); jest.useRealTimers(); } - it("sends a membership event with session payload when joining a non-legacy call", () => { - testJoin(false); + it("sends a membership event with session payload when joining a non-legacy call", async () => { + await testJoin(false); }); - it("does not prefix the state key with _ for rooms that support user-owned state events", () => { - testJoin(true); + it("does not prefix the state key with _ for rooms that support user-owned state events", async () => { + await testJoin(true); }); }); diff --git a/src/@types/requests.ts b/src/@types/requests.ts index d6b7ff75f..ae3ea16dd 100644 --- a/src/@types/requests.ts +++ b/src/@types/requests.ts @@ -76,6 +76,56 @@ export interface ISendEventResponse { event_id: string; } +export type TimeoutDelay = { + delay: number; +}; + +export type ParentDelayId = { + parent_delay_id: string; +}; + +export type SendTimeoutDelayedEventRequestOpts = TimeoutDelay & Partial; +export type SendActionDelayedEventRequestOpts = ParentDelayId; + +export type SendDelayedEventRequestOpts = SendTimeoutDelayedEventRequestOpts | SendActionDelayedEventRequestOpts; + +export type SendDelayedEventResponse = { + delay_id: string; +}; + +export enum UpdateDelayedEventAction { + Cancel = "cancel", + Restart = "restart", + Send = "send", +} + +export type UpdateDelayedEventRequestOpts = SendDelayedEventResponse & { + action: UpdateDelayedEventAction; +}; + +type DelayedPartialTimelineEvent = { + room_id: string; + type: string; + content: IContent; +}; + +type DelayedPartialStateEvent = DelayedPartialTimelineEvent & { + state_key: string; + transaction_id: string; +}; + +type DelayedPartialEvent = DelayedPartialTimelineEvent | DelayedPartialStateEvent; + +export type DelayedEventInfo = { + delayed_events: DelayedPartialEvent & + SendDelayedEventResponse & + SendDelayedEventRequestOpts & + { + running_since: number; + }[]; + next_batch?: string; +}; + export interface IPresenceOpts { // One of "online", "offline" or "unavailable" presence: "online" | "offline" | "unavailable"; diff --git a/src/client.ts b/src/client.ts index ae80589e9..e2bab66dc 100644 --- a/src/client.ts +++ b/src/client.ts @@ -114,6 +114,7 @@ import { NotificationCountType, Room, RoomEvent, RoomEventHandlerMap, RoomNameSt import { RoomMemberEvent, RoomMemberEventHandlerMap } from "./models/room-member"; import { IPowerLevelsContent, RoomStateEvent, RoomStateEventHandlerMap } from "./models/room-state"; import { + DelayedEventInfo, IAddThreePidOnlyBody, IBindThreePidBody, IContextResponse, @@ -134,6 +135,9 @@ import { IStatusResponse, ITagsResponse, KnockRoomOpts, + SendDelayedEventRequestOpts, + SendDelayedEventResponse, + UpdateDelayedEventAction, } from "./@types/requests"; import { EventType, @@ -530,6 +534,8 @@ export const UNSTABLE_MSC2666_SHARED_ROOMS = "uk.half-shot.msc2666"; export const UNSTABLE_MSC2666_MUTUAL_ROOMS = "uk.half-shot.msc2666.mutual_rooms"; export const UNSTABLE_MSC2666_QUERY_MUTUAL_ROOMS = "uk.half-shot.msc2666.query_mutual_rooms"; +const UNSTABLE_MSC4140_DELAYED_EVENTS = "org.matrix.msc4140"; + enum CrossSigningKeyType { MasterKey = "master_key", SelfSigningKey = "self_signing_key", @@ -4573,12 +4579,19 @@ export class MatrixClient extends TypedEventEmitter { @@ -4596,8 +4609,6 @@ export class MatrixClient extends TypedEventEmitter, txnId?: string, - ): Promise { + ): Promise; + /** + * Sends a delayed event (MSC4140). + * @param eventObject - An object with the partial structure of an event, to which event_id, user_id, room_id and origin_server_ts will be added. + * @param delayOpts - Properties of the delay for this event. + * @param txnId - Optional. + * @returns Promise which resolves: to an empty object `{}` + * @returns Rejects: with an error response. + */ + private sendCompleteEvent( + roomId: string, + threadId: string | null, + eventObject: Partial, + delayOpts: SendDelayedEventRequestOpts, + txnId?: string, + ): Promise; + private sendCompleteEvent( + roomId: string, + threadId: string | null, + eventObject: Partial, + delayOptsOrTxnId?: SendDelayedEventRequestOpts | string, + txnIdOrVoid?: string, + ): Promise { + let delayOpts: SendDelayedEventRequestOpts | undefined; + let txnId: string | undefined; + if (typeof delayOptsOrTxnId === "string") { + txnId = delayOptsOrTxnId; + } else { + delayOpts = delayOptsOrTxnId; + txnId = txnIdOrVoid; + } + if (!txnId) { txnId = this.makeTxnId(); } @@ -4634,9 +4676,11 @@ export class MatrixClient extends TypedEventEmitter { + protected async encryptAndSendEvent(room: Room | null, event: MatrixEvent): Promise; + /** + * Simply sends a delayed event without encrypting it. + * TODO: Allow encrypted delayed events, and encrypt them properly + * @param delayOpts - Properties of the delay for this event. + * @returns returns a promise which resolves with the result of the delayed send request + */ + protected async encryptAndSendEvent( + room: Room | null, + event: MatrixEvent, + delayOpts: SendDelayedEventRequestOpts, + ): Promise; + protected async encryptAndSendEvent( + room: Room | null, + event: MatrixEvent, + delayOpts?: SendDelayedEventRequestOpts, + ): Promise { + if (delayOpts) { + return this.sendEventHttpRequest(event, delayOpts); + } + try { let cancelled: boolean; this.eventsBeingEncrypted.add(event.getId()!); @@ -4824,7 +4895,15 @@ export class MatrixClient extends TypedEventEmitter { + private sendEventHttpRequest(event: MatrixEvent): Promise; + private sendEventHttpRequest( + event: MatrixEvent, + delayOpts: SendDelayedEventRequestOpts, + ): Promise; + private sendEventHttpRequest( + event: MatrixEvent, + delayOpts?: SendDelayedEventRequestOpts, + ): Promise { let txnId = event.getTxnId(); if (!txnId) { txnId = this.makeTxnId(); @@ -4856,12 +4935,20 @@ export class MatrixClient extends TypedEventEmitter(Method.Put, path, undefined, event.getWireContent()) - .then((res) => { + const content = event.getWireContent(); + if (!delayOpts) { + return this.http.authedRequest(Method.Put, path, undefined, content).then((res) => { this.logger.debug(`Event sent to ${event.getRoomId()} with event id ${res.event_id}`); return res; }); + } else { + return this.http.authedRequest( + Method.Put, + path, + getUnstableDelayQueryOpts(delayOpts), + content, + ); + } } /** @@ -5191,6 +5278,101 @@ export class MatrixClient extends TypedEventEmitter( + roomId: string, + delayOpts: SendDelayedEventRequestOpts, + threadId: string | null, + eventType: K, + content: TimelineEvents[K], + txnId?: string, + ): Promise { + if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) { + throw Error("Server does not support the delayed events API"); + } + + this.addThreadRelationIfNeeded(content, threadId, roomId); + return this.sendCompleteEvent(roomId, threadId, { type: eventType, content }, delayOpts, txnId); + } + + /** + * Send a delayed state event. + * + * Note: This endpoint is unstable, and can throw an `Error`. + * Check progress on [MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140) for more details. + */ + // eslint-disable-next-line + public async _unstable_sendDelayedStateEvent( + roomId: string, + delayOpts: SendDelayedEventRequestOpts, + eventType: K, + content: StateEvents[K], + stateKey = "", + opts: IRequestOpts = {}, + ): Promise { + if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) { + throw Error("Server does not support the delayed events API"); + } + + const pathParams = { + $roomId: roomId, + $eventType: eventType, + $stateKey: stateKey, + }; + let path = utils.encodeUri("/rooms/$roomId/state/$eventType", pathParams); + if (stateKey !== undefined) { + path = utils.encodeUri(path + "/$stateKey", pathParams); + } + return this.http.authedRequest(Method.Put, path, getUnstableDelayQueryOpts(delayOpts), content as Body, opts); + } + + /** + * Get all pending delayed events for the calling user. + * + * Note: This endpoint is unstable, and can throw an `Error`. + * Check progress on [MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140) for more details. + */ + // eslint-disable-next-line + public async _unstable_getDelayedEvents(fromToken?: string): Promise { + if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) { + throw Error("Server does not support the delayed events API"); + } + + const queryDict = fromToken ? { from: fromToken } : undefined; + return await this.http.authedRequest(Method.Get, "/delayed_events", queryDict, undefined, { + prefix: `${ClientPrefix.Unstable}/${UNSTABLE_MSC4140_DELAYED_EVENTS}`, + }); + } + + /** + * Manage a delayed event associated with the given delay_id. + * + * Note: This endpoint is unstable, and can throw an `Error`. + * Check progress on [MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140) for more details. + */ + // eslint-disable-next-line + public async _unstable_updateDelayedEvent(delayId: string, action: UpdateDelayedEventAction): Promise<{}> { + if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) { + throw Error("Server does not support the delayed events API"); + } + + const path = utils.encodeUri("/delayed_events/$delayId", { + $delayId: delayId, + }); + const data = { + action, + }; + return await this.http.authedRequest(Method.Post, path, undefined, data, { + prefix: `${ClientPrefix.Unstable}/${UNSTABLE_MSC4140_DELAYED_EVENTS}`, + }); + } + /** * Send a receipt. * @param event - The event being acknowledged @@ -9892,6 +10074,12 @@ export class MatrixClient extends TypedEventEmitter [`${UNSTABLE_MSC4140_DELAYED_EVENTS}.${k}`, v]), + ); +} + /** * recalculates an accurate notifications count on event decryption. * Servers do not have enough knowledge about encrypted events to calculate an diff --git a/src/embedded.ts b/src/embedded.ts index 8a1492622..d2ba19a57 100644 --- a/src/embedded.ts +++ b/src/embedded.ts @@ -26,7 +26,7 @@ import { } from "matrix-widget-api"; import { MatrixEvent, IEvent, IContent, EventStatus } from "./models/event"; -import { ISendEventResponse } from "./@types/requests"; +import { ISendEventResponse, SendDelayedEventRequestOpts, SendDelayedEventResponse } from "./@types/requests"; import { EventType } from "./@types/event"; import { logger } from "./logger"; import { @@ -248,7 +248,20 @@ export class RoomWidgetClient extends MatrixClient { throw new Error(`Unknown room: ${roomIdOrAlias}`); } - protected async encryptAndSendEvent(room: Room, event: MatrixEvent): Promise { + protected async encryptAndSendEvent(room: Room, event: MatrixEvent): Promise; + protected async encryptAndSendEvent( + room: Room, + event: MatrixEvent, + delayOpts: SendDelayedEventRequestOpts, + ): Promise; + protected async encryptAndSendEvent( + room: Room, + event: MatrixEvent, + delayOpts?: SendDelayedEventRequestOpts, + ): Promise { + if (delayOpts) { + throw new Error("Delayed event sending via widgets is not implemented"); + } let response: ISendEventFromWidgetResponseData; try { response = await this.widgetApi.sendRoomEvent(event.getType(), event.getContent(), room.roomId); diff --git a/src/matrixrtc/MatrixRTCSession.ts b/src/matrixrtc/MatrixRTCSession.ts index 7f6d12aa2..9c45d28ea 100644 --- a/src/matrixrtc/MatrixRTCSession.ts +++ b/src/matrixrtc/MatrixRTCSession.ts @@ -20,6 +20,7 @@ import { EventTimeline } from "../models/event-timeline"; import { Room } from "../models/room"; import { MatrixClient } from "../client"; import { EventType } from "../@types/event"; +import { UpdateDelayedEventAction } from "../@types/requests"; import { CallMembership, CallMembershipData, @@ -865,27 +866,57 @@ export class MatrixRTCSession extends TypedEventEmitter setTimeout(resolve, resendDelay)); await this.triggerCallMembershipEventUpdate(); } } + private scheduleDelayDisconnection(delayId: string): void { + this.memberEventTimeout = setTimeout(() => this.delayDisconnection(delayId), 5000); + } + + private async delayDisconnection(delayId: string): Promise { + try { + await this.client._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Restart); + this.scheduleDelayDisconnection(delayId); + } catch (e) { + logger.error("Failed to delay our disconnection event", e); + } + } + private stateEventsContainOngoingLegacySession(callMemberEvents: Map): boolean { for (const callMemberEvent of callMemberEvents.values()) { const content = callMemberEvent.getContent();