You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2026-01-03 23:22:30 +03:00
Fix MatrixRTC membership manager failing to rejoin in a race condition (sync vs not found response) (#4861)
* add test run helper to allow running long tests in vs code * deprecate IDeferred (as its associated defer method is also deprecated and its just a type rename to PromiseWithResolvers) * Improve docs and readability of MembershipManager.spec.ts * Intoduce test for a race condition which results in a state where the state event and the hasMemberStateEvent variable diverge * fix room state and membership manager state diverging. See: https://github.com/element-hq/element-call-rageshakes/issues/10609 https://github.com/element-hq/element-call-rageshakes/issues/10594 https://github.com/element-hq/element-call-rageshakes/issues/9902 * logging, docstings and variable name improvements * review * review pending timers
This commit is contained in:
@@ -19,3 +19,8 @@ jest.mock("../src/http-api/utils", () => ({
|
||||
// We mock timeoutSignal otherwise it causes tests to leave timers running
|
||||
timeoutSignal: () => new AbortController().signal,
|
||||
}));
|
||||
|
||||
// Dont make test fail too soon due to timeouts while debugging.
|
||||
if (process.env.VSCODE_INSPECTOR_OPTIONS) {
|
||||
jest.setTimeout(60 * 1000 * 5); // 5 minutes
|
||||
}
|
||||
|
||||
@@ -32,25 +32,38 @@ import { makeMockClient, makeMockRoom, membershipTemplate, mockCallMembership, t
|
||||
import { MembershipManager } from "../../../src/matrixrtc/NewMembershipManager";
|
||||
import { logger } from "../../../src/logger.ts";
|
||||
|
||||
function waitForMockCall(method: MockedFunction<any>, returnVal?: Promise<any>) {
|
||||
return new Promise<void>((resolve) => {
|
||||
method.mockImplementation(() => {
|
||||
resolve();
|
||||
return returnVal ?? Promise.resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
function waitForMockCallOnce(method: MockedFunction<any>, returnVal?: Promise<any>) {
|
||||
return new Promise<void>((resolve) => {
|
||||
method.mockImplementationOnce(() => {
|
||||
resolve();
|
||||
return returnVal ?? Promise.resolve();
|
||||
});
|
||||
/**
|
||||
* Create a promise that will resolve once a mocked method is called.
|
||||
* @param method The method to wait for.
|
||||
* @param returnVal Provide an optional value that the mocked method should return. (use Promise.resolve(val) or Promise.reject(err))
|
||||
* @returns The promise that resolves once the method is called.
|
||||
*/
|
||||
function waitForMockCall(method: MockedFunction<any>, returnVal?: Promise<any>): Promise<void> {
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
method.mockImplementation(() => {
|
||||
resolve();
|
||||
return returnVal ?? Promise.resolve();
|
||||
});
|
||||
return promise;
|
||||
}
|
||||
|
||||
function createAsyncHandle(method: MockedFunction<any>) {
|
||||
const { reject, resolve, promise } = Promise.withResolvers<void>();
|
||||
/** See waitForMockCall */
|
||||
function waitForMockCallOnce(method: MockedFunction<any>, returnVal?: Promise<any>) {
|
||||
const { promise, resolve } = Promise.withResolvers<void>();
|
||||
method.mockImplementationOnce(() => {
|
||||
resolve();
|
||||
return returnVal ?? Promise.resolve();
|
||||
});
|
||||
return promise;
|
||||
}
|
||||
|
||||
/**
|
||||
* A handle to control when in the test flow the provided method resolves (or gets rejected).
|
||||
* @param method The method to control the resolve timing.
|
||||
* @returns
|
||||
*/
|
||||
function createAsyncHandle<T>(method: MockedFunction<any>) {
|
||||
const { reject, resolve, promise } = Promise.withResolvers<T>();
|
||||
method.mockImplementation(() => promise);
|
||||
return { reject, resolve };
|
||||
}
|
||||
@@ -110,13 +123,13 @@ describe.each([
|
||||
it("sends a membership event and schedules delayed leave when joining a call", async () => {
|
||||
// Spys/Mocks
|
||||
|
||||
const updateDelayedEventHandle = createAsyncHandle(client._unstable_updateDelayedEvent as Mock);
|
||||
const updateDelayedEventHandle = createAsyncHandle<void>(client._unstable_updateDelayedEvent as Mock);
|
||||
|
||||
// Test
|
||||
const memberManager = new TestMembershipManager(undefined, room, client, () => undefined);
|
||||
memberManager.join([focus], focusActive);
|
||||
// expects
|
||||
await waitForMockCall(client.sendStateEvent);
|
||||
await waitForMockCall(client.sendStateEvent, Promise.resolve({ event_id: "id" }));
|
||||
expect(client.sendStateEvent).toHaveBeenCalledWith(
|
||||
room.roomId,
|
||||
"org.matrix.msc3401.call.member",
|
||||
@@ -311,6 +324,44 @@ describe.each([
|
||||
});
|
||||
});
|
||||
|
||||
it("rejoins if delayed event is not found (404) !FailsForLegacy", async () => {
|
||||
const RESTART_DELAY = 15000;
|
||||
const manager = new TestMembershipManager(
|
||||
{ delayedLeaveEventRestartMs: RESTART_DELAY },
|
||||
room,
|
||||
client,
|
||||
() => undefined,
|
||||
);
|
||||
// Join with the membership manager
|
||||
manager.join([focus], focusActive);
|
||||
expect(manager.status).toBe(Status.Connecting);
|
||||
// Let the scheduler run one iteration so that we can send the join state event
|
||||
await jest.runOnlyPendingTimersAsync();
|
||||
expect(client.sendStateEvent).toHaveBeenCalledTimes(1);
|
||||
expect(manager.status).toBe(Status.Connected);
|
||||
// Now that we are connected, we set up the mocks.
|
||||
// We enforce the following scenario where we simulate that the delayed event activated and caused the user to leave:
|
||||
// - We wait until the delayed event gets sent and then mock its response to be "not found."
|
||||
// - We enforce a race condition between the sync that informs us that our call membership state event was set to "left"
|
||||
// and the "not found" response from the delayed event: we receive the sync while we are waiting for the delayed event to be sent.
|
||||
// - While the delayed leave event is being sent, we inform the manager that our membership state event was set to "left."
|
||||
// (onRTCSessionMemberUpdate)
|
||||
// - Only then do we resolve the sending of the delayed event.
|
||||
// - We test that the manager acknowledges the leave and sends a new membership state event.
|
||||
(client._unstable_updateDelayedEvent as Mock<any>).mockRejectedValueOnce(
|
||||
new MatrixError({ errcode: "M_NOT_FOUND" }),
|
||||
);
|
||||
|
||||
const { resolve } = createAsyncHandle(client._unstable_sendDelayedStateEvent);
|
||||
await jest.advanceTimersByTimeAsync(RESTART_DELAY);
|
||||
// first simulate the sync, then resolve sending the delayed event.
|
||||
await manager.onRTCSessionMemberUpdate([mockCallMembership(membershipTemplate, room.roomId)]);
|
||||
resolve({ delay_id: "id" });
|
||||
// Let the scheduler run one iteration so that the new join gets sent
|
||||
await jest.runOnlyPendingTimersAsync();
|
||||
expect(client.sendStateEvent).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("uses membershipEventExpiryMs from config", async () => {
|
||||
const manager = new TestMembershipManager(
|
||||
{ membershipEventExpiryMs: 1234567 },
|
||||
@@ -542,8 +593,8 @@ describe.each([
|
||||
expect(manager.status).toBe(Status.Disconnected);
|
||||
});
|
||||
it("emits 'Connection' and 'Connected' after join !FailsForLegacy", async () => {
|
||||
const handleDelayedEvent = createAsyncHandle(client._unstable_sendDelayedStateEvent);
|
||||
const handleStateEvent = createAsyncHandle(client.sendStateEvent);
|
||||
const handleDelayedEvent = createAsyncHandle<void>(client._unstable_sendDelayedStateEvent);
|
||||
const handleStateEvent = createAsyncHandle<void>(client.sendStateEvent);
|
||||
|
||||
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||
expect(manager.status).toBe(Status.Disconnected);
|
||||
@@ -594,7 +645,7 @@ describe.each([
|
||||
});
|
||||
// FailsForLegacy as implementation does not re-check membership before retrying.
|
||||
it("abandons retry loop and sends new own membership if not present anymore !FailsForLegacy", async () => {
|
||||
(client._unstable_sendDelayedStateEvent as any).mockRejectedValue(
|
||||
(client._unstable_sendDelayedStateEvent as Mock<any>).mockRejectedValue(
|
||||
new MatrixError(
|
||||
{ errcode: "M_LIMIT_EXCEEDED" },
|
||||
429,
|
||||
|
||||
@@ -577,7 +577,9 @@ export class MatrixRTCSession extends TypedEventEmitter<
|
||||
oldMemberships.some((m, i) => !CallMembership.equal(m, this.memberships[i]));
|
||||
|
||||
if (changed) {
|
||||
this.logger.info(`Memberships for call in room ${this.roomSubset.roomId} have changed: emitting`);
|
||||
this.logger.info(
|
||||
`Memberships for call in room ${this.roomSubset.roomId} have changed: emitting (${this.memberships.length} members)`,
|
||||
);
|
||||
logDurationSync(this.logger, "emit MatrixRTCSessionEvent.MembershipsChanged", () => {
|
||||
this.emit(MatrixRTCSessionEvent.MembershipsChanged, oldMemberships, this.memberships);
|
||||
});
|
||||
|
||||
@@ -21,7 +21,6 @@ import { UnsupportedDelayedEventsEndpointError } from "../errors.ts";
|
||||
import { ConnectionError, HTTPError, MatrixError } from "../http-api/errors.ts";
|
||||
import { type Logger, logger as rootLogger } from "../logger.ts";
|
||||
import { type Room } from "../models/room.ts";
|
||||
import { defer, type IDeferred } from "../utils.ts";
|
||||
import { type CallMembership, DEFAULT_EXPIRE_DURATION, type SessionMembershipData } from "./CallMembership.ts";
|
||||
import { type Focus } from "./focus.ts";
|
||||
import { isMyMembership, Status } from "./types.ts";
|
||||
@@ -209,14 +208,15 @@ export class MembershipManager
|
||||
// So we do not check scheduler.actions/scheduler.insertions
|
||||
if (!this.leavePromiseResolvers) {
|
||||
// reset scheduled actions so we will not do any new actions.
|
||||
this.leavePromiseResolvers = defer<boolean>();
|
||||
this.leavePromiseResolvers = Promise.withResolvers<boolean>();
|
||||
this.activated = false;
|
||||
this.scheduler.initiateLeave();
|
||||
if (timeout) setTimeout(() => this.leavePromiseResolvers?.resolve(false), timeout);
|
||||
}
|
||||
return this.leavePromiseResolvers.promise;
|
||||
}
|
||||
private leavePromiseResolvers?: IDeferred<boolean>;
|
||||
|
||||
private leavePromiseResolvers?: PromiseWithResolvers<boolean>;
|
||||
|
||||
public async onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise<void> {
|
||||
const userId = this.client.getUserId();
|
||||
@@ -229,14 +229,15 @@ export class MembershipManager
|
||||
MembershipActionType.SendJoinEvent,
|
||||
];
|
||||
this.logger.warn("Missing own membership: force re-join");
|
||||
this.state.hasMemberStateEvent = false;
|
||||
|
||||
if (this.scheduler.actions.find((a) => sendingMembershipActions.includes(a.type as MembershipActionType))) {
|
||||
this.logger.error(
|
||||
"NewMembershipManger tried adding another `SendFirstDelayedEvent` actions even though we already have one in the Queue\nActionQueueOnMemberUpdate:",
|
||||
"NewMembershipManger tried adding another `SendDelayedEvent` actions even though we already have one in the Queue\nActionQueueOnMemberUpdate:",
|
||||
this.scheduler.actions,
|
||||
);
|
||||
} else {
|
||||
// Only react to our own membership missing if we have not already scheduled sending a new membership DirectMembershipManagerAction.Join
|
||||
this.state.hasMemberStateEvent = false;
|
||||
this.scheduler.initiateJoin();
|
||||
}
|
||||
}
|
||||
@@ -382,13 +383,13 @@ export class MembershipManager
|
||||
return this.sendOrResendDelayedLeaveEvent(); // Normal case without any previous delayed id.
|
||||
} else {
|
||||
// This can happen if someone else (or another client) removes our own membership event.
|
||||
// It will trigger `onRTCSessionMemberUpdate` queue `MembershipActionType.SendFirstDelayedEvent`.
|
||||
// It will trigger `onRTCSessionMemberUpdate` queue `MembershipActionType.SendDelayedEvent`.
|
||||
// We might still have our delayed event from the previous participation and dependent on the server this might not
|
||||
// get removed automatically if the state changes. Hence, it would remove our membership unexpectedly shortly after the rejoin.
|
||||
//
|
||||
// In this block we will try to cancel this delayed event before setting up a new one.
|
||||
|
||||
return this.cancelKnownDelayIdBeforeSendFirstDelayedEvent(this.state.delayId);
|
||||
return this.cancelKnownDelayIdBeforeSendDelayedEvent(this.state.delayId);
|
||||
}
|
||||
}
|
||||
case MembershipActionType.RestartDelayedEvent: {
|
||||
@@ -488,7 +489,7 @@ export class MembershipManager
|
||||
});
|
||||
}
|
||||
|
||||
private async cancelKnownDelayIdBeforeSendFirstDelayedEvent(delayId: string): Promise<ActionUpdate> {
|
||||
private async cancelKnownDelayIdBeforeSendDelayedEvent(delayId: string): Promise<ActionUpdate> {
|
||||
// Remove all running updates and restarts
|
||||
return await this.client
|
||||
._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Cancel)
|
||||
|
||||
@@ -435,7 +435,9 @@ export function immediate(): Promise<void> {
|
||||
export function isNullOrUndefined(val: any): boolean {
|
||||
return val === null || val === undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use {@link PromiseWithResolvers} instead.
|
||||
*/
|
||||
export type IDeferred<T> = PromiseWithResolvers<T>;
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user