From 44399f601733c8f4daf5a6c891a355cf425192a9 Mon Sep 17 00:00:00 2001 From: Timo <16718859+toger5@users.noreply.github.com> Date: Wed, 4 Jun 2025 12:44:12 +0200 Subject: [PATCH] Fix MatrixRTC membership manager failing to rejoin in a race condition (sync vs not found response) (#4861) * add test run helper to allow running long tests in vs code * deprecate IDeferred (as its associated defer method is also deprecated and its just a type rename to PromiseWithResolvers) * Improve docs and readability of MembershipManager.spec.ts * Intoduce test for a race condition which results in a state where the state event and the hasMemberStateEvent variable diverge * fix room state and membership manager state diverging. See: https://github.com/element-hq/element-call-rageshakes/issues/10609 https://github.com/element-hq/element-call-rageshakes/issues/10594 https://github.com/element-hq/element-call-rageshakes/issues/9902 * logging, docstings and variable name improvements * review * review pending timers --- spec/setupTests.ts | 5 + spec/unit/matrixrtc/MembershipManager.spec.ts | 93 ++++++++++++++----- src/matrixrtc/MatrixRTCSession.ts | 4 +- src/matrixrtc/NewMembershipManager.ts | 17 ++-- src/utils.ts | 4 +- 5 files changed, 92 insertions(+), 31 deletions(-) diff --git a/spec/setupTests.ts b/spec/setupTests.ts index 2c16f6632..ac05e6f08 100644 --- a/spec/setupTests.ts +++ b/spec/setupTests.ts @@ -19,3 +19,8 @@ jest.mock("../src/http-api/utils", () => ({ // We mock timeoutSignal otherwise it causes tests to leave timers running timeoutSignal: () => new AbortController().signal, })); + +// Dont make test fail too soon due to timeouts while debugging. +if (process.env.VSCODE_INSPECTOR_OPTIONS) { + jest.setTimeout(60 * 1000 * 5); // 5 minutes +} diff --git a/spec/unit/matrixrtc/MembershipManager.spec.ts b/spec/unit/matrixrtc/MembershipManager.spec.ts index 36cb456c7..91d15f56e 100644 --- a/spec/unit/matrixrtc/MembershipManager.spec.ts +++ b/spec/unit/matrixrtc/MembershipManager.spec.ts @@ -32,25 +32,38 @@ import { makeMockClient, makeMockRoom, membershipTemplate, mockCallMembership, t import { MembershipManager } from "../../../src/matrixrtc/NewMembershipManager"; import { logger } from "../../../src/logger.ts"; -function waitForMockCall(method: MockedFunction, returnVal?: Promise) { - return new Promise((resolve) => { - method.mockImplementation(() => { - resolve(); - return returnVal ?? Promise.resolve(); - }); - }); -} -function waitForMockCallOnce(method: MockedFunction, returnVal?: Promise) { - return new Promise((resolve) => { - method.mockImplementationOnce(() => { - resolve(); - return returnVal ?? Promise.resolve(); - }); +/** + * Create a promise that will resolve once a mocked method is called. + * @param method The method to wait for. + * @param returnVal Provide an optional value that the mocked method should return. (use Promise.resolve(val) or Promise.reject(err)) + * @returns The promise that resolves once the method is called. + */ +function waitForMockCall(method: MockedFunction, returnVal?: Promise): Promise { + const { promise, resolve } = Promise.withResolvers(); + method.mockImplementation(() => { + resolve(); + return returnVal ?? Promise.resolve(); }); + return promise; } -function createAsyncHandle(method: MockedFunction) { - const { reject, resolve, promise } = Promise.withResolvers(); +/** See waitForMockCall */ +function waitForMockCallOnce(method: MockedFunction, returnVal?: Promise) { + const { promise, resolve } = Promise.withResolvers(); + method.mockImplementationOnce(() => { + resolve(); + return returnVal ?? Promise.resolve(); + }); + return promise; +} + +/** + * A handle to control when in the test flow the provided method resolves (or gets rejected). + * @param method The method to control the resolve timing. + * @returns + */ +function createAsyncHandle(method: MockedFunction) { + const { reject, resolve, promise } = Promise.withResolvers(); method.mockImplementation(() => promise); return { reject, resolve }; } @@ -110,13 +123,13 @@ describe.each([ it("sends a membership event and schedules delayed leave when joining a call", async () => { // Spys/Mocks - const updateDelayedEventHandle = createAsyncHandle(client._unstable_updateDelayedEvent as Mock); + const updateDelayedEventHandle = createAsyncHandle(client._unstable_updateDelayedEvent as Mock); // Test const memberManager = new TestMembershipManager(undefined, room, client, () => undefined); memberManager.join([focus], focusActive); // expects - await waitForMockCall(client.sendStateEvent); + await waitForMockCall(client.sendStateEvent, Promise.resolve({ event_id: "id" })); expect(client.sendStateEvent).toHaveBeenCalledWith( room.roomId, "org.matrix.msc3401.call.member", @@ -311,6 +324,44 @@ describe.each([ }); }); + it("rejoins if delayed event is not found (404) !FailsForLegacy", async () => { + const RESTART_DELAY = 15000; + const manager = new TestMembershipManager( + { delayedLeaveEventRestartMs: RESTART_DELAY }, + room, + client, + () => undefined, + ); + // Join with the membership manager + manager.join([focus], focusActive); + expect(manager.status).toBe(Status.Connecting); + // Let the scheduler run one iteration so that we can send the join state event + await jest.runOnlyPendingTimersAsync(); + expect(client.sendStateEvent).toHaveBeenCalledTimes(1); + expect(manager.status).toBe(Status.Connected); + // Now that we are connected, we set up the mocks. + // We enforce the following scenario where we simulate that the delayed event activated and caused the user to leave: + // - We wait until the delayed event gets sent and then mock its response to be "not found." + // - We enforce a race condition between the sync that informs us that our call membership state event was set to "left" + // and the "not found" response from the delayed event: we receive the sync while we are waiting for the delayed event to be sent. + // - While the delayed leave event is being sent, we inform the manager that our membership state event was set to "left." + // (onRTCSessionMemberUpdate) + // - Only then do we resolve the sending of the delayed event. + // - We test that the manager acknowledges the leave and sends a new membership state event. + (client._unstable_updateDelayedEvent as Mock).mockRejectedValueOnce( + new MatrixError({ errcode: "M_NOT_FOUND" }), + ); + + const { resolve } = createAsyncHandle(client._unstable_sendDelayedStateEvent); + await jest.advanceTimersByTimeAsync(RESTART_DELAY); + // first simulate the sync, then resolve sending the delayed event. + await manager.onRTCSessionMemberUpdate([mockCallMembership(membershipTemplate, room.roomId)]); + resolve({ delay_id: "id" }); + // Let the scheduler run one iteration so that the new join gets sent + await jest.runOnlyPendingTimersAsync(); + expect(client.sendStateEvent).toHaveBeenCalledTimes(2); + }); + it("uses membershipEventExpiryMs from config", async () => { const manager = new TestMembershipManager( { membershipEventExpiryMs: 1234567 }, @@ -542,8 +593,8 @@ describe.each([ expect(manager.status).toBe(Status.Disconnected); }); it("emits 'Connection' and 'Connected' after join !FailsForLegacy", async () => { - const handleDelayedEvent = createAsyncHandle(client._unstable_sendDelayedStateEvent); - const handleStateEvent = createAsyncHandle(client.sendStateEvent); + const handleDelayedEvent = createAsyncHandle(client._unstable_sendDelayedStateEvent); + const handleStateEvent = createAsyncHandle(client.sendStateEvent); const manager = new TestMembershipManager({}, room, client, () => undefined); expect(manager.status).toBe(Status.Disconnected); @@ -594,7 +645,7 @@ describe.each([ }); // FailsForLegacy as implementation does not re-check membership before retrying. it("abandons retry loop and sends new own membership if not present anymore !FailsForLegacy", async () => { - (client._unstable_sendDelayedStateEvent as any).mockRejectedValue( + (client._unstable_sendDelayedStateEvent as Mock).mockRejectedValue( new MatrixError( { errcode: "M_LIMIT_EXCEEDED" }, 429, diff --git a/src/matrixrtc/MatrixRTCSession.ts b/src/matrixrtc/MatrixRTCSession.ts index 0a5ce5bca..07e89baa3 100644 --- a/src/matrixrtc/MatrixRTCSession.ts +++ b/src/matrixrtc/MatrixRTCSession.ts @@ -577,7 +577,9 @@ export class MatrixRTCSession extends TypedEventEmitter< oldMemberships.some((m, i) => !CallMembership.equal(m, this.memberships[i])); if (changed) { - this.logger.info(`Memberships for call in room ${this.roomSubset.roomId} have changed: emitting`); + this.logger.info( + `Memberships for call in room ${this.roomSubset.roomId} have changed: emitting (${this.memberships.length} members)`, + ); logDurationSync(this.logger, "emit MatrixRTCSessionEvent.MembershipsChanged", () => { this.emit(MatrixRTCSessionEvent.MembershipsChanged, oldMemberships, this.memberships); }); diff --git a/src/matrixrtc/NewMembershipManager.ts b/src/matrixrtc/NewMembershipManager.ts index 266834ddd..1250335d4 100644 --- a/src/matrixrtc/NewMembershipManager.ts +++ b/src/matrixrtc/NewMembershipManager.ts @@ -21,7 +21,6 @@ import { UnsupportedDelayedEventsEndpointError } from "../errors.ts"; import { ConnectionError, HTTPError, MatrixError } from "../http-api/errors.ts"; import { type Logger, logger as rootLogger } from "../logger.ts"; import { type Room } from "../models/room.ts"; -import { defer, type IDeferred } from "../utils.ts"; import { type CallMembership, DEFAULT_EXPIRE_DURATION, type SessionMembershipData } from "./CallMembership.ts"; import { type Focus } from "./focus.ts"; import { isMyMembership, Status } from "./types.ts"; @@ -209,14 +208,15 @@ export class MembershipManager // So we do not check scheduler.actions/scheduler.insertions if (!this.leavePromiseResolvers) { // reset scheduled actions so we will not do any new actions. - this.leavePromiseResolvers = defer(); + this.leavePromiseResolvers = Promise.withResolvers(); this.activated = false; this.scheduler.initiateLeave(); if (timeout) setTimeout(() => this.leavePromiseResolvers?.resolve(false), timeout); } return this.leavePromiseResolvers.promise; } - private leavePromiseResolvers?: IDeferred; + + private leavePromiseResolvers?: PromiseWithResolvers; public async onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise { const userId = this.client.getUserId(); @@ -229,14 +229,15 @@ export class MembershipManager MembershipActionType.SendJoinEvent, ]; this.logger.warn("Missing own membership: force re-join"); + this.state.hasMemberStateEvent = false; + if (this.scheduler.actions.find((a) => sendingMembershipActions.includes(a.type as MembershipActionType))) { this.logger.error( - "NewMembershipManger tried adding another `SendFirstDelayedEvent` actions even though we already have one in the Queue\nActionQueueOnMemberUpdate:", + "NewMembershipManger tried adding another `SendDelayedEvent` actions even though we already have one in the Queue\nActionQueueOnMemberUpdate:", this.scheduler.actions, ); } else { // Only react to our own membership missing if we have not already scheduled sending a new membership DirectMembershipManagerAction.Join - this.state.hasMemberStateEvent = false; this.scheduler.initiateJoin(); } } @@ -382,13 +383,13 @@ export class MembershipManager return this.sendOrResendDelayedLeaveEvent(); // Normal case without any previous delayed id. } else { // This can happen if someone else (or another client) removes our own membership event. - // It will trigger `onRTCSessionMemberUpdate` queue `MembershipActionType.SendFirstDelayedEvent`. + // It will trigger `onRTCSessionMemberUpdate` queue `MembershipActionType.SendDelayedEvent`. // We might still have our delayed event from the previous participation and dependent on the server this might not // get removed automatically if the state changes. Hence, it would remove our membership unexpectedly shortly after the rejoin. // // In this block we will try to cancel this delayed event before setting up a new one. - return this.cancelKnownDelayIdBeforeSendFirstDelayedEvent(this.state.delayId); + return this.cancelKnownDelayIdBeforeSendDelayedEvent(this.state.delayId); } } case MembershipActionType.RestartDelayedEvent: { @@ -488,7 +489,7 @@ export class MembershipManager }); } - private async cancelKnownDelayIdBeforeSendFirstDelayedEvent(delayId: string): Promise { + private async cancelKnownDelayIdBeforeSendDelayedEvent(delayId: string): Promise { // Remove all running updates and restarts return await this.client ._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Cancel) diff --git a/src/utils.ts b/src/utils.ts index b5e178bc6..e69ba3e33 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -435,7 +435,9 @@ export function immediate(): Promise { export function isNullOrUndefined(val: any): boolean { return val === null || val === undefined; } - +/** + * @deprecated use {@link PromiseWithResolvers} instead. + */ export type IDeferred = PromiseWithResolvers; /**