From 5a65c8436dca5fc11ff297ceb4feb325cd0b6273 Mon Sep 17 00:00:00 2001 From: Timo <16718859+toger5@users.noreply.github.com> Date: Tue, 25 Mar 2025 13:49:47 +0100 Subject: [PATCH] MatrixRTC MembershipManger: remove redundant sendDelayedEventAction and expose status (#4747) * Remove redundant sendDelayedEventAction We do already have the state `hasMemberEvent` that allows to distinguish the two cases. No need to create two dedicated actions. * fix missing return * Make membership manager an event emitter to inform about status updates. - deprecate isJoined (replaced by isActivated) - move Interface types to types.ts * add tests for status updates. * lint * test "reschedules delayed leave event" in case the delayed event gets canceled * review * fix types * prettier * fix legacy membership manager --- spec/unit/matrixrtc/MembershipManager.spec.ts | 71 ++++- src/matrixrtc/LegacyMembershipManager.ts | 28 +- src/matrixrtc/MatrixRTCSession.ts | 3 +- src/matrixrtc/NewMembershipManager.ts | 285 ++++++++---------- .../NewMembershipManagerActionScheduler.ts | 6 +- src/matrixrtc/index.ts | 1 + src/matrixrtc/types.ts | 85 +++++- 7 files changed, 304 insertions(+), 175 deletions(-) diff --git a/spec/unit/matrixrtc/MembershipManager.spec.ts b/spec/unit/matrixrtc/MembershipManager.spec.ts index a7a5143af..265289141 100644 --- a/spec/unit/matrixrtc/MembershipManager.spec.ts +++ b/spec/unit/matrixrtc/MembershipManager.spec.ts @@ -20,7 +20,13 @@ limitations under the License. import { type MockedFunction, type Mock } from "jest-mock"; import { EventType, HTTPError, MatrixError, UnsupportedDelayedEventsEndpointError, type Room } from "../../../src"; -import { type Focus, type LivekitFocusActive, type SessionMembershipData } from "../../../src/matrixrtc"; +import { + MembershipManagerEvent, + Status, + type Focus, + type LivekitFocusActive, + type SessionMembershipData, +} from "../../../src/matrixrtc"; import { LegacyMembershipManager } from "../../../src/matrixrtc/LegacyMembershipManager"; import { makeMockClient, makeMockRoom, membershipTemplate, mockCallMembership, type MockClient } from "./mocks"; import { MembershipManager } from "../../../src/matrixrtc/NewMembershipManager"; @@ -34,6 +40,14 @@ function waitForMockCall(method: MockedFunction, returnVal?: Promise) }); }); } +function waitForMockCallOnce(method: MockedFunction, returnVal?: Promise) { + return new Promise((resolve) => { + method.mockImplementationOnce(() => { + resolve(); + return returnVal ?? Promise.resolve(); + }); + }); +} function createAsyncHandle(method: MockedFunction) { const { reject, resolve, promise } = defer(); @@ -78,16 +92,16 @@ describe.each([ // There is no need to clean up mocks since we will recreate the client. }); - describe("isJoined()", () => { + describe("isActivated()", () => { it("defaults to false", () => { const manager = new TestMembershipManager({}, room, client, () => undefined); - expect(manager.isJoined()).toEqual(false); + expect(manager.isActivated()).toEqual(false); }); it("returns true after join()", () => { const manager = new TestMembershipManager({}, room, client, () => undefined); manager.join([]); - expect(manager.isJoined()).toEqual(true); + expect(manager.isActivated()).toEqual(true); }); }); @@ -125,6 +139,23 @@ describe.each([ {}, "_@alice:example.org_AAAAAAA", ); + expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1); + }); + + it("reschedules delayed leave event if sending state cancels it", async () => { + const memberManager = new TestMembershipManager(undefined, room, client, () => undefined); + const waitForSendState = waitForMockCall(client.sendStateEvent); + const waitForUpdateDelaye = waitForMockCallOnce( + client._unstable_updateDelayedEvent, + Promise.reject(new MatrixError({ errcode: "M_NOT_FOUND" })), + ); + memberManager.join([focus], focusActive); + await waitForSendState; + await waitForUpdateDelaye; + await jest.advanceTimersByTimeAsync(1); + // Once for the initial event and once because of the errcode: "M_NOT_FOUND" + // Different to "sends a membership event and schedules delayed leave when joining a call" where its only called once (1) + expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(2); }); describe("does not prefix the state key with _ for rooms that support user-owned state events", () => { @@ -505,7 +536,39 @@ describe.each([ await testExpires(10_000, 1_000); }); }); + describe("status updates", () => { + it("starts 'Disconnected' !FailsForLegacy", () => { + const manager = new TestMembershipManager({}, room, client, () => undefined); + expect(manager.status).toBe(Status.Disconnected); + }); + it("emits 'Connection' and 'Connected' after join !FailsForLegacy", async () => { + const handleDelayedEvent = createAsyncHandle(client._unstable_sendDelayedStateEvent); + const handleStateEvent = createAsyncHandle(client.sendStateEvent); + const manager = new TestMembershipManager({}, room, client, () => undefined); + expect(manager.status).toBe(Status.Disconnected); + const connectEmit = jest.fn(); + manager.on(MembershipManagerEvent.StatusChanged, connectEmit); + manager.join([focus], focusActive); + expect(manager.status).toBe(Status.Connecting); + handleDelayedEvent.resolve(); + await jest.advanceTimersByTimeAsync(1); + expect(connectEmit).toHaveBeenCalledWith(Status.Disconnected, Status.Connecting); + handleStateEvent.resolve(); + await jest.advanceTimersByTimeAsync(1); + expect(connectEmit).toHaveBeenCalledWith(Status.Connecting, Status.Connected); + }); + it("emits 'Disconnecting' and 'Disconnected' after leave !FailsForLegacy", async () => { + const manager = new TestMembershipManager({}, room, client, () => undefined); + const connectEmit = jest.fn(); + manager.on(MembershipManagerEvent.StatusChanged, connectEmit); + manager.join([focus], focusActive); + await jest.advanceTimersByTimeAsync(1); + await manager.leave(); + expect(connectEmit).toHaveBeenCalledWith(Status.Connected, Status.Disconnecting); + expect(connectEmit).toHaveBeenCalledWith(Status.Disconnecting, Status.Disconnected); + }); + }); describe("server error handling", () => { // Types of server error: 429 rate limit with no retry-after header, 429 with retry-after, 50x server error (maybe retry every second), connection/socket timeout describe("retries sending delayed leave event", () => { diff --git a/src/matrixrtc/LegacyMembershipManager.ts b/src/matrixrtc/LegacyMembershipManager.ts index 891ffa077..6ecff9cbc 100644 --- a/src/matrixrtc/LegacyMembershipManager.ts +++ b/src/matrixrtc/LegacyMembershipManager.ts @@ -27,7 +27,7 @@ import { type Focus } from "./focus.ts"; import { isLivekitFocusActive } from "./LivekitFocus.ts"; import { type MembershipConfig } from "./MatrixRTCSession.ts"; import { type EmptyObject } from "../@types/common.ts"; -import { type IMembershipManager } from "./NewMembershipManager.ts"; +import { type IMembershipManager, type MembershipManagerEvent, Status } from "./types.ts"; /** * This internal class is used by the MatrixRTCSession to manage the local user's own membership of the session. @@ -103,9 +103,35 @@ export class LegacyMembershipManager implements IMembershipManager { private getOldestMembership: () => CallMembership | undefined, ) {} + public off( + event: MembershipManagerEvent.StatusChanged, + listener: (oldStatus: Status, newStatus: Status) => void, + ): this { + logger.error("off is not implemented on LegacyMembershipManager"); + return this; + } + + public on( + event: MembershipManagerEvent.StatusChanged, + listener: (oldStatus: Status, newStatus: Status) => void, + ): this { + logger.error("on is not implemented on LegacyMembershipManager"); + return this; + } + public isJoined(): boolean { return this.relativeExpiry !== undefined; } + public isActivated(): boolean { + return this.isJoined(); + } + /** + * Unimplemented + * @returns Status.Unknown + */ + public get status(): Status { + return Status.Unknown; + } public join(fociPreferred: Focus[], fociActive?: Focus): void { this.ownFocusActive = fociActive; diff --git a/src/matrixrtc/MatrixRTCSession.ts b/src/matrixrtc/MatrixRTCSession.ts index 4006c124f..26f11f124 100644 --- a/src/matrixrtc/MatrixRTCSession.ts +++ b/src/matrixrtc/MatrixRTCSession.ts @@ -25,10 +25,11 @@ import { RoomStateEvent } from "../models/room-state.ts"; import { type Focus } from "./focus.ts"; import { KnownMembership } from "../@types/membership.ts"; import { type MatrixEvent } from "../models/event.ts"; -import { MembershipManager, type IMembershipManager } from "./NewMembershipManager.ts"; +import { MembershipManager } from "./NewMembershipManager.ts"; import { EncryptionManager, type IEncryptionManager, type Statistics } from "./EncryptionManager.ts"; import { LegacyMembershipManager } from "./LegacyMembershipManager.ts"; import { logDurationSync } from "../utils.ts"; +import type { IMembershipManager } from "./types.ts"; const logger = rootLogger.getChild("MatrixRTCSession"); diff --git a/src/matrixrtc/NewMembershipManager.ts b/src/matrixrtc/NewMembershipManager.ts index a81dd9469..d4307a7a9 100644 --- a/src/matrixrtc/NewMembershipManager.ts +++ b/src/matrixrtc/NewMembershipManager.ts @@ -24,110 +24,80 @@ import { type Room } from "../models/room.ts"; import { defer, type IDeferred } from "../utils.ts"; import { type CallMembership, DEFAULT_EXPIRE_DURATION, type SessionMembershipData } from "./CallMembership.ts"; import { type Focus } from "./focus.ts"; +import { + type IMembershipManager, + type MembershipManagerEventHandlerMap, + MembershipManagerEvent, + Status, +} from "./types.ts"; import { isLivekitFocusActive } from "./LivekitFocus.ts"; import { type MembershipConfig } from "./MatrixRTCSession.ts"; import { ActionScheduler, type ActionUpdate } from "./NewMembershipManagerActionScheduler.ts"; +import { TypedEventEmitter } from "../models/typed-event-emitter.ts"; const logger = rootLogger.getChild("MatrixRTCSession"); -/** - * This interface defines what a MembershipManager uses and exposes. - * This interface is what we use to write tests and allows changing the actual implementation - * without breaking tests because of some internal method renaming. - * - * @internal - */ -export interface IMembershipManager { - /** - * If we are trying to join, or have successfully joined the session. - * It does not reflect if the room state is already configured to represent us being joined. - * It only means that the Manager should be trying to connect or to disconnect running. - * The Manager is still running right after isJoined becomes false to send the disconnect events. - * (A more accurate name would be `isActivated`) - * @returns true if we intend to be participating in the MatrixRTC session - */ - isJoined(): boolean; - /** - * Start sending all necessary events to make this user participate in the RTC session. - * @param fociPreferred the list of preferred foci to use in the joined RTC membership event. - * @param fociActive the active focus to use in the joined RTC membership event. - * @throws can throw if it exceeds a configured maximum retry. - */ - join(fociPreferred: Focus[], fociActive?: Focus, onError?: (error: unknown) => void): void; - /** - * Send all necessary events to make this user leave the RTC session. - * @param timeout the maximum duration in ms until the promise is forced to resolve. - * @returns It resolves with true in case the leave was sent successfully. - * It resolves with false in case we hit the timeout before sending successfully. - */ - leave(timeout?: number): Promise; - /** - * Call this if the MatrixRTC session members have changed. - */ - onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise; - /** - * The used active focus in the currently joined session. - * @returns the used active focus in the currently joined session or undefined if not joined. - */ - getActiveFocus(): Focus | undefined; -} - /* MembershipActionTypes: - ▼ - ┌─────────────────────┐ - │SendFirstDelayedEvent│ - └─────────────────────┘ - │ - ▼ - ┌─────────────┐ - ┌────────────│SendJoinEvent│────────────┐ - │ └─────────────┘ │ - │ ┌─────┐ ┌──────┐ │ ┌──────┐ - ▼ ▼ │ │ ▼ ▼ ▼ │ + +On Join: ───────────────┐ ┌───────────────(1)───────────┐ + ▼ ▼ │ + ┌────────────────┐ │ + │SendDelayedEvent│ ──────(2)───┐ │ + └────────────────┘ │ │ + │(3) │ │ + ▼ │ │ + ┌─────────────┐ │ │ + ┌──────(4)───│SendJoinEvent│────(4)─────┐ │ │ + │ └─────────────┘ │ │ │ + │ ┌─────┐ ┌──────┐ │ │ │ + ▼ ▼ │ │ ▼ ▼ ▼ │ ┌────────────┐ │ │ ┌───────────────────┐ │ -│UpdateExpiry│ │ │ │RestartDelayedEvent│ │ +│UpdateExpiry│ (s) (s)|RestartDelayedEvent│ │ └────────────┘ │ │ └───────────────────┘ │ - │ │ │ │ │ │ - └─────┘ └──────┘ │ │ - │ │ - ┌────────────────────┐ │ │ - │SendMainDelayedEvent│◄───────┘ │ - └───────────────────┬┘ │ - │ │ - └─────────────────────┘ - STOP ALL ABOVE + │ │ │ │ │ │ + └─────┘ └──────┘ └───────┘ + +On Leave: ───────── STOP ALL ABOVE ▼ - ┌───────────────────────────────┐ - │ SendScheduledDelayedLeaveEvent│ - └───────────────────────────────┘ - │ + ┌────────────────────────────────┐ + │ SendScheduledDelayedLeaveEvent │ + └────────────────────────────────┘ + │(5) ▼ ┌──────────────┐ │SendLeaveEvent│ └──────────────┘ - +(1) [Not found error] results in resending the delayed event +(2) [hasMemberEvent = true] Sending the delayed event if we + already have a call member event results jumping to the + RestartDelayedEvent loop directly +(3) [hasMemberEvent = false] if there is not call member event + sending it is the next step +(4) Both (UpdateExpiry and RestartDelayedEvent) actions are + scheduled when successfully sending the state event +(5) Only if delayed event sending failed (fallback) +(s) Successful restart/resend */ + /** * The different types of actions the MembershipManager can take. * @internal */ export enum MembershipActionType { - SendFirstDelayedEvent = "SendFirstDelayedEvent", + SendDelayedEvent = "SendDelayedEvent", // -> MembershipActionType.SendJoinEvent if successful - // -> DelayedLeaveActionType.SendFirstDelayedEvent on error, retry sending the first delayed event. + // -> DelayedLeaveActionType.SendDelayedEvent on error, retry sending the first delayed event. + // -> DelayedLeaveActionType.RestartDelayedEvent on success start updating the delayed event SendJoinEvent = "SendJoinEvent", // -> MembershipActionType.SendJoinEvent if we run into a rate limit and need to retry // -> MembershipActionType.Update if we successfully send the join event then schedule the expire event update // -> DelayedLeaveActionType.RestartDelayedEvent to recheck the delayed event RestartDelayedEvent = "RestartDelayedEvent", // -> DelayedLeaveActionType.SendMainDelayedEvent on missing delay id but there is a rtc state event - // -> DelayedLeaveActionType.SendFirstDelayedEvent on missing delay id and there is no state event + // -> DelayedLeaveActionType.SendDelayedEvent on missing delay id and there is no state event // -> DelayedLeaveActionType.RestartDelayedEvent on success we schedule the next restart UpdateExpiry = "UpdateExpiry", // -> MembershipActionType.Update if the timeout has passed so the next update is required. - SendMainDelayedEvent = "SendMainDelayedEvent", - // -> DelayedLeaveActionType.RestartDelayedEvent on success start updating the delayed event - // -> DelayedLeaveActionType.SendMainDelayedEvent on error try again SendScheduledDelayedLeaveEvent = "SendScheduledDelayedLeaveEvent", // -> MembershipActionType.SendLeaveEvent on failiour (not found) we need to send the leave manually and cannot use the scheduled delayed event // -> DelayedLeaveActionType.SendScheduledDelayedLeaveEvent on error we try again. @@ -138,7 +108,7 @@ export enum MembershipActionType { /** * @internal */ -export interface ActionSchedulerState { +export interface MembershipManagerState { /** The delayId we got when successfully sending the delayed leave event. * Gets set to undefined if the server claims it cannot find the delayed event anymore. */ delayId?: string; @@ -158,17 +128,6 @@ export interface ActionSchedulerState { networkErrorRetries: Map; } -enum Status { - Disconnected = "Disconnected", - Connecting = "Connecting", - ConnectingFailed = "ConnectingFailed", - Connected = "Connected", - Reconnecting = "Reconnecting", - Disconnecting = "Disconnecting", - Stuck = "Stuck", - Unknown = "Unknown", -} - /** * This class is responsible for sending all events relating to the own membership of a matrixRTC call. * It has the following tasks: @@ -182,11 +141,18 @@ enum Status { * - Stop the timer for the delay refresh * - Stop the timer for updating the state event */ -export class MembershipManager implements IMembershipManager { +export class MembershipManager + extends TypedEventEmitter + implements IMembershipManager +{ private activated = false; - public isJoined(): boolean { + public isActivated(): boolean { return this.activated; } + // DEPRECATED use isActivated + public isJoined(): boolean { + return this.isActivated(); + } /** * Puts the MembershipManager in a state where it tries to be joined. @@ -205,23 +171,27 @@ export class MembershipManager implements IMembershipManager { this.focusActive = focusActive; this.leavePromiseDefer = undefined; this.activated = true; - + this.oldStatus = this.status; this.state = MembershipManager.defaultState; this.scheduler .startWithJoin() - .then(() => { - if (!this.scheduler.running) { - this.leavePromiseDefer?.resolve(true); - this.leavePromiseDefer = undefined; - } - }) .catch((e) => { logger.error("MembershipManager stopped because: ", e); onError?.(e); }) - // Should already be set to false when calling `leave` in non error cases. - .finally(() => (this.activated = false)); + .finally(() => { + // Should already be set to false when calling `leave` in non error cases. + this.activated = false; + // Here the scheduler is not running anymore so we the `membershipLoopHandler` is not called to emit. + if (this.oldStatus && this.oldStatus !== this.status) { + this.emit(MembershipManagerEvent.StatusChanged, this.oldStatus, this.status); + } + if (!this.scheduler.running) { + this.leavePromiseDefer?.resolve(true); + this.leavePromiseDefer = undefined; + } + }); } /** @@ -256,7 +226,7 @@ export class MembershipManager implements IMembershipManager { // If one of these actions are scheduled or are getting inserted in the next iteration, we should already // take care of our missing membership. const sendingMembershipActions = [ - MembershipActionType.SendFirstDelayedEvent, + MembershipActionType.SendDelayedEvent, MembershipActionType.SendJoinEvent, ]; logger.warn("Missing own membership: force re-join"); @@ -313,6 +283,7 @@ export class MembershipManager implements IMembershipManager { >, private getOldestMembership: () => CallMembership | undefined, ) { + super(); const [userId, deviceId] = [this.client.getUserId(), this.client.getDeviceId()]; if (userId === null) throw Error("Missing userId in client"); if (deviceId === null) throw Error("Missing deviceId in client"); @@ -322,8 +293,8 @@ export class MembershipManager implements IMembershipManager { } // MembershipManager mutable state. - private state: ActionSchedulerState; - private static get defaultState(): ActionSchedulerState { + private state: MembershipManagerState; + private static get defaultState(): MembershipManagerState { return { hasMemberStateEvent: false, delayId: undefined, @@ -380,9 +351,13 @@ export class MembershipManager implements IMembershipManager { private oldStatus?: Status; private scheduler = new ActionScheduler((type): Promise => { if (this.oldStatus) { - // we put this at the beginning of the actions scheduler loop handle callback since it is a loop this + // we put this at the beginning of the actions scheduler loop handle callback since it is a loop this // is equivalent to running it at the end of the loop. (just after applying the status/action list changes) + // This order is required because this method needs to return the action updates. logger.debug(`MembershipManager applied action changes. Status: ${this.oldStatus} -> ${this.status}`); + if (this.oldStatus !== this.status) { + this.emit(MembershipManagerEvent.StatusChanged, this.oldStatus, this.status); + } } this.oldStatus = this.status; logger.debug(`MembershipManager before processing action. status=${this.oldStatus}`); @@ -391,17 +366,16 @@ export class MembershipManager implements IMembershipManager { // LOOP HANDLER: private async membershipLoopHandler(type: MembershipActionType): Promise { - this.oldStatus = this.status; switch (type) { - case MembershipActionType.SendFirstDelayedEvent: { + case MembershipActionType.SendDelayedEvent: { // Before we start we check if we come from a state where we have a delay id. if (!this.state.delayId) { - return this.sendFirstDelayedLeaveEvent(); // Normal case without any previous delayed id. + return this.sendOrResendDelayedLeaveEvent(); // Normal case without any previous delayed id. } else { // This can happen if someone else (or another client) removes our own membership event. // It will trigger `onRTCSessionMemberUpdate` queue `MembershipActionType.SendFirstDelayedEvent`. // We might still have our delayed event from the previous participation and dependent on the server this might not - // get automatically removed if the state changes. Hence It would remove our membership unexpectedly shortly after the rejoin. + // get removed automatically if the state changes. Hence, it would remove our membership unexpectedly shortly after the rejoin. // // In this block we will try to cancel this delayed event before setting up a new one. @@ -411,17 +385,10 @@ export class MembershipManager implements IMembershipManager { case MembershipActionType.RestartDelayedEvent: { if (!this.state.delayId) { // Delay id got reset. This action was used to check if the hs canceled the delayed event when the join state got sent. - return createInsertActionUpdate( - this.state.hasMemberStateEvent - ? MembershipActionType.SendMainDelayedEvent - : MembershipActionType.SendFirstDelayedEvent, - ); + return createInsertActionUpdate(MembershipActionType.SendDelayedEvent); } return this.restartDelayedEvent(this.state.delayId); } - case MembershipActionType.SendMainDelayedEvent: { - return this.sendMainDelayedEvent(); - } case MembershipActionType.SendScheduledDelayedLeaveEvent: { // We are already good if (!this.state.hasMemberStateEvent) { @@ -452,7 +419,11 @@ export class MembershipManager implements IMembershipManager { } // HANDLERS (used in the membershipLoopHandler) - private async sendFirstDelayedLeaveEvent(): Promise { + private async sendOrResendDelayedLeaveEvent(): Promise { + // We can reach this at the start of a call (where we do not yet have a membership: state.hasMemberStateEvent=false) + // or during a call if the state event canceled our delayed event or caused by an unexpected error that removed our delayed event. + // (Another client could have canceled it, the homeserver might have removed/lost it due to a restart, ...) + // In the `then` and `catch` block we treat both cases differently. "if (this.state.hasMemberStateEvent) {} else {}" return await this.client ._unstable_sendDelayedStateEvent( this.room.roomId, @@ -465,27 +436,46 @@ export class MembershipManager implements IMembershipManager { ) .then((response) => { // On success we reset retries and set delayId. - this.state.rateLimitRetries.set(MembershipActionType.SendFirstDelayedEvent, 0); - this.state.networkErrorRetries.set(MembershipActionType.SendFirstDelayedEvent, 0); + this.resetRateLimitCounter(MembershipActionType.SendDelayedEvent); this.state.delayId = response.delay_id; - return createInsertActionUpdate(MembershipActionType.SendJoinEvent); + if (this.state.hasMemberStateEvent) { + // This action was scheduled because the previous delayed event was cancelled + // due to lack of https://github.com/element-hq/synapse/pull/17810 + return createInsertActionUpdate( + MembershipActionType.RestartDelayedEvent, + this.membershipKeepAlivePeriod, + ); + } else { + // This action was scheduled because we are in the process of joining + return createInsertActionUpdate(MembershipActionType.SendJoinEvent); + } }) .catch((e) => { - const repeatActionType = MembershipActionType.SendFirstDelayedEvent; + const repeatActionType = MembershipActionType.SendDelayedEvent; if (this.manageMaxDelayExceededSituation(e)) { return createInsertActionUpdate(repeatActionType); } const update = this.actionUpdateFromErrors(e, repeatActionType, "sendDelayedStateEvent"); if (update) return update; - // log and fall through - if (this.isUnsupportedDelayedEndpoint(e)) { - logger.info("Not using delayed event because the endpoint is not supported"); + if (this.state.hasMemberStateEvent) { + // This action was scheduled because the previous delayed event was cancelled + // due to lack of https://github.com/element-hq/synapse/pull/17810 + + // Don't do any other delayed event work if its not supported. + if (this.isUnsupportedDelayedEndpoint(e)) return {}; + throw Error("Could not send delayed event, even though delayed events are supported. " + e); } else { - logger.info("Not using delayed event because: " + e); + // This action was scheduled because we are in the process of joining + // log and fall through + if (this.isUnsupportedDelayedEndpoint(e)) { + logger.info("Not using delayed event because the endpoint is not supported"); + } else { + logger.info("Not using delayed event because: " + e); + } + // On any other error we fall back to not using delayed events and send the join state event immediately + return createInsertActionUpdate(MembershipActionType.SendJoinEvent); } - // On any other error we fall back to not using delayed events and send the join state event immediately - return createInsertActionUpdate(MembershipActionType.SendJoinEvent); }); } @@ -495,11 +485,11 @@ export class MembershipManager implements IMembershipManager { ._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Cancel) .then(() => { this.state.delayId = undefined; - this.resetRateLimitCounter(MembershipActionType.SendFirstDelayedEvent); - return createReplaceActionUpdate(MembershipActionType.SendFirstDelayedEvent); + this.resetRateLimitCounter(MembershipActionType.SendDelayedEvent); + return createReplaceActionUpdate(MembershipActionType.SendDelayedEvent); }) .catch((e) => { - const repeatActionType = MembershipActionType.SendFirstDelayedEvent; + const repeatActionType = MembershipActionType.SendDelayedEvent; const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent"); if (update) return update; @@ -538,7 +528,7 @@ export class MembershipManager implements IMembershipManager { const repeatActionType = MembershipActionType.RestartDelayedEvent; if (this.isNotFoundError(e)) { this.state.delayId = undefined; - return createInsertActionUpdate(MembershipActionType.SendMainDelayedEvent); + return createInsertActionUpdate(MembershipActionType.SendDelayedEvent); } // If the HS does not support delayed events we wont reschedule. if (this.isUnsupportedDelayedEndpoint(e)) return {}; @@ -552,40 +542,6 @@ export class MembershipManager implements IMembershipManager { }); } - private async sendMainDelayedEvent(): Promise { - return await this.client - ._unstable_sendDelayedStateEvent( - this.room.roomId, - { - delay: this.membershipServerSideExpiryTimeout, - }, - EventType.GroupCallMemberPrefix, - {}, // leave event - this.stateKey, - ) - .then((response) => { - this.state.delayId = response.delay_id; - this.resetRateLimitCounter(MembershipActionType.SendMainDelayedEvent); - return createInsertActionUpdate( - MembershipActionType.RestartDelayedEvent, - this.membershipKeepAlivePeriod, - ); - }) - .catch((e) => { - const repeatActionType = MembershipActionType.SendMainDelayedEvent; - // Don't do any other delayed event work if its not supported. - if (this.isUnsupportedDelayedEndpoint(e)) return {}; - - if (this.manageMaxDelayExceededSituation(e)) { - return createInsertActionUpdate(repeatActionType); - } - const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent"); - if (update) return update; - - throw Error("Could not send delayed event, even though delayed events are supported. " + e); - }); - } - private async sendScheduledDelayedLeaveEventOrFallbackToSendLeaveEvent(delayId: string): Promise { return await this.client ._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Send) @@ -887,9 +843,8 @@ export class MembershipManager implements IMembershipManager { if (actions.length === 1) { const { type } = actions[0]; switch (type) { - case MembershipActionType.SendFirstDelayedEvent: + case MembershipActionType.SendDelayedEvent: case MembershipActionType.SendJoinEvent: - case MembershipActionType.SendMainDelayedEvent: return Status.Connecting; case MembershipActionType.UpdateExpiry: // where no delayed events return Status.Connected; @@ -904,7 +859,7 @@ export class MembershipManager implements IMembershipManager { // normal state for connected with delayed events if ( (types.includes(MembershipActionType.RestartDelayedEvent) || - types.includes(MembershipActionType.SendMainDelayedEvent)) && + (types.includes(MembershipActionType.SendDelayedEvent) && this.state.hasMemberStateEvent)) && types.includes(MembershipActionType.UpdateExpiry) ) { return Status.Connected; diff --git a/src/matrixrtc/NewMembershipManagerActionScheduler.ts b/src/matrixrtc/NewMembershipManagerActionScheduler.ts index 71fd078cc..7ed119503 100644 --- a/src/matrixrtc/NewMembershipManagerActionScheduler.ts +++ b/src/matrixrtc/NewMembershipManagerActionScheduler.ts @@ -73,7 +73,7 @@ export class ActionScheduler { return; } this.running = true; - this._actions = [{ ts: Date.now(), type: MembershipActionType.SendFirstDelayedEvent }]; + this._actions = [{ ts: Date.now(), type: MembershipActionType.SendDelayedEvent }]; try { while (this._actions.length > 0) { // Sort so next (smallest ts) action is at the beginning @@ -98,7 +98,7 @@ export class ActionScheduler { `\nDate.now: "${Date.now()}`, ); try { - // `this.wakeup` can also be called and sets the `wakupUpdate` object while we are in the handler. + // `this.wakeup` can also be called and sets the `wakeupUpdate` object while we are in the handler. handlerResult = await this.membershipLoopHandler(nextAction.type as MembershipActionType); } catch (e) { throw Error(`The MembershipManager shut down because of the end condition: ${e}`); @@ -125,7 +125,7 @@ export class ActionScheduler { } public initiateJoin(): void { - this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendFirstDelayedEvent }] }); + this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendDelayedEvent }] }); } public initiateLeave(): void { this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendScheduledDelayedLeaveEvent }] }); diff --git a/src/matrixrtc/index.ts b/src/matrixrtc/index.ts index 35737d70f..1ce647095 100644 --- a/src/matrixrtc/index.ts +++ b/src/matrixrtc/index.ts @@ -20,3 +20,4 @@ export * from "./LivekitFocus.ts"; export * from "./MatrixRTCSession.ts"; export * from "./MatrixRTCSessionManager.ts"; export type * from "./types.ts"; +export { Status, MembershipManagerEvent } from "./types.ts"; diff --git a/src/matrixrtc/types.ts b/src/matrixrtc/types.ts index 998a44445..532ac8dc7 100644 --- a/src/matrixrtc/types.ts +++ b/src/matrixrtc/types.ts @@ -13,7 +13,10 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -import { type IMentions } from "../matrix.ts"; +import type { IMentions } from "../matrix.ts"; +import type { CallMembership } from "./CallMembership.ts"; +import type { Focus } from "./focus.ts"; + export interface EncryptionKeyEntry { index: number; key: string; @@ -34,3 +37,83 @@ export interface ICallNotifyContent { "notify_type": CallNotifyType; "call_id": string; } + +export enum Status { + Disconnected = "Disconnected", + Connecting = "Connecting", + ConnectingFailed = "ConnectingFailed", + Connected = "Connected", + Reconnecting = "Reconnecting", + Disconnecting = "Disconnecting", + Stuck = "Stuck", + Unknown = "Unknown", +} + +export enum MembershipManagerEvent { + StatusChanged = "StatusChanged", +} + +export type MembershipManagerEventHandlerMap = { + [MembershipManagerEvent.StatusChanged]: (prefStatus: Status, newStatus: Status) => void; +}; + +/** + * This interface defines what a MembershipManager uses and exposes. + * This interface is what we use to write tests and allows changing the actual implementation + * without breaking tests because of some internal method renaming. + * + * @internal + */ +export interface IMembershipManager { + /** + * If we are trying to join, or have successfully joined the session. + * It does not reflect if the room state is already configured to represent us being joined. + * It only means that the Manager should be trying to connect or to disconnect running. + * The Manager is still running right after isJoined becomes false to send the disconnect events. + * @returns true if we intend to be participating in the MatrixRTC session + * @deprecated This name is confusing and replaced by `isActivated()`. (Returns the same as `isActivated()`) + */ + isJoined(): boolean; + /** + * If the manager is activated. This means it tries to do its job to join the call, resend state events... + * It does not imply that the room state is already configured to represent being joined. + * It means that the Manager tries to connect or is connected. ("the manager is still active") + * Once `leave()` is called the manager is not activated anymore but still running until `leave()` resolves. + * @returns `true` if we intend to be participating in the MatrixRTC session + */ + isActivated(): boolean; + /** + * Get the actual connection status of the manager. + */ + get status(): Status; + /** + * The current status while the manager is activated + */ + /** + * Start sending all necessary events to make this user participate in the RTC session. + * @param fociPreferred the list of preferred foci to use in the joined RTC membership event. + * @param fociActive the active focus to use in the joined RTC membership event. + * @throws can throw if it exceeds a configured maximum retry. + */ + join(fociPreferred: Focus[], fociActive?: Focus, onError?: (error: unknown) => void): void; + /** + * Send all necessary events to make this user leave the RTC session. + * @param timeout the maximum duration in ms until the promise is forced to resolve. + * @returns It resolves with true in case the leave was sent successfully. + * It resolves with false in case we hit the timeout before sending successfully. + */ + leave(timeout?: number): Promise; + /** + * Call this if the MatrixRTC session members have changed. + */ + onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise; + /** + * The used active focus in the currently joined session. + * @returns the used active focus in the currently joined session or undefined if not joined. + */ + getActiveFocus(): Focus | undefined; + + // TypedEventEmitter methods: + on(event: MembershipManagerEvent.StatusChanged, listener: (oldStatus: Status, newStatus: Status) => void): this; + off(event: MembershipManagerEvent.StatusChanged, listener: (oldStatus: Status, newStatus: Status) => void): this; +}