1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-07-30 04:23:07 +03:00

MatrixRTC: refactor MatrixRTCSession MemberManager API (#4610)

* update join and leave internal api.

* rename onMembershipUpdate and triggerCallMembershipEventUpdate to onMembershipsUpdate
This makes it more clear that we do not talk about our own membership but all memberships in the session

* cleanup MembershipManager
 - add comments and interface how to test this class.
 - sort methods by public/private
 - make triggerCallMembershipEventUpdate private

* docstrings for getFocusInUse and getActiveFocus

* simplify tests and make them only use MembershipManagerInterface methods.
This allows to exchange the membershipManager with a different implementation.

* convert interface to abstract class.

* review (implement interface, make interface internal, dont change public api.)

* Make the interface an actual interface.
The actual constructor of the class now contains the `Pick` to define what it needs from the client.

* move update condition into MembershipManager

* renaming public api

* Update src/matrixrtc/MatrixRTCSession.ts

Co-authored-by: Hugh Nimmo-Smith <hughns@users.noreply.github.com>

* Update src/matrixrtc/MatrixRTCSession.ts

Co-authored-by: Hugh Nimmo-Smith <hughns@users.noreply.github.com>

---------

Co-authored-by: Hugh Nimmo-Smith <hughns@users.noreply.github.com>
This commit is contained in:
Timo
2025-01-13 14:20:54 +01:00
committed by GitHub
parent bed4e9579e
commit ffb228bf5a
4 changed files with 175 additions and 101 deletions

View File

@ -17,10 +17,10 @@ limitations under the License.
import { encodeBase64, EventType, MatrixClient, MatrixError, MatrixEvent, Room } from "../../../src";
import { KnownMembership } from "../../../src/@types/membership";
import { DEFAULT_EXPIRE_DURATION, SessionMembershipData } from "../../../src/matrixrtc/CallMembership";
import { MembershipManager } from "../../../src/matrixrtc/MembershipManager";
import { MatrixRTCSession, MatrixRTCSessionEvent } from "../../../src/matrixrtc/MatrixRTCSession";
import { EncryptionKeysEventContent } from "../../../src/matrixrtc/types";
import { randomString } from "../../../src/randomstring";
import { flushPromises } from "../../test-utils/flushPromises";
import { makeMockRoom, makeMockRoomState, membershipTemplate } from "./mocks";
const mockFocus = { type: "mock" };
@ -236,16 +236,15 @@ describe("MatrixRTCSession", () => {
});
async function testSession(membershipData: SessionMembershipData): Promise<void> {
const makeNewMembershipSpy = jest.spyOn(MembershipManager.prototype as any, "makeNewMembership");
sess = MatrixRTCSession.roomSessionForRoom(client, makeMockRoom(membershipData));
sess.joinRoomSession([mockFocus], mockFocus, joinSessionConfig);
await Promise.race([sentStateEvent, new Promise((resolve) => setTimeout(resolve, 500))]);
expect(makeNewMembershipSpy).toHaveBeenCalledTimes(1);
expect(sendStateEventMock).toHaveBeenCalledTimes(1);
await Promise.race([sentDelayedState, new Promise((resolve) => setTimeout(resolve, 500))]);
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
expect(sendDelayedStateMock).toHaveBeenCalledTimes(1);
}
it("sends events", async () => {
@ -323,9 +322,11 @@ describe("MatrixRTCSession", () => {
let sendStateEventMock: jest.Mock;
let sendDelayedStateMock: jest.Mock;
let sendEventMock: jest.Mock;
let updateDelayedEventMock: jest.Mock;
let sentStateEvent: Promise<void>;
let sentDelayedState: Promise<void>;
let updatedDelayedEvent: Promise<void>;
beforeEach(() => {
sentStateEvent = new Promise((resolve) => {
@ -339,12 +340,15 @@ describe("MatrixRTCSession", () => {
};
});
});
updatedDelayedEvent = new Promise((r) => {
updateDelayedEventMock = jest.fn(r);
});
sendEventMock = jest.fn();
client.sendStateEvent = sendStateEventMock;
client._unstable_sendDelayedStateEvent = sendDelayedStateMock;
client.sendEvent = sendEventMock;
client._unstable_updateDelayedEvent = jest.fn();
client._unstable_updateDelayedEvent = updateDelayedEventMock;
mockRoom = makeMockRoom([]);
sess = MatrixRTCSession.roomSessionForRoom(client, mockRoom);
@ -482,19 +486,7 @@ describe("MatrixRTCSession", () => {
membershipServerSideExpiryTimeout: 9000,
});
// needed to advance the mock timers properly
// depends on myMembershipManager being created
const scheduledDelayDisconnection = new Promise<void>((resolve) => {
const membershipManager = (sess as any).membershipManager;
const originalFn: () => void = membershipManager.scheduleDelayDisconnection;
membershipManager.scheduleDelayDisconnection = jest.fn(() => {
originalFn.call(membershipManager);
resolve();
});
});
await sendDelayedStateExceedAttempt.then(); // needed to resolve after the send attempt catches
await sendDelayedStateAttempt;
const callProps = (d: number) => {
return [mockRoom!.roomId, { delay: d }, "org.matrix.msc3401.call.member", {}, userStateKey];
@ -525,11 +517,13 @@ describe("MatrixRTCSession", () => {
await sentDelayedState;
// should have prepared the heartbeat to keep delaying the leave event while still connected
await scheduledDelayDisconnection;
// should have tried updating the delayed leave to test that it wasn't replaced by own state
await updatedDelayedEvent;
expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(1);
// should update delayed disconnect
// ensures that we reach the code that schedules the timeout for the next delay update before we advance the timers.
await flushPromises();
jest.advanceTimersByTime(5000);
// should update delayed disconnect
expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(2);
jest.useRealTimers();
@ -561,7 +555,7 @@ describe("MatrixRTCSession", () => {
const onMembershipsChanged = jest.fn();
sess.on(MatrixRTCSessionEvent.MembershipsChanged, onMembershipsChanged);
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
expect(onMembershipsChanged).not.toHaveBeenCalled();
});
@ -574,7 +568,7 @@ describe("MatrixRTCSession", () => {
sess.on(MatrixRTCSessionEvent.MembershipsChanged, onMembershipsChanged);
mockRoom.getLiveTimeline().getState = jest.fn().mockReturnValue(makeMockRoomState([], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
expect(onMembershipsChanged).toHaveBeenCalled();
});
@ -763,7 +757,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
// member2 re-joins which should trigger an immediate re-send
const keysSentPromise2 = new Promise<EncryptionKeysEventContent>((resolve) => {
@ -772,7 +766,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
// but, that immediate resend is throttled so we need to wait a bit
jest.advanceTimersByTime(1000);
const { keys } = await keysSentPromise2;
@ -825,7 +819,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
await keysSentPromise2;
@ -879,7 +873,7 @@ describe("MatrixRTCSession", () => {
sendEventMock.mockClear();
// these should be a no-op:
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
expect(sendEventMock).toHaveBeenCalledTimes(0);
expect(sess!.statistics.counters.roomEventEncryptionKeysSent).toEqual(1);
} finally {
@ -933,7 +927,7 @@ describe("MatrixRTCSession", () => {
sendEventMock.mockClear();
// this should be a no-op:
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
expect(sendEventMock).toHaveBeenCalledTimes(0);
// advance time to avoid key throttling
@ -947,7 +941,7 @@ describe("MatrixRTCSession", () => {
});
// this should re-send the key
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
await keysSentPromise2;
@ -1010,7 +1004,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
jest.advanceTimersByTime(10000);
@ -1055,7 +1049,7 @@ describe("MatrixRTCSession", () => {
);
}
sess!.onMembershipUpdate();
sess!.onRTCSessionMemberUpdate();
// advance time to avoid key throttling
jest.advanceTimersByTime(10000);
@ -1096,7 +1090,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
await new Promise((resolve) => {
realSetTimeout(resolve);

View File

@ -29,7 +29,7 @@ import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts";
import { KnownMembership } from "../@types/membership.ts";
import { MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts";
import { MatrixEvent } from "../models/event.ts";
import { MembershipManager } from "./MembershipManager.ts";
import { LegacyMembershipManager, IMembershipManager } from "./MembershipManager.ts";
const logger = rootLogger.getChild("MatrixRTCSession");
@ -132,7 +132,7 @@ export type JoinSessionConfig = MembershipConfig & EncryptionConfig;
* This class doesn't deal with media at all, just membership & properties of a session.
*/
export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, MatrixRTCSessionEventHandlerMap> {
private membershipManager?: MembershipManager;
private membershipManager?: IMembershipManager;
// The session Id of the call, this is the call_id of the call Member event.
private _callId: string | undefined;
@ -283,7 +283,8 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
super();
this._callId = memberships[0]?.callId;
const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
roomState?.on(RoomStateEvent.Members, this.onMembershipUpdate);
// TODO: double check if this is actually needed. Should be covered by refreshRoom in MatrixRTCSessionManager
roomState?.on(RoomStateEvent.Members, this.onRoomMemberUpdate);
this.setExpiryTimer();
}
@ -299,14 +300,13 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
* Performs cleanup & removes timers for client shutdown
*/
public async stop(): Promise<void> {
await this.membershipManager?.leaveRoomSession(1000);
await this.membershipManager?.leave(1000);
if (this.expiryTimeout) {
clearTimeout(this.expiryTimeout);
this.expiryTimeout = undefined;
}
this.membershipManager?.stop();
const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
roomState?.off(RoomStateEvent.Members, this.onMembershipUpdate);
roomState?.off(RoomStateEvent.Members, this.onRoomMemberUpdate);
}
/**
@ -324,24 +324,21 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
* @param joinConfig - Additional configuration for the joined session.
*/
public joinRoomSession(fociPreferred: Focus[], fociActive?: Focus, joinConfig?: JoinSessionConfig): void {
this.joinConfig = joinConfig;
if (this.isJoined()) {
logger.info(`Already joined to session in room ${this.room.roomId}: ignoring join call`);
return;
} else {
this.membershipManager = new MembershipManager(joinConfig, this.room, this.client, () =>
this.membershipManager = new LegacyMembershipManager(joinConfig, this.room, this.client, () =>
this.getOldestMembership(),
);
}
this.joinConfig = joinConfig;
this.membershipManager!.join(fociPreferred, fociActive);
this.manageMediaKeys = joinConfig?.manageMediaKeys ?? this.manageMediaKeys;
// TODO: it feels wrong to be doing `setJoined()` and then `joinRoomSession()` non-atomically
// A new api between MembershipManager and the session will need to be defined.
this.membershipManager.setJoined(fociPreferred, fociActive);
if (joinConfig?.manageMediaKeys) {
this.makeNewSenderKey();
this.requestSendCurrentKey();
}
this.membershipManager.joinRoomSession();
this.emit(MatrixRTCSessionEvent.JoinStateChanged, true);
}
@ -383,12 +380,17 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
logger.info(`Leaving call session in room ${this.room.roomId}`);
this.joinConfig = undefined;
this.membershipManager!.setLeft();
this.manageMediaKeys = false;
const leavePromise = this.membershipManager!.leave(timeout);
this.emit(MatrixRTCSessionEvent.JoinStateChanged, false);
return await this.membershipManager!.leaveRoomSession(timeout);
return await leavePromise;
}
/**
* Get the active focus from the current CallMemberState event
* @returns The focus that is currently in use to connect to this session. This is undefined
* if the client is not connected to this session.
*/
public getActiveFocus(): Focus | undefined {
return this.membershipManager?.getActiveFocus();
}
@ -650,7 +652,7 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
}
if (soonestExpiry != undefined) {
this.expiryTimeout = setTimeout(this.onMembershipUpdate, soonestExpiry);
this.expiryTimeout = setTimeout(this.onRTCSessionMemberUpdate, soonestExpiry);
}
}
@ -658,6 +660,13 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
return this.memberships[0];
}
/**
* This method is used when the user is not yet connected to the Session but wants to know what focus
* the users in the session are using to make a decision how it wants/should connect.
*
* See also `getActiveFocus`
* @returns The focus which should be used when joining this session.
*/
public getFocusInUse(): Focus | undefined {
const oldestMembership = this.getOldestMembership();
if (oldestMembership?.getFocusSelection() === "oldest_membership") {
@ -746,11 +755,35 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
m.sender === this.client.getUserId() && m.deviceId === this.client.getDeviceId();
/**
* @deprecated use onRoomMemberUpdate or onRTCSessionMemberUpdate instead. this should be called when any membership in the call is updated
* the old name might have implied to only need to call this when your own membership changes.
*/
public onMembershipUpdate = (): void => {
this.recalculateSessionMembers();
};
/**
* Call this when the Matrix room members have changed.
*/
public onRoomMemberUpdate = (): void => {
this.recalculateSessionMembers();
};
/**
* Call this when something changed that may impacts the current MatrixRTC members in this session.
*/
public onRTCSessionMemberUpdate = (): void => {
this.recalculateSessionMembers();
};
/**
* Call this when anything that could impact rtc memberships has changed: Room Members or RTC members.
*
* Examines the latest call memberships and handles any encryption key sending or rotation that is needed.
*
* This function should be called when the room members or call memberships might have changed.
*/
public onMembershipUpdate = (): void => {
private recalculateSessionMembers = (): void => {
const oldMemberships = this.memberships;
this.memberships = MatrixRTCSession.callMembershipsForRoom(this.room);
@ -764,11 +797,7 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
logger.info(`Memberships for call in room ${this.room.roomId} have changed: emitting`);
this.emit(MatrixRTCSessionEvent.MembershipsChanged, oldMemberships, this.memberships);
if (this.isJoined() && !this.memberships.some(this.isMyMembership)) {
logger.warn("Missing own membership: force re-join");
// TODO: Should this be awaited? And is there anything to tell the focus?
this.membershipManager?.triggerCallMembershipEventUpdate();
}
this.membershipManager?.onRTCSessionMemberUpdate(this.memberships);
}
if (this.manageMediaKeys && this.isJoined()) {

View File

@ -153,7 +153,7 @@ export class MatrixRTCSessionManager extends TypedEventEmitter<MatrixRTCSessionM
const wasActiveAndKnown = sess.memberships.length > 0 && !isNewSession;
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
const nowActive = sess.memberships.length > 0;

View File

@ -10,20 +10,43 @@ import { CallMembership, DEFAULT_EXPIRE_DURATION, SessionMembershipData } from "
import { Focus } from "./focus.ts";
import { isLivekitFocusActive } from "./LivekitFocus.ts";
import { MembershipConfig } from "./MatrixRTCSession.ts";
/**
* This interface defines what a MembershipManager uses and exposes.
* This interface is what we use to write tests and allows to change the actual implementation
* Without breaking tests because of some internal method renaming.
*
* @internal
*/
export interface IMembershipManager {
isJoined(): boolean;
join(fociPreferred: Focus[], fociActive?: Focus): void;
leave(timeout: number | undefined): Promise<boolean>;
/**
* call this if the MatrixRTC session members have changed
*/
onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise<void>;
getActiveFocus(): Focus | undefined;
}
/**
* This internal class is used by the MatrixRTCSession to manage the local user's own membership of the session.
*
* Its responsibitiy is to manage the locals user membership:
* - send that sate event
* - send the delayed leave event
* - update the delayed leave event while connected
* - update the state event when it times out (for calls longer than membershipExpiryTimeout ~ 4h)
*
* It is possible to test this class on its own. The api surface (to use for tests) is
* defined in `MembershipManagerInterface`.
*
* It is recommended to only use this interface for testing to allow replacing this class.
*
* @internal
*/
export class MembershipManager {
export class LegacyMembershipManager implements IMembershipManager {
private relativeExpiry: number | undefined;
public constructor(
private joinConfig: MembershipConfig | undefined,
private room: Room,
private client: MatrixClient,
private getOldestMembership: () => CallMembership | undefined,
) {}
private memberEventTimeout?: ReturnType<typeof setTimeout>;
/**
@ -57,29 +80,53 @@ export class MembershipManager {
8_000
);
}
private get membershipKeepAlivePeriod(): number {
return this.joinConfig?.membershipKeepAlivePeriod ?? 5_000;
}
private get callMemberEventRetryJitter(): number {
return this.joinConfig?.callMemberEventRetryJitter ?? 2_000;
}
public joinRoomSession(): void {
public constructor(
private joinConfig: MembershipConfig | undefined,
private room: Pick<Room, "getLiveTimeline" | "roomId" | "getVersion">,
private client: Pick<
MatrixClient,
| "getUserId"
| "getDeviceId"
| "sendStateEvent"
| "_unstable_sendDelayedEvent"
| "_unstable_sendDelayedStateEvent"
| "_unstable_updateDelayedEvent"
>,
private getOldestMembership: () => CallMembership | undefined,
) {}
/*
* Returns true if we intend to be participating in the MatrixRTC session.
* This is determined by checking if the relativeExpiry has been set.
*/
public isJoined(): boolean {
return this.relativeExpiry !== undefined;
}
public join(fociPreferred: Focus[], fociActive?: Focus): void {
this.ownFocusActive = fociActive;
this.ownFociPreferred = fociPreferred;
this.relativeExpiry = this.membershipExpiryTimeout;
// We don't wait for this, mostly because it may fail and schedule a retry, so this
// function returning doesn't really mean anything at all.
this.triggerCallMembershipEventUpdate();
}
public setJoined(fociPreferred: Focus[], fociActive?: Focus): void {
this.ownFocusActive = fociActive;
this.ownFociPreferred = fociPreferred;
this.relativeExpiry = this.membershipExpiryTimeout;
}
public setLeft(): void {
public async leave(timeout: number | undefined = undefined): Promise<boolean> {
this.relativeExpiry = undefined;
this.ownFocusActive = undefined;
if (this.memberEventTimeout) {
clearTimeout(this.memberEventTimeout);
this.memberEventTimeout = undefined;
}
public async leaveRoomSession(timeout: number | undefined = undefined): Promise<boolean> {
if (timeout) {
// The sleep promise returns the string 'timeout' and the membership update void
// A success implies that the membership update was quicker then the timeout.
@ -90,13 +137,38 @@ export class MembershipManager {
return true;
}
}
public stop(): void {
if (this.memberEventTimeout) {
clearTimeout(this.memberEventTimeout);
this.memberEventTimeout = undefined;
public async onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise<void> {
const isMyMembership = (m: CallMembership): boolean =>
m.sender === this.client.getUserId() && m.deviceId === this.client.getDeviceId();
if (this.isJoined() && !memberships.some(isMyMembership)) {
logger.warn("Missing own membership: force re-join");
// TODO: Should this be awaited? And is there anything to tell the focus?
return this.triggerCallMembershipEventUpdate();
}
}
public triggerCallMembershipEventUpdate = async (): Promise<void> => {
public getActiveFocus(): Focus | undefined {
if (this.ownFocusActive) {
// A livekit active focus
if (isLivekitFocusActive(this.ownFocusActive)) {
if (this.ownFocusActive.focus_selection === "oldest_membership") {
const oldestMembership = this.getOldestMembership();
return oldestMembership?.getPreferredFoci()[0];
}
} else {
logger.warn("Unknown own ActiveFocus type. This makes it impossible to connect to an SFU.");
}
} else {
// We do not understand the membership format (could be legacy). We default to oldestMembership
// Once there are other methods this is a hard error!
const oldestMembership = this.getOldestMembership();
return oldestMembership?.getPreferredFoci()[0];
}
}
private triggerCallMembershipEventUpdate = async (): Promise<void> => {
// TODO: Should this await on a shared promise?
if (this.updateCallMembershipRunning) {
this.needCallMembershipUpdate = true;
@ -121,13 +193,7 @@ export class MembershipManager {
}
return {};
}
/*
* Returns true if we intend to be participating in the MatrixRTC session.
* This is determined by checking if the relativeExpiry has been set.
*/
public isJoined(): boolean {
return this.relativeExpiry !== undefined;
}
/**
* Constructs our own membership
*/
@ -143,21 +209,7 @@ export class MembershipManager {
};
}
public getActiveFocus(): Focus | undefined {
if (this.ownFocusActive && isLivekitFocusActive(this.ownFocusActive)) {
// A livekit active focus
if (this.ownFocusActive.focus_selection === "oldest_membership") {
const oldestMembership = this.getOldestMembership();
return oldestMembership?.getPreferredFoci()[0];
}
} else {
// We do not understand the membership format (could be legacy). We default to oldestMembership
// Once there are other methods this is a hard error!
const oldestMembership = this.getOldestMembership();
return oldestMembership?.getPreferredFoci()[0];
}
}
public async updateCallMembershipEvent(): Promise<void> {
private async updateCallMembershipEvent(): Promise<void> {
if (this.memberEventTimeout) {
clearTimeout(this.memberEventTimeout);
this.memberEventTimeout = undefined;
@ -192,9 +244,7 @@ export class MembershipManager {
stateKey,
),
);
logger.log("BEFOER:", this.disconnectDelayId);
this.disconnectDelayId = res.delay_id;
logger.log("AFTER:", this.disconnectDelayId);
} catch (e) {
if (
e instanceof MatrixError &&
@ -213,6 +263,7 @@ export class MembershipManager {
logger.error("Failed to prepare delayed disconnection event:", e);
}
};
await prepareDelayedDisconnection();
// Send join event _after_ preparing the delayed disconnection event
await resendIfRateLimited(() =>