diff --git a/spec/unit/embedded.spec.ts b/spec/unit/embedded.spec.ts index fc430678d..0b27541db 100644 --- a/spec/unit/embedded.spec.ts +++ b/spec/unit/embedded.spec.ts @@ -49,26 +49,26 @@ const testOIDCToken = { token_type: "Bearer", }; class MockWidgetApi extends EventEmitter { - public start = jest.fn(); - public requestCapability = jest.fn(); - public requestCapabilities = jest.fn(); - public requestCapabilityForRoomTimeline = jest.fn(); - public requestCapabilityToSendEvent = jest.fn(); - public requestCapabilityToReceiveEvent = jest.fn(); - public requestCapabilityToSendMessage = jest.fn(); - public requestCapabilityToReceiveMessage = jest.fn(); - public requestCapabilityToSendState = jest.fn(); - public requestCapabilityToReceiveState = jest.fn(); - public requestCapabilityToSendToDevice = jest.fn(); - public requestCapabilityToReceiveToDevice = jest.fn(); + public start = jest.fn().mockResolvedValue(undefined); + public requestCapability = jest.fn().mockResolvedValue(undefined); + public requestCapabilities = jest.fn().mockResolvedValue(undefined); + public requestCapabilityForRoomTimeline = jest.fn().mockResolvedValue(undefined); + public requestCapabilityToSendEvent = jest.fn().mockResolvedValue(undefined); + public requestCapabilityToReceiveEvent = jest.fn().mockResolvedValue(undefined); + public requestCapabilityToSendMessage = jest.fn().mockResolvedValue(undefined); + public requestCapabilityToReceiveMessage = jest.fn().mockResolvedValue(undefined); + public requestCapabilityToSendState = jest.fn().mockResolvedValue(undefined); + public requestCapabilityToReceiveState = jest.fn().mockResolvedValue(undefined); + public requestCapabilityToSendToDevice = jest.fn().mockResolvedValue(undefined); + public requestCapabilityToReceiveToDevice = jest.fn().mockResolvedValue(undefined); public sendRoomEvent = jest.fn( - (eventType: string, content: unknown, roomId?: string, delay?: number, parentDelayId?: string) => + async (eventType: string, content: unknown, roomId?: string, delay?: number, parentDelayId?: string) => delay === undefined && parentDelayId === undefined ? { event_id: `$${Math.random()}` } : { delay_id: `id-${Math.random()}` }, ); public sendStateEvent = jest.fn( - ( + async ( eventType: string, stateKey: string, content: unknown, @@ -80,17 +80,17 @@ class MockWidgetApi extends EventEmitter { ? { event_id: `$${Math.random()}` } : { delay_id: `id-${Math.random()}` }, ); - public updateDelayedEvent = jest.fn(); - public sendToDevice = jest.fn(); - public requestOpenIDConnectToken = jest.fn(() => { + public updateDelayedEvent = jest.fn().mockResolvedValue(undefined); + public sendToDevice = jest.fn().mockResolvedValue(undefined); + public requestOpenIDConnectToken = jest.fn(async () => { return testOIDCToken; return new Promise(() => { return testOIDCToken; }); }); - public readStateEvents = jest.fn(() => []); - public getTurnServers = jest.fn(() => []); - public sendContentLoaded = jest.fn(); + public readStateEvents = jest.fn(async () => []); + public getTurnServers = jest.fn(async () => []); + public sendContentLoaded = jest.fn().mockResolvedValue(undefined); public transport = { reply: jest.fn(), diff --git a/spec/unit/matrixrtc/MembershipManager.spec.ts b/spec/unit/matrixrtc/MembershipManager.spec.ts index 6bb4810d4..a7a5143af 100644 --- a/spec/unit/matrixrtc/MembershipManager.spec.ts +++ b/spec/unit/matrixrtc/MembershipManager.spec.ts @@ -19,10 +19,11 @@ limitations under the License. import { type MockedFunction, type Mock } from "jest-mock"; -import { EventType, HTTPError, MatrixError, type Room } from "../../../src"; +import { EventType, HTTPError, MatrixError, UnsupportedDelayedEventsEndpointError, type Room } from "../../../src"; import { type Focus, type LivekitFocusActive, type SessionMembershipData } from "../../../src/matrixrtc"; -import { LegacyMembershipManager } from "../../../src/matrixrtc/MembershipManager"; +import { LegacyMembershipManager } from "../../../src/matrixrtc/LegacyMembershipManager"; import { makeMockClient, makeMockRoom, membershipTemplate, mockCallMembership, type MockClient } from "./mocks"; +import { MembershipManager } from "../../../src/matrixrtc/NewMembershipManager"; import { defer } from "../../../src/utils"; function waitForMockCall(method: MockedFunction, returnVal?: Promise) { @@ -44,9 +45,10 @@ function createAsyncHandle(method: MockedFunction) { * Tests different MembershipManager implementations. Some tests don't apply to `LegacyMembershipManager` * use !FailsForLegacy to skip those. See: testEnvironment for more details. */ + describe.each([ { TestMembershipManager: LegacyMembershipManager, description: "LegacyMembershipManager" }, - // { TestMembershipManager: MembershipManager, description: "MembershipManager" }, + { TestMembershipManager: MembershipManager, description: "MembershipManager" }, ])("$description", ({ TestMembershipManager }) => { let client: MockClient; let room: Room; @@ -244,7 +246,12 @@ describe.each([ const delayedHandle = createAsyncHandle(client._unstable_sendDelayedStateEvent as Mock); const manager = new TestMembershipManager({}, room, client, () => undefined); manager.join([focus], focusActive); - delayedHandle.reject?.(Error("Server does not support the delayed events API")); + delayedHandle.reject?.( + new UnsupportedDelayedEventsEndpointError( + "Server does not support the delayed events API", + "sendDelayedStateEvent", + ), + ); expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1); }); it("does try to schedule a delayed leave event again if rate limited", async () => { @@ -328,6 +335,7 @@ describe.each([ await jest.advanceTimersByTimeAsync(1); (client._unstable_updateDelayedEvent as Mock).mockRejectedValue("unknown"); await manager.leave(); + // We send a normal leave event since we failed using updateDelayedEvent with the "send" action. expect(client.sendStateEvent).toHaveBeenLastCalledWith( room.roomId, @@ -337,9 +345,9 @@ describe.each([ ); }); // FailsForLegacy because legacy implementation always sends the empty state event even though it isn't needed - it("does nothing if not joined !FailsForLegacy", async () => { + it("does nothing if not joined !FailsForLegacy", () => { const manager = new TestMembershipManager({}, room, client, () => undefined); - await manager.leave(); + expect(async () => await manager.leave()).not.toThrow(); expect(client._unstable_sendDelayedStateEvent).not.toHaveBeenCalled(); expect(client.sendStateEvent).not.toHaveBeenCalled(); }); @@ -470,10 +478,10 @@ describe.each([ // !FailsForLegacy because the expires logic was removed for the legacy call manager. // Delayed events should replace it entirely but before they have wide adoption // the expiration logic still makes sense. - // TODO: add git commit when we removed it. - it("extends `expires` when call still active !FailsForLegacy", async () => { + // TODO: Add git commit when we removed it. + async function testExpires(expire: number, headroom?: number) { const manager = new TestMembershipManager( - { membershipExpiryTimeout: 10_000 }, + { membershipExpiryTimeout: expire, membershipExpiryTimeoutHeadroom: headroom }, room, client, () => undefined, @@ -482,13 +490,19 @@ describe.each([ await waitForMockCall(client.sendStateEvent); expect(client.sendStateEvent).toHaveBeenCalledTimes(1); const sentMembership = (client.sendStateEvent as Mock).mock.calls[0][2] as SessionMembershipData; - expect(sentMembership.expires).toBe(10_000); + expect(sentMembership.expires).toBe(expire); for (let i = 2; i <= 12; i++) { - await jest.advanceTimersByTimeAsync(10_000); + await jest.advanceTimersByTimeAsync(expire); expect(client.sendStateEvent).toHaveBeenCalledTimes(i); const sentMembership = (client.sendStateEvent as Mock).mock.lastCall![2] as SessionMembershipData; - expect(sentMembership.expires).toBe(10_000 * i); + expect(sentMembership.expires).toBe(expire * i); } + } + it("extends `expires` when call still active !FailsForLegacy", async () => { + await testExpires(10_000); + }); + it("extends `expires` using headroom configuration !FailsForLegacy", async () => { + await testExpires(10_000, 1_000); }); }); @@ -544,7 +558,7 @@ describe.each([ expect(client.sendStateEvent).toHaveBeenCalledTimes(1); }); // FailsForLegacy as implementation does not re-check membership before retrying. - it("abandons retry loop if leave() was called !FailsForLegacy", async () => { + it("abandons retry loop if leave() was called before sending state event !FailsForLegacy", async () => { const handle = createAsyncHandle(client._unstable_sendDelayedStateEvent); const manager = new TestMembershipManager({}, room, client, () => undefined); @@ -565,7 +579,6 @@ describe.each([ await manager.leave(); // Wait for all timers to be setup - // await flushPromises(); await jest.advanceTimersByTimeAsync(1000); // No new events should have been sent: @@ -603,4 +616,87 @@ describe.each([ }); }); }); + describe("unrecoverable errors", () => { + // !FailsForLegacy because legacy does not have a retry limit and no mechanism to communicate unrecoverable errors. + it("throws, when reaching maximum number of retries for initial delayed event creation !FailsForLegacy", async () => { + const delayEventSendError = jest.fn(); + (client._unstable_sendDelayedStateEvent as Mock).mockRejectedValue( + new MatrixError( + { errcode: "M_LIMIT_EXCEEDED" }, + 429, + undefined, + undefined, + new Headers({ "Retry-After": "2" }), + ), + ); + const manager = new TestMembershipManager({}, room, client, () => undefined); + manager.join([focus], focusActive, delayEventSendError); + + for (let i = 0; i < 10; i++) { + await jest.advanceTimersByTimeAsync(2000); + } + expect(delayEventSendError).toHaveBeenCalled(); + }); + // !FailsForLegacy because legacy does not have a retry limit and no mechanism to communicate unrecoverable errors. + it("throws, when reaching maximum number of retries !FailsForLegacy", async () => { + const delayEventRestartError = jest.fn(); + (client._unstable_updateDelayedEvent as Mock).mockRejectedValue( + new MatrixError( + { errcode: "M_LIMIT_EXCEEDED" }, + 429, + undefined, + undefined, + new Headers({ "Retry-After": "1" }), + ), + ); + const manager = new TestMembershipManager({}, room, client, () => undefined); + manager.join([focus], focusActive, delayEventRestartError); + + for (let i = 0; i < 10; i++) { + await jest.advanceTimersByTimeAsync(1000); + } + expect(delayEventRestartError).toHaveBeenCalled(); + }); + it("falls back to using pure state events when some error occurs while sending delayed events !FailsForLegacy", async () => { + const unrecoverableError = jest.fn(); + (client._unstable_sendDelayedStateEvent as Mock).mockRejectedValue(new HTTPError("unknown", 601)); + const manager = new TestMembershipManager({}, room, client, () => undefined); + manager.join([focus], focusActive, unrecoverableError); + await waitForMockCall(client.sendStateEvent); + expect(unrecoverableError).not.toHaveBeenCalledWith(); + expect(client.sendStateEvent).toHaveBeenCalled(); + }); + it("retries before failing in case its a network error !FailsForLegacy", async () => { + const unrecoverableError = jest.fn(); + (client._unstable_sendDelayedStateEvent as Mock).mockRejectedValue(new HTTPError("unknown", 501)); + const manager = new TestMembershipManager( + { callMemberEventRetryDelayMinimum: 1000, maximumNetworkErrorRetryCount: 7 }, + room, + client, + () => undefined, + ); + manager.join([focus], focusActive, unrecoverableError); + for (let retries = 0; retries < 7; retries++) { + expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(retries + 1); + await jest.advanceTimersByTimeAsync(1000); + } + expect(unrecoverableError).toHaveBeenCalled(); + expect(unrecoverableError.mock.lastCall![0].message).toMatch( + "The MembershipManager shut down because of the end condition", + ); + expect(client.sendStateEvent).not.toHaveBeenCalled(); + }); + it("falls back to using pure state events when UnsupportedDelayedEventsEndpointError encountered for delayed events !FailsForLegacy", async () => { + const unrecoverableError = jest.fn(); + (client._unstable_sendDelayedStateEvent as Mock).mockRejectedValue( + new UnsupportedDelayedEventsEndpointError("not supported", "sendDelayedStateEvent"), + ); + const manager = new TestMembershipManager({}, room, client, () => undefined); + manager.join([focus], focusActive, unrecoverableError); + await jest.advanceTimersByTimeAsync(1); + + expect(unrecoverableError).not.toHaveBeenCalled(); + expect(client.sendStateEvent).toHaveBeenCalled(); + }); + }); }); diff --git a/spec/unit/matrixrtc/mocks.ts b/spec/unit/matrixrtc/mocks.ts index dc3949d9b..0c8cb2ade 100644 --- a/spec/unit/matrixrtc/mocks.ts +++ b/spec/unit/matrixrtc/mocks.ts @@ -25,7 +25,7 @@ export const membershipTemplate: SessionMembershipData = { call_id: "", device_id: "AAAAAAA", scope: "m.room", - focus_active: { type: "livekit", livekit_service_url: "https://lk.url" }, + focus_active: { type: "livekit", focus_selection: "oldest_membership" }, foci_preferred: [ { livekit_alias: "!alias:something.org", diff --git a/src/client.ts b/src/client.ts index 89ec7bcec..9638676a8 100644 --- a/src/client.ts +++ b/src/client.ts @@ -240,6 +240,7 @@ import { validateAuthMetadataAndKeys, } from "./oidc/index.ts"; import { type EmptyObject } from "./@types/common.ts"; +import { UnsupportedDelayedEventsEndpointError } from "./errors.ts"; export type Store = IStore; @@ -3351,7 +3352,10 @@ export class MatrixClient extends TypedEventEmitter { if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) { - throw Error("Server does not support the delayed events API"); + throw new UnsupportedDelayedEventsEndpointError( + "Server does not support the delayed events API", + "sendDelayedEvent", + ); } this.addThreadRelationIfNeeded(content, threadId, roomId); @@ -3374,7 +3378,10 @@ export class MatrixClient extends TypedEventEmitter { if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) { - throw Error("Server does not support the delayed events API"); + throw new UnsupportedDelayedEventsEndpointError( + "Server does not support the delayed events API", + "sendDelayedStateEvent", + ); } const pathParams = { @@ -3398,7 +3405,10 @@ export class MatrixClient extends TypedEventEmitter { if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) { - throw Error("Server does not support the delayed events API"); + throw new UnsupportedDelayedEventsEndpointError( + "Server does not support the delayed events API", + "getDelayedEvents", + ); } const queryDict = fromToken ? { from: fromToken } : undefined; @@ -3420,7 +3430,10 @@ export class MatrixClient extends TypedEventEmitter { if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) { - throw Error("Server does not support the delayed events API"); + throw new UnsupportedDelayedEventsEndpointError( + "Server does not support the delayed events API", + "updateDelayedEvent", + ); } const path = utils.encodeUri("/delayed_events/$delayId", { diff --git a/src/embedded.ts b/src/embedded.ts index 7dc617af1..0882872e5 100644 --- a/src/embedded.ts +++ b/src/embedded.ts @@ -50,12 +50,12 @@ import { } from "./client.ts"; import { SyncApi, SyncState } from "./sync.ts"; import { SlidingSyncSdk } from "./sliding-sync-sdk.ts"; -import { MatrixError } from "./http-api/errors.ts"; +import { ConnectionError, MatrixError } from "./http-api/errors.ts"; import { User } from "./models/user.ts"; import { type Room } from "./models/room.ts"; import { type ToDeviceBatch, type ToDevicePayload } from "./models/ToDeviceMessage.ts"; import { MapWithDefault, recursiveMapToObject } from "./utils.ts"; -import { type EmptyObject, TypedEventEmitter } from "./matrix.ts"; +import { type EmptyObject, TypedEventEmitter, UnsupportedDelayedEventsEndpointError } from "./matrix.ts"; interface IStateEventRequest { eventType: string; @@ -358,13 +358,15 @@ export class RoomWidgetClient extends MatrixClient { // Delayed event special case. if (delayOpts) { // TODO: updatePendingEvent for delayed events? - const response = await this.widgetApi.sendRoomEvent( - event.getType(), - content, - room.roomId, - "delay" in delayOpts ? delayOpts.delay : undefined, - "parent_delay_id" in delayOpts ? delayOpts.parent_delay_id : undefined, - ); + const response = await this.widgetApi + .sendRoomEvent( + event.getType(), + content, + room.roomId, + "delay" in delayOpts ? delayOpts.delay : undefined, + "parent_delay_id" in delayOpts ? delayOpts.parent_delay_id : undefined, + ) + .catch(timeoutToConnectionError); return this.validateSendDelayedEventResponse(response); } @@ -374,7 +376,9 @@ export class RoomWidgetClient extends MatrixClient { let response: ISendEventFromWidgetResponseData; try { - response = await this.widgetApi.sendRoomEvent(event.getType(), content, room.roomId); + response = await this.widgetApi + .sendRoomEvent(event.getType(), content, room.roomId) + .catch(timeoutToConnectionError); } catch (e) { this.updatePendingEventStatus(room, event, EventStatus.NOT_SENT); throw e; @@ -397,7 +401,9 @@ export class RoomWidgetClient extends MatrixClient { content: any, stateKey = "", ): Promise { - const response = await this.widgetApi.sendStateEvent(eventType, stateKey, content, roomId); + const response = await this.widgetApi + .sendStateEvent(eventType, stateKey, content, roomId) + .catch(timeoutToConnectionError); if (response.event_id === undefined) { throw new Error("'event_id' absent from response to an event request"); } @@ -416,17 +422,22 @@ export class RoomWidgetClient extends MatrixClient { stateKey = "", ): Promise { if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) { - throw Error("Server does not support the delayed events API"); + throw new UnsupportedDelayedEventsEndpointError( + "Server does not support the delayed events API", + "sendDelayedStateEvent", + ); } - const response = await this.widgetApi.sendStateEvent( - eventType, - stateKey, - content, - roomId, - "delay" in delayOpts ? delayOpts.delay : undefined, - "parent_delay_id" in delayOpts ? delayOpts.parent_delay_id : undefined, - ); + const response = await this.widgetApi + .sendStateEvent( + eventType, + stateKey, + content, + roomId, + "delay" in delayOpts ? delayOpts.delay : undefined, + "parent_delay_id" in delayOpts ? delayOpts.parent_delay_id : undefined, + ) + .catch(timeoutToConnectionError); return this.validateSendDelayedEventResponse(response); } @@ -443,20 +454,25 @@ export class RoomWidgetClient extends MatrixClient { // eslint-disable-next-line public async _unstable_updateDelayedEvent(delayId: string, action: UpdateDelayedEventAction): Promise { if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) { - throw Error("Server does not support the delayed events API"); + throw new UnsupportedDelayedEventsEndpointError( + "Server does not support the delayed events API", + "updateDelayedEvent", + ); } - await this.widgetApi.updateDelayedEvent(delayId, action); + await this.widgetApi.updateDelayedEvent(delayId, action).catch(timeoutToConnectionError); return {}; } public async sendToDevice(eventType: string, contentMap: SendToDeviceContentMap): Promise { - await this.widgetApi.sendToDevice(eventType, false, recursiveMapToObject(contentMap)); + await this.widgetApi + .sendToDevice(eventType, false, recursiveMapToObject(contentMap)) + .catch(timeoutToConnectionError); return {}; } public async getOpenIdToken(): Promise { - const token = await this.widgetApi.requestOpenIDConnectToken(); + const token = await this.widgetApi.requestOpenIDConnectToken().catch(timeoutToConnectionError); // the IOpenIDCredentials from the widget-api and IOpenIDToken form the matrix-js-sdk are compatible. // we still recreate the token to make this transparent and catch'able by the linter in case the types change in the future. return { @@ -474,7 +490,9 @@ export class RoomWidgetClient extends MatrixClient { contentMap.getOrCreate(userId).set(deviceId, payload); } - await this.widgetApi.sendToDevice(eventType, false, recursiveMapToObject(contentMap)); + await this.widgetApi + .sendToDevice(eventType, false, recursiveMapToObject(contentMap)) + .catch(timeoutToConnectionError); } public async encryptAndSendToDevices(userDeviceInfoArr: OlmDevice[], payload: object): Promise { @@ -484,7 +502,9 @@ export class RoomWidgetClient extends MatrixClient { contentMap.getOrCreate(userId).set(deviceId, payload); } - await this.widgetApi.sendToDevice((payload as { type: string }).type, true, recursiveMapToObject(contentMap)); + await this.widgetApi + .sendToDevice((payload as { type: string }).type, true, recursiveMapToObject(contentMap)) + .catch(timeoutToConnectionError); } /** @@ -505,7 +525,9 @@ export class RoomWidgetClient extends MatrixClient { encrypted: boolean, contentMap: SendToDeviceContentMap, ): Promise { - await this.widgetApi.sendToDevice(eventType, encrypted, recursiveMapToObject(contentMap)); + await this.widgetApi + .sendToDevice(eventType, encrypted, recursiveMapToObject(contentMap)) + .catch(timeoutToConnectionError); } // Overridden since we get TURN servers automatically over the widget API, @@ -670,3 +692,16 @@ function processAndThrow(error: unknown): never { throw error; } } + +/** + * This converts an "Request timed out" error from the PostmessageTransport into a ConnectionError. + * It either throws the original error or a new ConnectionError. + **/ +function timeoutToConnectionError(error: unknown): never { + // TODO: this should not check on error.message but instead it should be a specific type + // error instanceof WidgetTimeoutError + if (error instanceof Error && error.message === "Request timed out") { + throw new ConnectionError("widget api timeout"); + } + throw error; +} diff --git a/src/errors.ts b/src/errors.ts index 8345293be..8baf7979b 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -52,3 +52,16 @@ export class ClientStoppedError extends Error { super("MatrixClient has been stopped"); } } + +/** + * This error is thrown when the Homeserver does not support the delayed events enpdpoints. + */ +export class UnsupportedDelayedEventsEndpointError extends Error { + public constructor( + message: string, + public clientEndpoint: "sendDelayedEvent" | "updateDelayedEvent" | "sendDelayedStateEvent" | "getDelayedEvents", + ) { + super(message); + this.name = "UnsupportedDelayedEventsEndpointError"; + } +} diff --git a/src/matrixrtc/MembershipManager.ts b/src/matrixrtc/LegacyMembershipManager.ts similarity index 90% rename from src/matrixrtc/MembershipManager.ts rename to src/matrixrtc/LegacyMembershipManager.ts index bdd22944f..891ffa077 100644 --- a/src/matrixrtc/MembershipManager.ts +++ b/src/matrixrtc/LegacyMembershipManager.ts @@ -1,3 +1,19 @@ +/* +Copyright 2025 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + import { EventType } from "../@types/event.ts"; import { UpdateDelayedEventAction } from "../@types/requests.ts"; import type { MatrixClient } from "../client.ts"; @@ -11,44 +27,7 @@ import { type Focus } from "./focus.ts"; import { isLivekitFocusActive } from "./LivekitFocus.ts"; import { type MembershipConfig } from "./MatrixRTCSession.ts"; import { type EmptyObject } from "../@types/common.ts"; -/** - * This interface defines what a MembershipManager uses and exposes. - * This interface is what we use to write tests and allows to change the actual implementation - * Without breaking tests because of some internal method renaming. - * - * @internal - */ -export interface IMembershipManager { - /** - * If we are trying to join the session. - * It does not reflect if the room state is already configures to represent us being joined. - * It only means that the Manager is running. - * @returns true if we intend to be participating in the MatrixRTC session - */ - isJoined(): boolean; - /** - * Start sending all necessary events to make this user participant in the RTC session. - * @param fociPreferred the list of preferred foci to use in the joined RTC membership event. - * @param fociActive the active focus to use in the joined RTC membership event. - */ - join(fociPreferred: Focus[], fociActive?: Focus): void; - /** - * Send all necessary events to make this user leave the RTC session. - * @param timeout the maximum duration in ms until the promise is forced to resolve. - * @returns It resolves with true in case the leave was sent successfully. - * It resolves with false in case we hit the timeout before sending successfully. - */ - leave(timeout?: number): Promise; - /** - * Call this if the MatrixRTC session members have changed. - */ - onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise; - /** - * The used active focus in the currently joined session. - * @returns the used active focus in the currently joined session or undefined if not joined. - */ - getActiveFocus(): Focus | undefined; -} +import { type IMembershipManager } from "./NewMembershipManager.ts"; /** * This internal class is used by the MatrixRTCSession to manage the local user's own membership of the session. @@ -64,7 +43,8 @@ export interface IMembershipManager { * * It is recommended to only use this interface for testing to allow replacing this class. * - * @internal + * @internal + * @deprecated Use {@link MembershipManager} instead */ export class LegacyMembershipManager implements IMembershipManager { private relativeExpiry: number | undefined; diff --git a/src/matrixrtc/MatrixRTCSession.ts b/src/matrixrtc/MatrixRTCSession.ts index d9879f6f8..4006c124f 100644 --- a/src/matrixrtc/MatrixRTCSession.ts +++ b/src/matrixrtc/MatrixRTCSession.ts @@ -25,8 +25,10 @@ import { RoomStateEvent } from "../models/room-state.ts"; import { type Focus } from "./focus.ts"; import { KnownMembership } from "../@types/membership.ts"; import { type MatrixEvent } from "../models/event.ts"; -import { LegacyMembershipManager, type IMembershipManager } from "./MembershipManager.ts"; +import { MembershipManager, type IMembershipManager } from "./NewMembershipManager.ts"; import { EncryptionManager, type IEncryptionManager, type Statistics } from "./EncryptionManager.ts"; +import { LegacyMembershipManager } from "./LegacyMembershipManager.ts"; +import { logDurationSync } from "../utils.ts"; const logger = rootLogger.getChild("MatrixRTCSession"); @@ -39,6 +41,8 @@ export enum MatrixRTCSessionEvent { JoinStateChanged = "join_state_changed", // The key used to encrypt media has changed EncryptionKeyChanged = "encryption_key_changed", + /** The membership manager had to shut down caused by an unrecoverable error */ + MembershipManagerError = "membership_manager_error", } export type MatrixRTCSessionEventHandlerMap = { @@ -52,9 +56,17 @@ export type MatrixRTCSessionEventHandlerMap = { encryptionKeyIndex: number, participantId: string, ) => void; + [MatrixRTCSessionEvent.MembershipManagerError]: (error: unknown) => void; }; export interface MembershipConfig { + /** + * Use the new Manager. + * + * Default: `false`. + */ + useNewMembershipManager?: boolean; + /** * The timeout (in milliseconds) after we joined the call, that our membership should expire * unless we have explicitly updated it. @@ -63,6 +75,17 @@ export interface MembershipConfig { */ membershipExpiryTimeout?: number; + /** + * The time in (in milliseconds) which the manager will prematurely send the updated state event before the membership `expires` time to make sure it + * sends the updated state event early enough. + * + * A headroom of 1000ms and a `membershipExpiryTimeout` of 10000ms would result in the first membership event update after 9s and + * a membership event that would be considered expired after 10s. + * + * This value does not have an effect on the value of `SessionMembershipData.expires`. + */ + membershipExpiryTimeoutHeadroom?: number; + /** * The period (in milliseconds) with which we check that our membership event still exists on the * server. If it is not found we create it again. @@ -90,6 +113,16 @@ export interface MembershipConfig { * @deprecated It should be possible to make it stable without this. */ callMemberEventRetryJitter?: number; + + /** + * The maximum number of retries that the manager will do for delayed event sending/updating and state event sending when a server rate limit has been hit. + */ + maximumRateLimitRetryCount?: number; + + /** + * The maximum number of retries that the manager will do for delayed event sending/updating and state event sending when a network error occurs. + */ + maximumNetworkErrorRetryCount?: number; } export interface EncryptionConfig { @@ -318,18 +351,28 @@ export class MatrixRTCSession extends TypedEventEmitter - this.getOldestMembership(), - ); + // Create MembershipManager + if (joinConfig?.useNewMembershipManager ?? false) { + this.membershipManager = new MembershipManager(joinConfig, this.roomSubset, this.client, () => + this.getOldestMembership(), + ); + } else { + this.membershipManager = new LegacyMembershipManager(joinConfig, this.roomSubset, this.client, () => + this.getOldestMembership(), + ); + } } // Join! - this.membershipManager!.join(fociPreferred, fociActive); + this.membershipManager!.join(fociPreferred, fociActive, (e) => { + logger.error("MembershipManager encountered an unrecoverable error: ", e); + this.emit(MatrixRTCSessionEvent.MembershipManagerError, e); + this.emit(MatrixRTCSessionEvent.JoinStateChanged, this.isJoined()); + }); this.encryptionManager!.join(joinConfig); this.emit(MatrixRTCSessionEvent.JoinStateChanged, true); @@ -492,7 +535,9 @@ export class MatrixRTCSession extends TypedEventEmitter { + this.emit(MatrixRTCSessionEvent.MembershipsChanged, oldMemberships, this.memberships); + }); void this.membershipManager?.onRTCSessionMemberUpdate(this.memberships); } diff --git a/src/matrixrtc/NewMembershipManager.ts b/src/matrixrtc/NewMembershipManager.ts new file mode 100644 index 000000000..9977364a9 --- /dev/null +++ b/src/matrixrtc/NewMembershipManager.ts @@ -0,0 +1,931 @@ +/* +Copyright 2025 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { EventType } from "../@types/event.ts"; +import { UpdateDelayedEventAction } from "../@types/requests.ts"; +import { type MatrixClient } from "../client.ts"; +import { UnsupportedDelayedEventsEndpointError } from "../errors.ts"; +import { ConnectionError, HTTPError, MatrixError } from "../http-api/errors.ts"; +import { logger as rootLogger } from "../logger.ts"; +import { type Room } from "../models/room.ts"; +import { defer, type IDeferred } from "../utils.ts"; +import { type CallMembership, DEFAULT_EXPIRE_DURATION, type SessionMembershipData } from "./CallMembership.ts"; +import { type Focus } from "./focus.ts"; +import { isLivekitFocusActive } from "./LivekitFocus.ts"; +import { type MembershipConfig } from "./MatrixRTCSession.ts"; +import { ActionScheduler, type ActionUpdate } from "./NewMembershipManagerActionScheduler.ts"; + +const logger = rootLogger.getChild("MatrixRTCSession"); + +/** + * This interface defines what a MembershipManager uses and exposes. + * This interface is what we use to write tests and allows changing the actual implementation + * without breaking tests because of some internal method renaming. + * + * @internal + */ +export interface IMembershipManager { + /** + * If we are trying to join, or have successfully joined the session. + * It does not reflect if the room state is already configured to represent us being joined. + * It only means that the Manager is running. + * @returns true if we intend to be participating in the MatrixRTC session + */ + isJoined(): boolean; + /** + * Start sending all necessary events to make this user participate in the RTC session. + * @param fociPreferred the list of preferred foci to use in the joined RTC membership event. + * @param fociActive the active focus to use in the joined RTC membership event. + * @throws can throw if it exceeds a configured maximum retry. + */ + join(fociPreferred: Focus[], fociActive?: Focus, onError?: (error: unknown) => void): void; + /** + * Send all necessary events to make this user leave the RTC session. + * @param timeout the maximum duration in ms until the promise is forced to resolve. + * @returns It resolves with true in case the leave was sent successfully. + * It resolves with false in case we hit the timeout before sending successfully. + */ + leave(timeout?: number): Promise; + /** + * Call this if the MatrixRTC session members have changed. + */ + onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise; + /** + * The used active focus in the currently joined session. + * @returns the used active focus in the currently joined session or undefined if not joined. + */ + getActiveFocus(): Focus | undefined; +} + +/* MembershipActionTypes: + ▼ + ┌─────────────────────┐ + │SendFirstDelayedEvent│ + └─────────────────────┘ + │ + ▼ + ┌─────────────┐ + ┌────────────│SendJoinEvent│────────────┐ + │ └─────────────┘ │ + │ ┌─────┐ ┌──────┐ │ ┌──────┐ + ▼ ▼ │ │ ▼ ▼ ▼ │ +┌────────────┐ │ │ ┌───────────────────┐ │ +│UpdateExpiry│ │ │ │RestartDelayedEvent│ │ +└────────────┘ │ │ └───────────────────┘ │ + │ │ │ │ │ │ + └─────┘ └──────┘ │ │ + │ │ + ┌────────────────────┐ │ │ + │SendMainDelayedEvent│◄───────┘ │ + └───────────────────┬┘ │ + │ │ + └─────────────────────┘ + STOP ALL ABOVE + ▼ + ┌───────────────────────────────┐ + │ SendScheduledDelayedLeaveEvent│ + └───────────────────────────────┘ + │ + ▼ + ┌──────────────┐ + │SendLeaveEvent│ + └──────────────┘ + +*/ +/** + * The different types of actions the MembershipManager can take. + * @internal + */ +export enum MembershipActionType { + SendFirstDelayedEvent = "SendFirstDelayedEvent", + // -> MembershipActionType.SendJoinEvent if successful + // -> DelayedLeaveActionType.SendFirstDelayedEvent on error, retry sending the first delayed event. + SendJoinEvent = "SendJoinEvent", + // -> MembershipActionType.SendJoinEvent if we run into a rate limit and need to retry + // -> MembershipActionType.Update if we successfully send the join event then schedule the expire event update + // -> DelayedLeaveActionType.RestartDelayedEvent to recheck the delayed event + RestartDelayedEvent = "RestartDelayedEvent", + // -> DelayedLeaveActionType.SendMainDelayedEvent on missing delay id but there is a rtc state event + // -> DelayedLeaveActionType.SendFirstDelayedEvent on missing delay id and there is no state event + // -> DelayedLeaveActionType.RestartDelayedEvent on success we schedule the next restart + UpdateExpiry = "UpdateExpiry", + // -> MembershipActionType.Update if the timeout has passed so the next update is required. + SendMainDelayedEvent = "SendMainDelayedEvent", + // -> DelayedLeaveActionType.RestartDelayedEvent on success start updating the delayed event + // -> DelayedLeaveActionType.SendMainDelayedEvent on error try again + SendScheduledDelayedLeaveEvent = "SendScheduledDelayedLeaveEvent", + // -> MembershipActionType.SendLeaveEvent on failiour (not found) we need to send the leave manually and cannot use the scheduled delayed event + // -> DelayedLeaveActionType.SendScheduledDelayedLeaveEvent on error we try again. + SendLeaveEvent = "SendLeaveEvent", + // -> MembershipActionType.SendLeaveEvent +} + +/** + * @internal + */ +export interface ActionSchedulerState { + /** The delayId we got when successfully sending the delayed leave event. + * Gets set to undefined if the server claims it cannot find the delayed event anymore. */ + delayId?: string; + /** Stores how often we have update the `expires` field. + * `expireUpdateIterations` * `membershipEventExpiryTimeout` resolves to the value the expires field should contain next */ + expireUpdateIterations: number; + /** The time at which we send the first state event. The time the call started from the DAG point of view. + * This is used to compute the local sleep timestamps when to next update the member event with a new expires value. */ + startTime: number; + /** The manager is in the state where its actually connected to the session. */ + hasMemberStateEvent: boolean; + // There can be multiple retries at once so we need to store counters per action + // e.g. the send update membership and the restart delayed could be rate limited at the same time. + /** Retry counter for rate limits */ + rateLimitRetries: Map; + /** Retry counter for other errors */ + networkErrorRetries: Map; +} + +enum Status { + Disconnected = "Disconnected", + Connecting = "Connecting", + ConnectingFailed = "ConnectingFailed", + Connected = "Connected", + Reconnecting = "Reconnecting", + Disconnecting = "Disconnecting", + Stuck = "Stuck", + Unknown = "Unknown", +} + +/** + * This class is responsible for sending all events relating to the own membership of a matrixRTC call. + * It has the following tasks: + * - Send the users leave delayed event before sending the membership + * - Send the users membership if the state machine is started + * - Check if the delayed event was canceled due to sending the membership + * - update the delayed event (`restart`) + * - Update the state event every ~5h = `DEFAULT_EXPIRE_DURATION` (so it does not get treated as expired) + * - When the state machine is stopped: + * - Disconnect the member + * - Stop the timer for the delay refresh + * - Stop the timer for updating the state event + */ +export class MembershipManager implements IMembershipManager { + public isJoined(): boolean { + return this.scheduler.running; + } + + /** + * Puts the MembershipManager in a state where it tries to be joined. + * It will send delayed events and membership events + * @param fociPreferred + * @param focusActive + * @param onError This will be called once the membership manager encounters an unrecoverable error. + * This should bubble up the the frontend to communicate that the call does not work in the current environment. + */ + public join(fociPreferred: Focus[], focusActive?: Focus, onError?: (error: unknown) => void): void { + if (this.isJoined()) { + logger.error("MembershipManager is already running. Ignoring join request."); + return; + } + this.fociPreferred = fociPreferred; + this.focusActive = focusActive; + this.leavePromiseDefer = undefined; + + this.state = MembershipManager.defaultState; + + this.scheduler + .startWithJoin() + .then(() => { + this.leavePromiseDefer?.resolve(true); + this.leavePromiseDefer = undefined; + }) + .catch((e) => { + logger.error("MembershipManager stopped because: ", e); + onError?.(e); + }); + } + + /** + * Leave from the call (Send an rtc session event with content: `{}`) + * @param timeout the maximum duration this promise will take to resolve + * @returns true if it managed to leave and false if the timeout condition happened. + */ + public leave(timeout?: number): Promise { + if (!this.scheduler.running) return Promise.resolve(true); + + // We use the promise to track if we already scheduled a leave event + // So we do not check scheduler.actions/scheduler.insertions + if (!this.leavePromiseDefer) { + // reset scheduled actions so we will not do any new actions. + this.leavePromiseDefer = defer(); + this.scheduler.initiateLeave(); + if (timeout) setTimeout(() => this.leavePromiseDefer?.resolve(false), timeout); + } + return this.leavePromiseDefer.promise; + } + private leavePromiseDefer?: IDeferred; + + public async onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise { + const isMyMembership = (m: CallMembership): boolean => + m.sender === this.client.getUserId() && m.deviceId === this.client.getDeviceId(); + + if (this.isJoined() && !memberships.some(isMyMembership)) { + // If one of these actions are scheduled or are getting inserted in the next iteration, we should already + // take care of our missing membership. + const sendingMembershipActions = [ + MembershipActionType.SendFirstDelayedEvent, + MembershipActionType.SendJoinEvent, + ]; + logger.warn("Missing own membership: force re-join"); + if (this.scheduler.actions.find((a) => sendingMembershipActions.includes(a.type as MembershipActionType))) { + logger.error( + "NewMembershipManger tried adding another `SendFirstDelayedEvent` actions even though we already have one in the Queue\nActionQueueOnMemberUpdate:", + this.scheduler.actions, + ); + } else { + // Only react to our own membership missing if we have not already scheduled sending a new membership DirectMembershipManagerAction.Join + this.state.hasMemberStateEvent = false; + this.scheduler.initiateJoin(); + } + } + return Promise.resolve(); + } + + public getActiveFocus(): Focus | undefined { + if (this.focusActive) { + // A livekit active focus + if (isLivekitFocusActive(this.focusActive)) { + if (this.focusActive.focus_selection === "oldest_membership") { + const oldestMembership = this.getOldestMembership(); + return oldestMembership?.getPreferredFoci()[0]; + } + } else { + logger.warn("Unknown own ActiveFocus type. This makes it impossible to connect to an SFU."); + } + } else { + // We do not understand the membership format (could be legacy). We default to oldestMembership + // Once there are other methods this is a hard error! + const oldestMembership = this.getOldestMembership(); + return oldestMembership?.getPreferredFoci()[0]; + } + } + + /** + * @throws if the client does not return user or device id. + * @param joinConfig + * @param room + * @param client + * @param getOldestMembership + */ + public constructor( + private joinConfig: MembershipConfig | undefined, + private room: Pick, + private client: Pick< + MatrixClient, + | "getUserId" + | "getDeviceId" + | "sendStateEvent" + | "_unstable_sendDelayedStateEvent" + | "_unstable_updateDelayedEvent" + >, + private getOldestMembership: () => CallMembership | undefined, + ) { + const [userId, deviceId] = [this.client.getUserId(), this.client.getDeviceId()]; + if (userId === null) throw Error("Missing userId in client"); + if (deviceId === null) throw Error("Missing deviceId in client"); + this.deviceId = deviceId; + this.stateKey = this.makeMembershipStateKey(userId, deviceId); + this.state = MembershipManager.defaultState; + } + + // MembershipManager mutable state. + private state: ActionSchedulerState; + private static get defaultState(): ActionSchedulerState { + return { + hasMemberStateEvent: false, + delayId: undefined, + + startTime: 0, + rateLimitRetries: new Map(), + networkErrorRetries: new Map(), + expireUpdateIterations: 1, + }; + } + // Membership Event static parameters: + private deviceId: string; + private stateKey: string; + private fociPreferred?: Focus[]; + private focusActive?: Focus; + + // Config: + private membershipServerSideExpiryTimeoutOverride?: number; + + private get callMemberEventRetryDelayMinimum(): number { + return this.joinConfig?.callMemberEventRetryDelayMinimum ?? 3_000; + } + private get membershipEventExpiryTimeout(): number { + return this.joinConfig?.membershipExpiryTimeout ?? DEFAULT_EXPIRE_DURATION; + } + private get membershipEventExpiryTimeoutHeadroom(): number { + return this.joinConfig?.membershipExpiryTimeoutHeadroom ?? 5_000; + } + private computeNextExpiryActionTs(iteration: number): number { + return ( + this.state.startTime + + this.membershipEventExpiryTimeout * iteration - + this.membershipEventExpiryTimeoutHeadroom + ); + } + private get membershipServerSideExpiryTimeout(): number { + return ( + this.membershipServerSideExpiryTimeoutOverride ?? + this.joinConfig?.membershipServerSideExpiryTimeout ?? + 8_000 + ); + } + private get membershipKeepAlivePeriod(): number { + return this.joinConfig?.membershipKeepAlivePeriod ?? 5_000; + } + private get maximumRateLimitRetryCount(): number { + return this.joinConfig?.maximumRateLimitRetryCount ?? 10; + } + private get maximumNetworkErrorRetryCount(): number { + return this.joinConfig?.maximumNetworkErrorRetryCount ?? 10; + } + + // Scheduler: + private oldStatus?: Status; + private scheduler = new ActionScheduler((type): Promise => { + if (this.oldStatus) { + // we put this at the beginning of the actions scheduler loop handle callback since it is a loop this + // is equivalent to running it at the end of the loop. (just after applying the status/action list changes) + logger.debug(`MembershipManager applied action changes. Status: ${this.oldStatus} -> ${this.status}`); + } + this.oldStatus = this.status; + logger.debug(`MembershipManager before processing action. status=${this.oldStatus}`); + return this.membershipLoopHandler(type); + }); + + // LOOP HANDLER: + private async membershipLoopHandler(type: MembershipActionType): Promise { + this.oldStatus = this.status; + switch (type) { + case MembershipActionType.SendFirstDelayedEvent: { + // Before we start we check if we come from a state where we have a delay id. + if (!this.state.delayId) { + return this.sendFirstDelayedLeaveEvent(); // Normal case without any previous delayed id. + } else { + // This can happen if someone else (or another client) removes our own membership event. + // It will trigger `onRTCSessionMemberUpdate` queue `MembershipActionType.SendFirstDelayedEvent`. + // We might still have our delayed event from the previous participation and dependent on the server this might not + // get automatically removed if the state changes. Hence It would remove our membership unexpectedly shortly after the rejoin. + // + // In this block we will try to cancel this delayed event before setting up a new one. + + return this.cancelKnownDelayIdBeforeSendFirstDelayedEvent(this.state.delayId); + } + } + case MembershipActionType.RestartDelayedEvent: { + if (!this.state.delayId) { + // Delay id got reset. This action was used to check if the hs canceled the delayed event when the join state got sent. + return createInsertActionUpdate( + this.state.hasMemberStateEvent + ? MembershipActionType.SendMainDelayedEvent + : MembershipActionType.SendFirstDelayedEvent, + ); + } + return this.restartDelayedEvent(this.state.delayId); + } + case MembershipActionType.SendMainDelayedEvent: { + return this.sendMainDelayedEvent(); + } + case MembershipActionType.SendScheduledDelayedLeaveEvent: { + // We are already good + if (!this.state.hasMemberStateEvent) { + return { replace: [] }; + } + if (this.state.delayId) { + return this.sendScheduledDelayedLeaveEventOrFallbackToSendLeaveEvent(this.state.delayId); + } else { + return createInsertActionUpdate(MembershipActionType.SendLeaveEvent); + } + } + case MembershipActionType.SendJoinEvent: { + return this.sendJoinEvent(); + } + case MembershipActionType.UpdateExpiry: { + return this.updateExpiryOnJoinedEvent(); + } + case MembershipActionType.SendLeaveEvent: { + // We are good already + if (!this.state.hasMemberStateEvent) { + return { replace: [] }; + } + // This is only a fallback in case we do not have working delayed events support. + // first we should try to just send the scheduled leave event + return this.sendFallbackLeaveEvent(); + } + } + } + + // HANDLERS (used in the membershipLoopHandler) + private async sendFirstDelayedLeaveEvent(): Promise { + return await this.client + ._unstable_sendDelayedStateEvent( + this.room.roomId, + { + delay: this.membershipServerSideExpiryTimeout, + }, + EventType.GroupCallMemberPrefix, + {}, // leave event + this.stateKey, + ) + .then((response) => { + // On success we reset retries and set delayId. + this.state.rateLimitRetries.set(MembershipActionType.SendFirstDelayedEvent, 0); + this.state.networkErrorRetries.set(MembershipActionType.SendFirstDelayedEvent, 0); + this.state.delayId = response.delay_id; + return createInsertActionUpdate(MembershipActionType.SendJoinEvent); + }) + .catch((e) => { + const repeatActionType = MembershipActionType.SendFirstDelayedEvent; + if (this.manageMaxDelayExceededSituation(e)) { + return createInsertActionUpdate(repeatActionType); + } + const update = this.actionUpdateFromErrors(e, repeatActionType, "sendDelayedStateEvent"); + if (update) return update; + + // log and fall through + if (this.isUnsupportedDelayedEndpoint(e)) { + logger.info("Not using delayed event because the endpoint is not supported"); + } else { + logger.info("Not using delayed event because: " + e); + } + // On any other error we fall back to not using delayed events and send the join state event immediately + return createInsertActionUpdate(MembershipActionType.SendJoinEvent); + }); + } + + private async cancelKnownDelayIdBeforeSendFirstDelayedEvent(delayId: string): Promise { + // Remove all running updates and restarts + return await this.client + ._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Cancel) + .then(() => { + this.state.delayId = undefined; + this.resetRateLimitCounter(MembershipActionType.SendFirstDelayedEvent); + return createReplaceActionUpdate(MembershipActionType.SendFirstDelayedEvent); + }) + .catch((e) => { + const repeatActionType = MembershipActionType.SendFirstDelayedEvent; + const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent"); + if (update) return update; + + if (this.isNotFoundError(e)) { + // If we get a M_NOT_FOUND we know that the delayed event got already removed. + // This means we are good and can set it to undefined and run this again. + this.state.delayId = undefined; + return createReplaceActionUpdate(repeatActionType); + } + if (this.isUnsupportedDelayedEndpoint(e)) { + return createReplaceActionUpdate(MembershipActionType.SendJoinEvent); + } + // We do not just ignore and log this error since we would also need to reset the delayId. + + // This becomes an unrecoverable error case since something is significantly off if we don't hit any of the above cases + // when state.delayId !== undefined + // We do not just ignore and log this error since we would also need to reset the delayId. + // It is cleaner if we, the frontend, rejoins instead of resetting the delayId here and behaving like in the success case. + throw Error( + "We failed to cancel a delayed event where we already had a delay id with an error we cannot automatically handle", + ); + }); + } + + private async restartDelayedEvent(delayId: string): Promise { + return await this.client + ._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Restart) + .then(() => { + this.resetRateLimitCounter(MembershipActionType.RestartDelayedEvent); + return createInsertActionUpdate( + MembershipActionType.RestartDelayedEvent, + this.membershipKeepAlivePeriod, + ); + }) + .catch((e) => { + const repeatActionType = MembershipActionType.RestartDelayedEvent; + if (this.isNotFoundError(e)) { + this.state.delayId = undefined; + return createInsertActionUpdate(MembershipActionType.SendMainDelayedEvent); + } + // If the HS does not support delayed events we wont reschedule. + if (this.isUnsupportedDelayedEndpoint(e)) return {}; + + // TODO this also needs a test: get rate limit while checking id delayed event is scheduled + const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent"); + if (update) return update; + + // In other error cases we have no idea what is happening + throw Error("Could not restart delayed event, even though delayed events are supported. " + e); + }); + } + + private async sendMainDelayedEvent(): Promise { + return await this.client + ._unstable_sendDelayedStateEvent( + this.room.roomId, + { + delay: this.membershipServerSideExpiryTimeout, + }, + EventType.GroupCallMemberPrefix, + {}, // leave event + this.stateKey, + ) + .then((response) => { + this.state.delayId = response.delay_id; + this.resetRateLimitCounter(MembershipActionType.SendMainDelayedEvent); + return createInsertActionUpdate( + MembershipActionType.RestartDelayedEvent, + this.membershipKeepAlivePeriod, + ); + }) + .catch((e) => { + const repeatActionType = MembershipActionType.SendMainDelayedEvent; + // Don't do any other delayed event work if its not supported. + if (this.isUnsupportedDelayedEndpoint(e)) return {}; + + if (this.manageMaxDelayExceededSituation(e)) { + return createInsertActionUpdate(repeatActionType); + } + const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent"); + if (update) return update; + + throw Error("Could not send delayed event, even though delayed events are supported. " + e); + }); + } + + private async sendScheduledDelayedLeaveEventOrFallbackToSendLeaveEvent(delayId: string): Promise { + return await this.client + ._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Send) + .then(() => { + this.state.hasMemberStateEvent = false; + this.resetRateLimitCounter(MembershipActionType.SendScheduledDelayedLeaveEvent); + + return { replace: [] }; + }) + .catch((e) => { + const repeatActionType = MembershipActionType.SendLeaveEvent; + if (this.isUnsupportedDelayedEndpoint(e)) return {}; + if (this.isNotFoundError(e)) { + this.state.delayId = undefined; + return createInsertActionUpdate(repeatActionType); + } + const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent"); + if (update) return update; + + // On any other error we fall back to SendLeaveEvent (this includes hard errors from rate limiting) + logger.warn( + "Encountered unexpected error during SendScheduledDelayedLeaveEvent. Falling back to SendLeaveEvent", + e, + ); + return createInsertActionUpdate(repeatActionType); + }); + } + + private async sendJoinEvent(): Promise { + return await this.client + .sendStateEvent( + this.room.roomId, + EventType.GroupCallMemberPrefix, + this.makeMyMembership(this.membershipEventExpiryTimeout), + this.stateKey, + ) + .then(() => { + this.state.startTime = Date.now(); + // The next update should already use twice the membershipEventExpiryTimeout + this.state.expireUpdateIterations = 1; + this.state.hasMemberStateEvent = true; + this.resetRateLimitCounter(MembershipActionType.SendJoinEvent); + return { + insert: [ + { ts: Date.now(), type: MembershipActionType.RestartDelayedEvent }, + { + ts: this.computeNextExpiryActionTs(this.state.expireUpdateIterations), + type: MembershipActionType.UpdateExpiry, + }, + ], + }; + }) + .catch((e) => { + const update = this.actionUpdateFromErrors(e, MembershipActionType.SendJoinEvent, "sendStateEvent"); + if (update) return update; + throw e; + }); + } + + private async updateExpiryOnJoinedEvent(): Promise { + const nextExpireUpdateIteration = this.state.expireUpdateIterations + 1; + return await this.client + .sendStateEvent( + this.room.roomId, + EventType.GroupCallMemberPrefix, + this.makeMyMembership(this.membershipEventExpiryTimeout * nextExpireUpdateIteration), + this.stateKey, + ) + .then(() => { + // Success, we reset retries and schedule update. + this.resetRateLimitCounter(MembershipActionType.UpdateExpiry); + this.state.expireUpdateIterations = nextExpireUpdateIteration; + return { + insert: [ + { + ts: this.computeNextExpiryActionTs(nextExpireUpdateIteration), + type: MembershipActionType.UpdateExpiry, + }, + ], + }; + }) + .catch((e) => { + const update = this.actionUpdateFromErrors(e, MembershipActionType.UpdateExpiry, "sendStateEvent"); + if (update) return update; + + throw e; + }); + } + private async sendFallbackLeaveEvent(): Promise { + return await this.client + .sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, {}, this.stateKey) + .then(() => { + this.resetRateLimitCounter(MembershipActionType.SendLeaveEvent); + this.state.hasMemberStateEvent = false; + return { replace: [] }; + }) + .catch((e) => { + const update = this.actionUpdateFromErrors(e, MembershipActionType.SendLeaveEvent, "sendStateEvent"); + if (update) return update; + throw e; + }); + } + + // HELPERS + private makeMembershipStateKey(localUserId: string, localDeviceId: string): string { + const stateKey = `${localUserId}_${localDeviceId}`; + if (/^org\.matrix\.msc(3757|3779)\b/.exec(this.room.getVersion())) { + return stateKey; + } else { + return `_${stateKey}`; + } + } + + /** + * Constructs our own membership + */ + private makeMyMembership(expires: number): SessionMembershipData { + return { + call_id: "", + scope: "m.room", + application: "m.call", + device_id: this.deviceId, + expires, + focus_active: { type: "livekit", focus_selection: "oldest_membership" }, + foci_preferred: this.fociPreferred ?? [], + }; + } + + // Error checks and handlers + + /** + * Check if its a NOT_FOUND error + * @param error the error causing this handler check/execution + * @returns true if its a not found error + */ + private isNotFoundError(error: unknown): boolean { + return error instanceof MatrixError && error.errcode === "M_NOT_FOUND"; + } + + /** + * Check if this is a DelayExceeded timeout and update the TimeoutOverride for the next try + * @param error the error causing this handler check/execution + * @returns true if its a delay exceeded error and we updated the local TimeoutOverride + */ + private manageMaxDelayExceededSituation(error: unknown): boolean { + if ( + error instanceof MatrixError && + error.errcode === "M_UNKNOWN" && + error.data["org.matrix.msc4140.errcode"] === "M_MAX_DELAY_EXCEEDED" + ) { + const maxDelayAllowed = error.data["org.matrix.msc4140.max_delay"]; + if (typeof maxDelayAllowed === "number" && this.membershipServerSideExpiryTimeout > maxDelayAllowed) { + this.membershipServerSideExpiryTimeoutOverride = maxDelayAllowed; + } + logger.warn("Retry sending delayed disconnection event due to server timeout limitations:", error); + return true; + } + return false; + } + + private actionUpdateFromErrors( + error: unknown, + type: MembershipActionType, + method: string, + ): ActionUpdate | undefined { + const updateLimit = this.actionUpdateFromRateLimitError(error, method, type); + if (updateLimit) return updateLimit; + const updateNetwork = this.actionUpdateFromNetworkErrorRetry(error, type); + if (updateNetwork) return updateNetwork; + } + /** + * Check if we have a rate limit error and schedule the same action again if we dont exceed the rate limit retry count yet. + * @param error the error causing this handler check/execution + * @param method the method used for the throw message + * @param type which MembershipActionType we reschedule because of a rate limit. + * @throws If it is a rate limit error and the retry count got exceeded + * @returns Returns true if we handled the error by rescheduling the correct next action. + * Returns false if it is not a network error. + */ + private actionUpdateFromRateLimitError( + error: unknown, + method: string, + type: MembershipActionType, + ): ActionUpdate | undefined { + // "Is rate limit"-boundary + if (!((error instanceof HTTPError || error instanceof MatrixError) && error.isRateLimitError())) { + return undefined; + } + + // retry boundary + const rateLimitRetries = this.state.rateLimitRetries.get(type) ?? 0; + if (rateLimitRetries < this.maximumRateLimitRetryCount) { + let resendDelay: number; + const defaultMs = 5000; + try { + resendDelay = error.getRetryAfterMs() ?? defaultMs; + logger.info(`Rate limited by server, retrying in ${resendDelay}ms`); + } catch (e) { + logger.warn( + `Error while retrieving a rate-limit retry delay, retrying after default delay of ${defaultMs}`, + e, + ); + resendDelay = defaultMs; + } + this.state.rateLimitRetries.set(type, rateLimitRetries + 1); + return createInsertActionUpdate(type, resendDelay); + } + + throw Error("Exceeded maximum retries for " + type + " attempts (client." + method + "): " + (error as Error)); + } + + /** + * FIXME Don't Check the error and retry the same MembershipAction again in the configured time and for the configured retry count. + * @param error the error causing this handler check/execution + * @param type the action type that we need to repeat because of the error + * @throws If it is a network error and the retry count got exceeded + * @returns + * Returns true if we handled the error by rescheduling the correct next action. + * Returns false if it is not a network error. + */ + private actionUpdateFromNetworkErrorRetry(error: unknown, type: MembershipActionType): ActionUpdate | undefined { + // "Is a network error"-boundary + const retries = this.state.networkErrorRetries.get(type) ?? 0; + const retryDurationString = this.callMemberEventRetryDelayMinimum / 1000 + "s"; + const retryCounterString = "(" + retries + "/" + this.maximumNetworkErrorRetryCount + ")"; + if (error instanceof Error && error.name === "AbortError") { + logger.warn( + "Network local timeout error while sending event, retrying in " + + retryDurationString + + " " + + retryCounterString, + error, + ); + } else if (error instanceof Error && error.message.includes("updating delayed event")) { + // TODO: We do not want error message matching here but instead the error should be a typed HTTPError + // and be handled below automatically (the same as in the SPA case). + // + // The error originates because of https://github.com/matrix-org/matrix-widget-api/blob/5d81d4a26ff69e4bd3ddc79a884c9527999fb2f4/src/ClientWidgetApi.ts#L698-L701 + // uses `e` instance of HttpError (and not MatrixError) + // The element web widget driver (only checks for MatrixError) is then failing to process (`processError`) it as a typed error: https://github.com/element-hq/element-web/blob/471712cbf06a067e5499bd5d2d7a75f693d9a12d/src/stores/widgets/StopGapWidgetDriver.ts#L711-L715 + // So it will not call: `error.asWidgetApiErrorData()` which is also missing for `HttpError` + // + // A proper fix would be to either find a place to convert the `HttpError` into a `MatrixError` and the `processError` + // method to handle it as expected or to adjust `processError` to also process `HttpError`'s. + logger.warn( + "delayed event update timeout error, retrying in " + retryDurationString + " " + retryCounterString, + error, + ); + } else if (error instanceof ConnectionError) { + logger.warn( + "Network connection error while sending event, retrying in " + + retryDurationString + + " " + + retryCounterString, + error, + ); + } else if ( + (error instanceof HTTPError || error instanceof MatrixError) && + typeof error.httpStatus === "number" && + error.httpStatus >= 500 && + error.httpStatus < 600 + ) { + logger.warn( + "Server error while sending event, retrying in " + retryDurationString + " " + retryCounterString, + error, + ); + } else { + return undefined; + } + + // retry boundary + if (retries < this.maximumNetworkErrorRetryCount) { + this.state.networkErrorRetries.set(type, retries + 1); + return createInsertActionUpdate(type, this.callMemberEventRetryDelayMinimum); + } + + // Failure + throw Error( + "Reached maximum (" + this.maximumNetworkErrorRetryCount + ") retries cause by: " + (error as Error), + ); + } + + /** + * Check if its an UnsupportedDelayedEventsEndpointError and which implies that we cannot do any delayed event logic + * @param error The error to check + * @returns true it its an UnsupportedDelayedEventsEndpointError + */ + private isUnsupportedDelayedEndpoint(error: unknown): boolean { + return error instanceof UnsupportedDelayedEventsEndpointError; + } + + private resetRateLimitCounter(type: MembershipActionType): void { + this.state.rateLimitRetries.set(type, 0); + this.state.networkErrorRetries.set(type, 0); + } + + public get status(): Status { + const actions = this.scheduler.actions; + if (actions.length === 1) { + const { type } = actions[0]; + switch (type) { + case MembershipActionType.SendFirstDelayedEvent: + case MembershipActionType.SendJoinEvent: + case MembershipActionType.SendMainDelayedEvent: + return Status.Connecting; + case MembershipActionType.UpdateExpiry: // where no delayed events + return Status.Connected; + case MembershipActionType.SendScheduledDelayedLeaveEvent: + case MembershipActionType.SendLeaveEvent: + return Status.Disconnecting; + default: + // pass through as not expected + } + } else if (actions.length === 2) { + const types = actions.map((a) => a.type); + // normal state for connected with delayed events + if ( + (types.includes(MembershipActionType.RestartDelayedEvent) || + types.includes(MembershipActionType.SendMainDelayedEvent)) && + types.includes(MembershipActionType.UpdateExpiry) + ) { + return Status.Connected; + } + } else if (actions.length === 3) { + const types = actions.map((a) => a.type); + // It is a correct connected state if we already schedule the next Restart but have not yet cleaned up + // the current restart. + if ( + types.filter((t) => t === MembershipActionType.RestartDelayedEvent).length === 2 && + types.includes(MembershipActionType.UpdateExpiry) + ) { + return Status.Connected; + } + } + + if (!this.scheduler.running) { + return Status.Disconnected; + } + + logger.error("MembershipManager has an unknown state. Actions: ", actions); + return Status.Unknown; + } +} + +function createInsertActionUpdate(type: MembershipActionType, offset?: number): ActionUpdate { + return { + insert: [{ ts: Date.now() + (offset ?? 0), type }], + }; +} + +function createReplaceActionUpdate(type: MembershipActionType, offset?: number): ActionUpdate { + return { + replace: [{ ts: Date.now() + (offset ?? 0), type }], + }; +} diff --git a/src/matrixrtc/NewMembershipManagerActionScheduler.ts b/src/matrixrtc/NewMembershipManagerActionScheduler.ts new file mode 100644 index 000000000..8bb37bc11 --- /dev/null +++ b/src/matrixrtc/NewMembershipManagerActionScheduler.ts @@ -0,0 +1,131 @@ +import { logger as rootLogger } from "../logger.ts"; +import { type EmptyObject } from "../matrix.ts"; +import { sleep } from "../utils.ts"; +import { MembershipActionType } from "./NewMembershipManager.ts"; + +const logger = rootLogger.getChild("MatrixRTCSession"); + +/** @internal */ +export interface Action { + /** + * When this action should be executed + */ + ts: number; + /** + * The state of the different loops + * can also be thought of as the type of the action + */ + type: MembershipActionType; +} + +/** @internal */ +export type ActionUpdate = + | { + /** Replace all existing scheduled actions with this new array */ + replace: Action[]; + } + | { + /** Add these actions to the existing scheduled actions */ + insert: Action[]; + } + | EmptyObject; + +/** + * This scheduler tracks the state of the current membership participation + * and runs one central timer that wakes up a handler callback with the correct action + state + * whenever necessary. + * + * It can also be awakened whenever a new action is added which is + * earlier then the current "next awake". + * @internal + */ +export class ActionScheduler { + public running = false; + + public constructor( + /** This is the callback called for each scheduled action (`this.addAction()`) */ + private membershipLoopHandler: (type: MembershipActionType) => Promise, + ) {} + + // function for the wakeup mechanism (in case we add an action externally and need to leave the current sleep) + private wakeup: (update: ActionUpdate) => void = (update: ActionUpdate): void => { + logger.error("Cannot call wakeup before calling `startWithJoin()`"); + }; + private _actions: Action[] = []; + public get actions(): Action[] { + return this._actions; + } + + /** + * This starts the main loop of the membership manager that handles event sending, delayed event sending and delayed event restarting. + * @param initialActions The initial actions the manager will start with. It should be enough to pass: DelayedLeaveActionType.Initial + * @returns Promise that resolves once all actions have run and no more are scheduled. + * @throws This throws an error if one of the actions throws. + * In most other error cases the manager will try to handle any server errors by itself. + */ + public async startWithJoin(): Promise { + if (this.running) { + logger.error("Cannot call startWithJoin() on NewMembershipActionScheduler while already running"); + return; + } + this.running = true; + this._actions = [{ ts: Date.now(), type: MembershipActionType.SendFirstDelayedEvent }]; + try { + while (this._actions.length > 0) { + // Sort so next (smallest ts) action is at the beginning + this._actions.sort((a, b) => a.ts - b.ts); + const nextAction = this._actions[0]; + let wakeupUpdate: ActionUpdate | undefined = undefined; + + // while we await for the next action, wakeup has to resolve the wakeupPromise + const wakeupPromise = new Promise((resolve) => { + this.wakeup = (update: ActionUpdate): void => { + wakeupUpdate = update; + resolve(); + }; + }); + if (nextAction.ts > Date.now()) await Promise.race([wakeupPromise, sleep(nextAction.ts - Date.now())]); + + let handlerResult: ActionUpdate = {}; + if (!wakeupUpdate) { + logger.debug( + `Current MembershipManager processing: ${nextAction.type}\nQueue:`, + this._actions, + `\nDate.now: "${Date.now()}`, + ); + try { + // `this.wakeup` can also be called and sets the `wakupUpdate` object while we are in the handler. + handlerResult = await this.membershipLoopHandler(nextAction.type as MembershipActionType); + } catch (e) { + throw Error(`The MembershipManager shut down because of the end condition: ${e}`); + } + } + // remove the processed action only after we are done processing + this._actions.splice(0, 1); + // The wakeupUpdate always wins since that is a direct external update. + const actionUpdate = wakeupUpdate ?? handlerResult; + + if ("replace" in actionUpdate) { + this._actions = actionUpdate.replace; + } else if ("insert" in actionUpdate) { + this._actions.push(...actionUpdate.insert); + } + } + } catch (e) { + // Set the rtc session "not running" state since we cannot recover from here and the consumer user of the + // MatrixRTCSession class needs to manually rejoin. + this.running = false; + throw e; + } + this.running = false; + + logger.debug("Leave MembershipManager ActionScheduler loop (no more actions)"); + } + + public initiateJoin(): void { + this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendFirstDelayedEvent }] }); + } + public initiateLeave(): void { + this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendScheduledDelayedLeaveEvent }] }); + } +} diff --git a/src/utils.ts b/src/utils.ts index 93ef2bf73..9a822f92f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -405,6 +405,23 @@ export async function logDuration(logger: BaseLogger, name: string, block: () } } +/** + * Utility to log the duration of a synchronous block. + * + * @param logger - The logger to log to. + * @param name - The name of the operation. + * @param block - The block to execute. + */ +export function logDurationSync(logger: BaseLogger, name: string, block: () => T): T { + const start = Date.now(); + try { + return block(); + } finally { + const end = Date.now(); + logger.debug(`[Perf]: ${name} took ${end - start}ms`); + } +} + /** * Promise/async version of {@link setImmediate}. *