You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-08-07 23:02:56 +03:00
MatrixRTC: move MatrixRTCSession logic into LocalMembershipManager (#4608)
* split joinConfig - myMembership related properties get moved into its own interface * Add MyMembershipManager * Remove methods and functions that are from MatrixRTCSession (they now live in MyMembershipManager) * Refactor MatrixRTCSession to use myMembershipManager * fix tests * review * get rid of more memberhsip manager usage in tests * review - fix tests using private membershipManager props * fix circular import
This commit is contained in:
@@ -16,7 +16,8 @@ limitations under the License.
|
|||||||
|
|
||||||
import { encodeBase64, EventType, MatrixClient, MatrixError, MatrixEvent, Room } from "../../../src";
|
import { encodeBase64, EventType, MatrixClient, MatrixError, MatrixEvent, Room } from "../../../src";
|
||||||
import { KnownMembership } from "../../../src/@types/membership";
|
import { KnownMembership } from "../../../src/@types/membership";
|
||||||
import { SessionMembershipData, DEFAULT_EXPIRE_DURATION } from "../../../src/matrixrtc/CallMembership";
|
import { DEFAULT_EXPIRE_DURATION, SessionMembershipData } from "../../../src/matrixrtc/CallMembership";
|
||||||
|
import { MembershipManager } from "../../../src/matrixrtc/MembershipManager";
|
||||||
import { MatrixRTCSession, MatrixRTCSessionEvent } from "../../../src/matrixrtc/MatrixRTCSession";
|
import { MatrixRTCSession, MatrixRTCSessionEvent } from "../../../src/matrixrtc/MatrixRTCSession";
|
||||||
import { EncryptionKeysEventContent } from "../../../src/matrixrtc/types";
|
import { EncryptionKeysEventContent } from "../../../src/matrixrtc/types";
|
||||||
import { randomString } from "../../../src/randomstring";
|
import { randomString } from "../../../src/randomstring";
|
||||||
@@ -235,14 +236,13 @@ describe("MatrixRTCSession", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
async function testSession(membershipData: SessionMembershipData): Promise<void> {
|
async function testSession(membershipData: SessionMembershipData): Promise<void> {
|
||||||
|
const makeNewMembershipSpy = jest.spyOn(MembershipManager.prototype as any, "makeNewMembership");
|
||||||
sess = MatrixRTCSession.roomSessionForRoom(client, makeMockRoom(membershipData));
|
sess = MatrixRTCSession.roomSessionForRoom(client, makeMockRoom(membershipData));
|
||||||
|
|
||||||
const makeNewMembershipMock = jest.spyOn(sess as any, "makeNewMembership");
|
|
||||||
|
|
||||||
sess.joinRoomSession([mockFocus], mockFocus, joinSessionConfig);
|
sess.joinRoomSession([mockFocus], mockFocus, joinSessionConfig);
|
||||||
await Promise.race([sentStateEvent, new Promise((resolve) => setTimeout(resolve, 500))]);
|
await Promise.race([sentStateEvent, new Promise((resolve) => setTimeout(resolve, 500))]);
|
||||||
|
|
||||||
expect(makeNewMembershipMock).toHaveBeenCalledTimes(1);
|
expect(makeNewMembershipSpy).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
await Promise.race([sentDelayedState, new Promise((resolve) => setTimeout(resolve, 500))]);
|
await Promise.race([sentDelayedState, new Promise((resolve) => setTimeout(resolve, 500))]);
|
||||||
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
|
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
|
||||||
@@ -422,7 +422,6 @@ describe("MatrixRTCSession", () => {
|
|||||||
type: "livekit",
|
type: "livekit",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
"_@alice:example.org_AAAAAAA",
|
"_@alice:example.org_AAAAAAA",
|
||||||
);
|
);
|
||||||
await Promise.race([sentDelayedState, new Promise((resolve) => realSetTimeout(resolve, 500))]);
|
await Promise.race([sentDelayedState, new Promise((resolve) => realSetTimeout(resolve, 500))]);
|
||||||
@@ -454,6 +453,7 @@ describe("MatrixRTCSession", () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const userStateKey = `${!useOwnedStateEvents ? "_" : ""}@alice:example.org_AAAAAAA`;
|
||||||
// preparing the delayed disconnect should handle ratelimiting
|
// preparing the delayed disconnect should handle ratelimiting
|
||||||
const sendDelayedStateAttempt = new Promise<void>((resolve) => {
|
const sendDelayedStateAttempt = new Promise<void>((resolve) => {
|
||||||
const error = new MatrixError({ errcode: "M_LIMIT_EXCEEDED" });
|
const error = new MatrixError({ errcode: "M_LIMIT_EXCEEDED" });
|
||||||
@@ -478,24 +478,30 @@ describe("MatrixRTCSession", () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
// needed to advance the mock timers properly
|
|
||||||
const scheduledDelayDisconnection = new Promise<void>((resolve) => {
|
|
||||||
const originalFn: () => void = (sess as any).scheduleDelayDisconnection;
|
|
||||||
(sess as any).scheduleDelayDisconnection = jest.fn(() => {
|
|
||||||
originalFn.call(sess);
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
sess!.joinRoomSession([activeFocusConfig], activeFocus, {
|
sess!.joinRoomSession([activeFocusConfig], activeFocus, {
|
||||||
membershipServerSideExpiryTimeout: 9000,
|
membershipServerSideExpiryTimeout: 9000,
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(sess).toHaveProperty("membershipServerSideExpiryTimeout", 9000);
|
// needed to advance the mock timers properly
|
||||||
|
// depends on myMembershipManager being created
|
||||||
|
const scheduledDelayDisconnection = new Promise<void>((resolve) => {
|
||||||
|
const membershipManager = (sess as any).membershipManager;
|
||||||
|
const originalFn: () => void = membershipManager.scheduleDelayDisconnection;
|
||||||
|
membershipManager.scheduleDelayDisconnection = jest.fn(() => {
|
||||||
|
originalFn.call(membershipManager);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
await sendDelayedStateExceedAttempt.then(); // needed to resolve after the send attempt catches
|
await sendDelayedStateExceedAttempt.then(); // needed to resolve after the send attempt catches
|
||||||
expect(sess).toHaveProperty("membershipServerSideExpiryTimeout", 7500);
|
|
||||||
|
|
||||||
await sendDelayedStateAttempt;
|
await sendDelayedStateAttempt;
|
||||||
|
const callProps = (d: number) => {
|
||||||
|
return [mockRoom!.roomId, { delay: d }, "org.matrix.msc3401.call.member", {}, userStateKey];
|
||||||
|
};
|
||||||
|
expect(client._unstable_sendDelayedStateEvent).toHaveBeenNthCalledWith(1, ...callProps(9000));
|
||||||
|
expect(client._unstable_sendDelayedStateEvent).toHaveBeenNthCalledWith(2, ...callProps(7500));
|
||||||
|
|
||||||
jest.advanceTimersByTime(5000);
|
jest.advanceTimersByTime(5000);
|
||||||
|
|
||||||
await sendStateEventAttempt.then(); // needed to resolve after resendIfRateLimited catches
|
await sendStateEventAttempt.then(); // needed to resolve after resendIfRateLimited catches
|
||||||
@@ -514,7 +520,7 @@ describe("MatrixRTCSession", () => {
|
|||||||
foci_preferred: [activeFocusConfig],
|
foci_preferred: [activeFocusConfig],
|
||||||
focus_active: activeFocus,
|
focus_active: activeFocus,
|
||||||
} satisfies SessionMembershipData,
|
} satisfies SessionMembershipData,
|
||||||
`${!useOwnedStateEvents ? "_" : ""}@alice:example.org_AAAAAAA`,
|
userStateKey,
|
||||||
);
|
);
|
||||||
await sentDelayedState;
|
await sentDelayedState;
|
||||||
|
|
||||||
|
@@ -20,18 +20,16 @@ import { EventTimeline } from "../models/event-timeline.ts";
|
|||||||
import { Room } from "../models/room.ts";
|
import { Room } from "../models/room.ts";
|
||||||
import { MatrixClient } from "../client.ts";
|
import { MatrixClient } from "../client.ts";
|
||||||
import { EventType } from "../@types/event.ts";
|
import { EventType } from "../@types/event.ts";
|
||||||
import { UpdateDelayedEventAction } from "../@types/requests.ts";
|
import { CallMembership } from "./CallMembership.ts";
|
||||||
import { CallMembership, DEFAULT_EXPIRE_DURATION, SessionMembershipData } from "./CallMembership.ts";
|
|
||||||
import { RoomStateEvent } from "../models/room-state.ts";
|
import { RoomStateEvent } from "../models/room-state.ts";
|
||||||
import { Focus } from "./focus.ts";
|
import { Focus } from "./focus.ts";
|
||||||
import { secureRandomBase64Url } from "../randomstring.ts";
|
import { secureRandomBase64Url } from "../randomstring.ts";
|
||||||
import { EncryptionKeysEventContent } from "./types.ts";
|
import { EncryptionKeysEventContent } from "./types.ts";
|
||||||
import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts";
|
import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts";
|
||||||
import { KnownMembership } from "../@types/membership.ts";
|
import { KnownMembership } from "../@types/membership.ts";
|
||||||
import { HTTPError, MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts";
|
import { MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts";
|
||||||
import { MatrixEvent } from "../models/event.ts";
|
import { MatrixEvent } from "../models/event.ts";
|
||||||
import { isLivekitFocusActive } from "./LivekitFocus.ts";
|
import { MembershipManager } from "./MembershipManager.ts";
|
||||||
import { sleep } from "../utils.ts";
|
|
||||||
|
|
||||||
const logger = rootLogger.getChild("MatrixRTCSession");
|
const logger = rootLogger.getChild("MatrixRTCSession");
|
||||||
|
|
||||||
@@ -67,14 +65,7 @@ export type MatrixRTCSessionEventHandlerMap = {
|
|||||||
) => void;
|
) => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
export interface JoinSessionConfig {
|
export interface MembershipConfig {
|
||||||
/**
|
|
||||||
* If true, generate and share a media key for this participant,
|
|
||||||
* and emit MatrixRTCSessionEvent.EncryptionKeyChanged when
|
|
||||||
* media keys for other participants become available.
|
|
||||||
*/
|
|
||||||
manageMediaKeys?: boolean;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The timeout (in milliseconds) after we joined the call, that our membership should expire
|
* The timeout (in milliseconds) after we joined the call, that our membership should expire
|
||||||
* unless we have explicitly updated it.
|
* unless we have explicitly updated it.
|
||||||
@@ -94,24 +85,38 @@ export interface JoinSessionConfig {
|
|||||||
callMemberEventRetryDelayMinimum?: number;
|
callMemberEventRetryDelayMinimum?: number;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The jitter (in milliseconds) which is added to callMemberEventRetryDelayMinimum before retrying
|
* The timeout (in milliseconds) with which the deleayed leave event on the server is configured.
|
||||||
* sending the membership event. e.g. if this is set to 1000, then a random delay of between 0 and 1000
|
* After this time the server will set the event to the disconnected stat if it has not received a keep-alive from the client.
|
||||||
* milliseconds will be added.
|
*/
|
||||||
|
membershipServerSideExpiryTimeout?: number;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The interval (in milliseconds) in which the client will send membership keep-alives to the server.
|
||||||
|
*/
|
||||||
|
membershipKeepAlivePeriod?: number;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated It should be possible to make it stable without this.
|
||||||
*/
|
*/
|
||||||
callMemberEventRetryJitter?: number;
|
callMemberEventRetryJitter?: number;
|
||||||
|
}
|
||||||
|
export interface EncryptionConfig {
|
||||||
|
/**
|
||||||
|
* If true, generate and share a media key for this participant,
|
||||||
|
* and emit MatrixRTCSessionEvent.EncryptionKeyChanged when
|
||||||
|
* media keys for other participants become available.
|
||||||
|
*/
|
||||||
|
manageMediaKeys?: boolean;
|
||||||
/**
|
/**
|
||||||
* The minimum time (in milliseconds) between each attempt to send encryption key(s).
|
* The minimum time (in milliseconds) between each attempt to send encryption key(s).
|
||||||
* e.g. if this is set to 1000, then we will send at most one key event every second.
|
* e.g. if this is set to 1000, then we will send at most one key event every second.
|
||||||
*/
|
*/
|
||||||
updateEncryptionKeyThrottle?: number;
|
updateEncryptionKeyThrottle?: number;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The delay (in milliseconds) after a member leaves before we create and publish a new key, because people
|
* The delay (in milliseconds) after a member leaves before we create and publish a new key, because people
|
||||||
* tend to leave calls at the same time.
|
* tend to leave calls at the same time.
|
||||||
*/
|
*/
|
||||||
makeKeyDelay?: number;
|
makeKeyDelay?: number;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The delay (in milliseconds) between creating and sending a new key and starting to encrypt with it. This
|
* The delay (in milliseconds) between creating and sending a new key and starting to encrypt with it. This
|
||||||
* gives other a chance to receive the new key to minimise the chance they don't get media they can't decrypt.
|
* gives other a chance to receive the new key to minimise the chance they don't get media they can't decrypt.
|
||||||
@@ -119,40 +124,22 @@ export interface JoinSessionConfig {
|
|||||||
* makeKeyDelay + useKeyDelay
|
* makeKeyDelay + useKeyDelay
|
||||||
*/
|
*/
|
||||||
useKeyDelay?: number;
|
useKeyDelay?: number;
|
||||||
|
|
||||||
/**
|
|
||||||
* The timeout (in milliseconds) after which the server will consider the membership to have expired if it
|
|
||||||
* has not received a keep-alive from the client.
|
|
||||||
*/
|
|
||||||
membershipServerSideExpiryTimeout?: number;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The period (in milliseconds) that the client will send membership keep-alives to the server.
|
|
||||||
*/
|
|
||||||
membershipKeepAlivePeriod?: number;
|
|
||||||
}
|
}
|
||||||
|
export type JoinSessionConfig = MembershipConfig & EncryptionConfig;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A MatrixRTCSession manages the membership & properties of a MatrixRTC session.
|
* A MatrixRTCSession manages the membership & properties of a MatrixRTC session.
|
||||||
* This class doesn't deal with media at all, just membership & properties of a session.
|
* This class doesn't deal with media at all, just membership & properties of a session.
|
||||||
*/
|
*/
|
||||||
export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, MatrixRTCSessionEventHandlerMap> {
|
export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, MatrixRTCSessionEventHandlerMap> {
|
||||||
|
private membershipManager?: MembershipManager;
|
||||||
|
|
||||||
// The session Id of the call, this is the call_id of the call Member event.
|
// The session Id of the call, this is the call_id of the call Member event.
|
||||||
private _callId: string | undefined;
|
private _callId: string | undefined;
|
||||||
|
|
||||||
private relativeExpiry: number | undefined;
|
|
||||||
|
|
||||||
// undefined means not yet joined
|
// undefined means not yet joined
|
||||||
private joinConfig?: JoinSessionConfig;
|
private joinConfig?: JoinSessionConfig;
|
||||||
|
|
||||||
private get membershipExpiryTimeout(): number {
|
|
||||||
return this.joinConfig?.membershipExpiryTimeout ?? DEFAULT_EXPIRE_DURATION;
|
|
||||||
}
|
|
||||||
|
|
||||||
private get callMemberEventRetryDelayMinimum(): number {
|
|
||||||
return this.joinConfig?.callMemberEventRetryDelayMinimum ?? 3_000;
|
|
||||||
}
|
|
||||||
|
|
||||||
private get updateEncryptionKeyThrottle(): number {
|
private get updateEncryptionKeyThrottle(): number {
|
||||||
return this.joinConfig?.updateEncryptionKeyThrottle ?? 3_000;
|
return this.joinConfig?.updateEncryptionKeyThrottle ?? 3_000;
|
||||||
}
|
}
|
||||||
@@ -165,49 +152,16 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
|
|||||||
return this.joinConfig?.useKeyDelay ?? 5_000;
|
return this.joinConfig?.useKeyDelay ?? 5_000;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* If the server disallows the configured {@link membershipServerSideExpiryTimeout},
|
|
||||||
* this stores a delay that the server does allow.
|
|
||||||
*/
|
|
||||||
private membershipServerSideExpiryTimeoutOverride?: number;
|
|
||||||
|
|
||||||
private get membershipServerSideExpiryTimeout(): number {
|
|
||||||
return (
|
|
||||||
this.membershipServerSideExpiryTimeoutOverride ??
|
|
||||||
this.joinConfig?.membershipServerSideExpiryTimeout ??
|
|
||||||
8_000
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
private get membershipKeepAlivePeriod(): number {
|
|
||||||
return this.joinConfig?.membershipKeepAlivePeriod ?? 5_000;
|
|
||||||
}
|
|
||||||
|
|
||||||
private get callMemberEventRetryJitter(): number {
|
|
||||||
return this.joinConfig?.callMemberEventRetryJitter ?? 2_000;
|
|
||||||
}
|
|
||||||
|
|
||||||
private memberEventTimeout?: ReturnType<typeof setTimeout>;
|
|
||||||
private expiryTimeout?: ReturnType<typeof setTimeout>;
|
private expiryTimeout?: ReturnType<typeof setTimeout>;
|
||||||
private keysEventUpdateTimeout?: ReturnType<typeof setTimeout>;
|
private keysEventUpdateTimeout?: ReturnType<typeof setTimeout>;
|
||||||
private makeNewKeyTimeout?: ReturnType<typeof setTimeout>;
|
private makeNewKeyTimeout?: ReturnType<typeof setTimeout>;
|
||||||
private setNewKeyTimeouts = new Set<ReturnType<typeof setTimeout>>();
|
private setNewKeyTimeouts = new Set<ReturnType<typeof setTimeout>>();
|
||||||
|
|
||||||
// This is a Focus with the specified fields for an ActiveFocus (e.g. LivekitFocusActive for type="livekit")
|
|
||||||
private ownFocusActive?: Focus;
|
|
||||||
// This is a Foci array that contains the Focus objects this user is aware of and proposes to use.
|
|
||||||
private ownFociPreferred?: Focus[];
|
|
||||||
|
|
||||||
private updateCallMembershipRunning = false;
|
|
||||||
private needCallMembershipUpdate = false;
|
|
||||||
|
|
||||||
private manageMediaKeys = false;
|
private manageMediaKeys = false;
|
||||||
// userId:deviceId => array of (key, timestamp)
|
// userId:deviceId => array of (key, timestamp)
|
||||||
private encryptionKeys = new Map<string, Array<{ key: Uint8Array; timestamp: number }>>();
|
private encryptionKeys = new Map<string, Array<{ key: Uint8Array; timestamp: number }>>();
|
||||||
private lastEncryptionKeyUpdateRequest?: number;
|
private lastEncryptionKeyUpdateRequest?: number;
|
||||||
|
|
||||||
private disconnectDelayId: string | undefined;
|
|
||||||
|
|
||||||
// We use this to store the last membership fingerprints we saw, so we can proactively re-send encryption keys
|
// We use this to store the last membership fingerprints we saw, so we can proactively re-send encryption keys
|
||||||
// if it looks like a membership has been updated.
|
// if it looks like a membership has been updated.
|
||||||
private lastMembershipFingerprints: Set<string> | undefined;
|
private lastMembershipFingerprints: Set<string> | undefined;
|
||||||
@@ -338,22 +292,19 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
|
|||||||
* This is determined by checking if the relativeExpiry has been set.
|
* This is determined by checking if the relativeExpiry has been set.
|
||||||
*/
|
*/
|
||||||
public isJoined(): boolean {
|
public isJoined(): boolean {
|
||||||
return this.relativeExpiry !== undefined;
|
return this.membershipManager?.isJoined() ?? false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Performs cleanup & removes timers for client shutdown
|
* Performs cleanup & removes timers for client shutdown
|
||||||
*/
|
*/
|
||||||
public async stop(): Promise<void> {
|
public async stop(): Promise<void> {
|
||||||
await this.leaveRoomSession(1000);
|
await this.membershipManager?.leaveRoomSession(1000);
|
||||||
if (this.expiryTimeout) {
|
if (this.expiryTimeout) {
|
||||||
clearTimeout(this.expiryTimeout);
|
clearTimeout(this.expiryTimeout);
|
||||||
this.expiryTimeout = undefined;
|
this.expiryTimeout = undefined;
|
||||||
}
|
}
|
||||||
if (this.memberEventTimeout) {
|
this.membershipManager?.stop();
|
||||||
clearTimeout(this.memberEventTimeout);
|
|
||||||
this.memberEventTimeout = undefined;
|
|
||||||
}
|
|
||||||
const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
|
const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
|
||||||
roomState?.off(RoomStateEvent.Members, this.onMembershipUpdate);
|
roomState?.off(RoomStateEvent.Members, this.onMembershipUpdate);
|
||||||
}
|
}
|
||||||
@@ -376,22 +327,21 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
|
|||||||
if (this.isJoined()) {
|
if (this.isJoined()) {
|
||||||
logger.info(`Already joined to session in room ${this.room.roomId}: ignoring join call`);
|
logger.info(`Already joined to session in room ${this.room.roomId}: ignoring join call`);
|
||||||
return;
|
return;
|
||||||
|
} else {
|
||||||
|
this.membershipManager = new MembershipManager(joinConfig, this.room, this.client, () =>
|
||||||
|
this.getOldestMembership(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.ownFocusActive = fociActive;
|
|
||||||
this.ownFociPreferred = fociPreferred;
|
|
||||||
this.joinConfig = joinConfig;
|
this.joinConfig = joinConfig;
|
||||||
this.relativeExpiry = this.membershipExpiryTimeout;
|
|
||||||
this.manageMediaKeys = joinConfig?.manageMediaKeys ?? this.manageMediaKeys;
|
this.manageMediaKeys = joinConfig?.manageMediaKeys ?? this.manageMediaKeys;
|
||||||
|
// TODO: it feels wrong to be doing `setJoined()` and then `joinRoomSession()` non-atomically
|
||||||
logger.info(`Joining call session in room ${this.room.roomId} with manageMediaKeys=${this.manageMediaKeys}`);
|
// A new api between MembershipManager and the session will need to be defined.
|
||||||
|
this.membershipManager.setJoined(fociPreferred, fociActive);
|
||||||
if (joinConfig?.manageMediaKeys) {
|
if (joinConfig?.manageMediaKeys) {
|
||||||
this.makeNewSenderKey();
|
this.makeNewSenderKey();
|
||||||
this.requestSendCurrentKey();
|
this.requestSendCurrentKey();
|
||||||
}
|
}
|
||||||
// We don't wait for this, mostly because it may fail and schedule a retry, so this
|
this.membershipManager.joinRoomSession();
|
||||||
// function returning doesn't really mean anything at all.
|
|
||||||
this.triggerCallMembershipEventUpdate();
|
|
||||||
this.emit(MatrixRTCSessionEvent.JoinStateChanged, true);
|
this.emit(MatrixRTCSessionEvent.JoinStateChanged, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -433,35 +383,14 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
|
|||||||
|
|
||||||
logger.info(`Leaving call session in room ${this.room.roomId}`);
|
logger.info(`Leaving call session in room ${this.room.roomId}`);
|
||||||
this.joinConfig = undefined;
|
this.joinConfig = undefined;
|
||||||
this.relativeExpiry = undefined;
|
this.membershipManager!.setLeft();
|
||||||
this.ownFocusActive = undefined;
|
|
||||||
this.manageMediaKeys = false;
|
this.manageMediaKeys = false;
|
||||||
this.emit(MatrixRTCSessionEvent.JoinStateChanged, false);
|
this.emit(MatrixRTCSessionEvent.JoinStateChanged, false);
|
||||||
|
return await this.membershipManager!.leaveRoomSession(timeout);
|
||||||
if (timeout) {
|
|
||||||
// The sleep promise returns the string 'timeout' and the membership update void
|
|
||||||
// A success implies that the membership update was quicker then the timeout.
|
|
||||||
const raceResult = await Promise.race([this.triggerCallMembershipEventUpdate(), sleep(timeout, "timeout")]);
|
|
||||||
return raceResult !== "timeout";
|
|
||||||
} else {
|
|
||||||
await this.triggerCallMembershipEventUpdate();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public getActiveFocus(): Focus | undefined {
|
public getActiveFocus(): Focus | undefined {
|
||||||
if (this.ownFocusActive && isLivekitFocusActive(this.ownFocusActive)) {
|
return this.membershipManager?.getActiveFocus();
|
||||||
// A livekit active focus
|
|
||||||
if (this.ownFocusActive.focus_selection === "oldest_membership") {
|
|
||||||
const oldestMembership = this.getOldestMembership();
|
|
||||||
return oldestMembership?.getPreferredFoci()[0];
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// We do not understand the membership format (could be legacy). We default to oldestMembership
|
|
||||||
// Once there are other methods this is a hard error!
|
|
||||||
const oldestMembership = this.getOldestMembership();
|
|
||||||
return oldestMembership?.getPreferredFoci()[0];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -838,7 +767,7 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
|
|||||||
if (this.isJoined() && !this.memberships.some(this.isMyMembership)) {
|
if (this.isJoined() && !this.memberships.some(this.isMyMembership)) {
|
||||||
logger.warn("Missing own membership: force re-join");
|
logger.warn("Missing own membership: force re-join");
|
||||||
// TODO: Should this be awaited? And is there anything to tell the focus?
|
// TODO: Should this be awaited? And is there anything to tell the focus?
|
||||||
this.triggerCallMembershipEventUpdate();
|
this.membershipManager?.triggerCallMembershipEventUpdate();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -896,195 +825,6 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructs our own membership
|
|
||||||
*/
|
|
||||||
private makeMyMembership(deviceId: string): SessionMembershipData {
|
|
||||||
return {
|
|
||||||
call_id: "",
|
|
||||||
scope: "m.room",
|
|
||||||
application: "m.call",
|
|
||||||
device_id: deviceId,
|
|
||||||
expires: this.relativeExpiry,
|
|
||||||
focus_active: { type: "livekit", focus_selection: "oldest_membership" },
|
|
||||||
foci_preferred: this.ownFociPreferred ?? [],
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
private makeNewMembership(deviceId: string): SessionMembershipData | {} {
|
|
||||||
// If we're joined, add our own
|
|
||||||
if (this.isJoined()) {
|
|
||||||
return this.makeMyMembership(deviceId);
|
|
||||||
}
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
private triggerCallMembershipEventUpdate = async (): Promise<void> => {
|
|
||||||
// TODO: Should this await on a shared promise?
|
|
||||||
if (this.updateCallMembershipRunning) {
|
|
||||||
this.needCallMembershipUpdate = true;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.updateCallMembershipRunning = true;
|
|
||||||
try {
|
|
||||||
// if anything triggers an update while the update is running, do another update afterwards
|
|
||||||
do {
|
|
||||||
this.needCallMembershipUpdate = false;
|
|
||||||
await this.updateCallMembershipEvent();
|
|
||||||
} while (this.needCallMembershipUpdate);
|
|
||||||
} finally {
|
|
||||||
this.updateCallMembershipRunning = false;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private async updateCallMembershipEvent(): Promise<void> {
|
|
||||||
if (this.memberEventTimeout) {
|
|
||||||
clearTimeout(this.memberEventTimeout);
|
|
||||||
this.memberEventTimeout = undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
|
|
||||||
if (!roomState) throw new Error("Couldn't get room state for room " + this.room.roomId);
|
|
||||||
|
|
||||||
const localUserId = this.client.getUserId();
|
|
||||||
const localDeviceId = this.client.getDeviceId();
|
|
||||||
if (!localUserId || !localDeviceId) throw new Error("User ID or device ID was null!");
|
|
||||||
|
|
||||||
let newContent: {} | SessionMembershipData = {};
|
|
||||||
// TODO: implement expiry logic to MSC4143 events
|
|
||||||
// previously we checked here if the event is timed out and scheduled a check if not.
|
|
||||||
// maybe there is a better way.
|
|
||||||
newContent = this.makeNewMembership(localDeviceId);
|
|
||||||
|
|
||||||
try {
|
|
||||||
if (this.isJoined()) {
|
|
||||||
const stateKey = this.makeMembershipStateKey(localUserId, localDeviceId);
|
|
||||||
const prepareDelayedDisconnection = async (): Promise<void> => {
|
|
||||||
try {
|
|
||||||
const res = await resendIfRateLimited(() =>
|
|
||||||
this.client._unstable_sendDelayedStateEvent(
|
|
||||||
this.room.roomId,
|
|
||||||
{
|
|
||||||
delay: this.membershipServerSideExpiryTimeout,
|
|
||||||
},
|
|
||||||
EventType.GroupCallMemberPrefix,
|
|
||||||
{}, // leave event
|
|
||||||
stateKey,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
this.disconnectDelayId = res.delay_id;
|
|
||||||
} catch (e) {
|
|
||||||
if (
|
|
||||||
e instanceof MatrixError &&
|
|
||||||
e.errcode === "M_UNKNOWN" &&
|
|
||||||
e.data["org.matrix.msc4140.errcode"] === "M_MAX_DELAY_EXCEEDED"
|
|
||||||
) {
|
|
||||||
const maxDelayAllowed = e.data["org.matrix.msc4140.max_delay"];
|
|
||||||
if (
|
|
||||||
typeof maxDelayAllowed === "number" &&
|
|
||||||
this.membershipServerSideExpiryTimeout > maxDelayAllowed
|
|
||||||
) {
|
|
||||||
this.membershipServerSideExpiryTimeoutOverride = maxDelayAllowed;
|
|
||||||
return prepareDelayedDisconnection();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.error("Failed to prepare delayed disconnection event:", e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
await prepareDelayedDisconnection();
|
|
||||||
// Send join event _after_ preparing the delayed disconnection event
|
|
||||||
await resendIfRateLimited(() =>
|
|
||||||
this.client.sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, newContent, stateKey),
|
|
||||||
);
|
|
||||||
// If sending state cancels your own delayed state, prepare another delayed state
|
|
||||||
// TODO: Remove this once MSC4140 is stable & doesn't cancel own delayed state
|
|
||||||
if (this.disconnectDelayId !== undefined) {
|
|
||||||
try {
|
|
||||||
const knownDisconnectDelayId = this.disconnectDelayId;
|
|
||||||
await resendIfRateLimited(() =>
|
|
||||||
this.client._unstable_updateDelayedEvent(
|
|
||||||
knownDisconnectDelayId,
|
|
||||||
UpdateDelayedEventAction.Restart,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
} catch (e) {
|
|
||||||
if (e instanceof MatrixError && e.errcode === "M_NOT_FOUND") {
|
|
||||||
// If we get a M_NOT_FOUND we prepare a new delayed event.
|
|
||||||
// In other error cases we do not want to prepare anything since we do not have the guarantee, that the
|
|
||||||
// future is not still running.
|
|
||||||
logger.warn("Failed to update delayed disconnection event, prepare it again:", e);
|
|
||||||
this.disconnectDelayId = undefined;
|
|
||||||
await prepareDelayedDisconnection();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (this.disconnectDelayId !== undefined) {
|
|
||||||
this.scheduleDelayDisconnection();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Not joined
|
|
||||||
let sentDelayedDisconnect = false;
|
|
||||||
if (this.disconnectDelayId !== undefined) {
|
|
||||||
try {
|
|
||||||
const knownDisconnectDelayId = this.disconnectDelayId;
|
|
||||||
await resendIfRateLimited(() =>
|
|
||||||
this.client._unstable_updateDelayedEvent(
|
|
||||||
knownDisconnectDelayId,
|
|
||||||
UpdateDelayedEventAction.Send,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
sentDelayedDisconnect = true;
|
|
||||||
} catch (e) {
|
|
||||||
logger.error("Failed to send our delayed disconnection event:", e);
|
|
||||||
}
|
|
||||||
this.disconnectDelayId = undefined;
|
|
||||||
}
|
|
||||||
if (!sentDelayedDisconnect) {
|
|
||||||
await resendIfRateLimited(() =>
|
|
||||||
this.client.sendStateEvent(
|
|
||||||
this.room.roomId,
|
|
||||||
EventType.GroupCallMemberPrefix,
|
|
||||||
{},
|
|
||||||
this.makeMembershipStateKey(localUserId, localDeviceId),
|
|
||||||
),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.info("Sent updated call member event.");
|
|
||||||
} catch (e) {
|
|
||||||
const resendDelay = this.callMemberEventRetryDelayMinimum + Math.random() * this.callMemberEventRetryJitter;
|
|
||||||
logger.warn(`Failed to send call member event (retrying in ${resendDelay}): ${e}`);
|
|
||||||
await sleep(resendDelay);
|
|
||||||
await this.triggerCallMembershipEventUpdate();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private scheduleDelayDisconnection(): void {
|
|
||||||
this.memberEventTimeout = setTimeout(this.delayDisconnection, this.membershipKeepAlivePeriod);
|
|
||||||
}
|
|
||||||
|
|
||||||
private readonly delayDisconnection = async (): Promise<void> => {
|
|
||||||
try {
|
|
||||||
const knownDisconnectDelayId = this.disconnectDelayId!;
|
|
||||||
await resendIfRateLimited(() =>
|
|
||||||
this.client._unstable_updateDelayedEvent(knownDisconnectDelayId, UpdateDelayedEventAction.Restart),
|
|
||||||
);
|
|
||||||
this.scheduleDelayDisconnection();
|
|
||||||
} catch (e) {
|
|
||||||
logger.error("Failed to delay our disconnection event:", e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private makeMembershipStateKey(localUserId: string, localDeviceId: string): string {
|
|
||||||
const stateKey = `${localUserId}_${localDeviceId}`;
|
|
||||||
if (/^org\.matrix\.msc(3757|3779)\b/.exec(this.room.getVersion())) {
|
|
||||||
return stateKey;
|
|
||||||
} else {
|
|
||||||
return `_${stateKey}`;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private onRotateKeyTimeout = (): void => {
|
private onRotateKeyTimeout = (): void => {
|
||||||
if (!this.manageMediaKeys) return;
|
if (!this.manageMediaKeys) return;
|
||||||
|
|
||||||
@@ -1096,31 +836,3 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
|
|||||||
this.sendEncryptionKeysEvent(newKeyIndex);
|
this.sendEncryptionKeysEvent(newKeyIndex);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
async function resendIfRateLimited<T>(func: () => Promise<T>, numRetriesAllowed: number = 1): Promise<T> {
|
|
||||||
// eslint-disable-next-line no-constant-condition
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
return await func();
|
|
||||||
} catch (e) {
|
|
||||||
if (numRetriesAllowed > 0 && e instanceof HTTPError && e.isRateLimitError()) {
|
|
||||||
numRetriesAllowed--;
|
|
||||||
let resendDelay: number;
|
|
||||||
const defaultMs = 5000;
|
|
||||||
try {
|
|
||||||
resendDelay = e.getRetryAfterMs() ?? defaultMs;
|
|
||||||
logger.info(`Rate limited by server, retrying in ${resendDelay}ms`);
|
|
||||||
} catch (e) {
|
|
||||||
logger.warn(
|
|
||||||
`Error while retrieving a rate-limit retry delay, retrying after default delay of ${defaultMs}`,
|
|
||||||
e,
|
|
||||||
);
|
|
||||||
resendDelay = defaultMs;
|
|
||||||
}
|
|
||||||
await sleep(resendDelay);
|
|
||||||
} else {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
336
src/matrixrtc/MembershipManager.ts
Normal file
336
src/matrixrtc/MembershipManager.ts
Normal file
@@ -0,0 +1,336 @@
|
|||||||
|
import { EventType } from "../@types/event.ts";
|
||||||
|
import { UpdateDelayedEventAction } from "../@types/requests.ts";
|
||||||
|
import type { MatrixClient } from "../client.ts";
|
||||||
|
import { HTTPError, MatrixError } from "../http-api/errors.ts";
|
||||||
|
import { logger } from "../logger.ts";
|
||||||
|
import { EventTimeline } from "../models/event-timeline.ts";
|
||||||
|
import { Room } from "../models/room.ts";
|
||||||
|
import { sleep } from "../utils.ts";
|
||||||
|
import { CallMembership, DEFAULT_EXPIRE_DURATION, SessionMembershipData } from "./CallMembership.ts";
|
||||||
|
import { Focus } from "./focus.ts";
|
||||||
|
import { isLivekitFocusActive } from "./LivekitFocus.ts";
|
||||||
|
import { MembershipConfig } from "./MatrixRTCSession.ts";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This internal class is used by the MatrixRTCSession to manage the local user's own membership of the session.
|
||||||
|
* @internal
|
||||||
|
*/
|
||||||
|
export class MembershipManager {
|
||||||
|
private relativeExpiry: number | undefined;
|
||||||
|
|
||||||
|
public constructor(
|
||||||
|
private joinConfig: MembershipConfig | undefined,
|
||||||
|
private room: Room,
|
||||||
|
private client: MatrixClient,
|
||||||
|
private getOldestMembership: () => CallMembership | undefined,
|
||||||
|
) {}
|
||||||
|
private memberEventTimeout?: ReturnType<typeof setTimeout>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a Foci array that contains the Focus objects this user is aware of and proposes to use.
|
||||||
|
*/
|
||||||
|
private ownFociPreferred?: Focus[];
|
||||||
|
/**
|
||||||
|
* This is a Focus with the specified fields for an ActiveFocus (e.g. LivekitFocusActive for type="livekit")
|
||||||
|
*/
|
||||||
|
private ownFocusActive?: Focus;
|
||||||
|
|
||||||
|
private updateCallMembershipRunning = false;
|
||||||
|
private needCallMembershipUpdate = false;
|
||||||
|
/**
|
||||||
|
* If the server disallows the configured {@link membershipServerSideExpiryTimeout},
|
||||||
|
* this stores a delay that the server does allow.
|
||||||
|
*/
|
||||||
|
private membershipServerSideExpiryTimeoutOverride?: number;
|
||||||
|
private disconnectDelayId: string | undefined;
|
||||||
|
|
||||||
|
private get callMemberEventRetryDelayMinimum(): number {
|
||||||
|
return this.joinConfig?.callMemberEventRetryDelayMinimum ?? 3_000;
|
||||||
|
}
|
||||||
|
private get membershipExpiryTimeout(): number {
|
||||||
|
return this.joinConfig?.membershipExpiryTimeout ?? DEFAULT_EXPIRE_DURATION;
|
||||||
|
}
|
||||||
|
private get membershipServerSideExpiryTimeout(): number {
|
||||||
|
return (
|
||||||
|
this.membershipServerSideExpiryTimeoutOverride ??
|
||||||
|
this.joinConfig?.membershipServerSideExpiryTimeout ??
|
||||||
|
8_000
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private get membershipKeepAlivePeriod(): number {
|
||||||
|
return this.joinConfig?.membershipKeepAlivePeriod ?? 5_000;
|
||||||
|
}
|
||||||
|
|
||||||
|
private get callMemberEventRetryJitter(): number {
|
||||||
|
return this.joinConfig?.callMemberEventRetryJitter ?? 2_000;
|
||||||
|
}
|
||||||
|
public joinRoomSession(): void {
|
||||||
|
// We don't wait for this, mostly because it may fail and schedule a retry, so this
|
||||||
|
// function returning doesn't really mean anything at all.
|
||||||
|
this.triggerCallMembershipEventUpdate();
|
||||||
|
}
|
||||||
|
public setJoined(fociPreferred: Focus[], fociActive?: Focus): void {
|
||||||
|
this.ownFocusActive = fociActive;
|
||||||
|
this.ownFociPreferred = fociPreferred;
|
||||||
|
this.relativeExpiry = this.membershipExpiryTimeout;
|
||||||
|
}
|
||||||
|
public setLeft(): void {
|
||||||
|
this.relativeExpiry = undefined;
|
||||||
|
this.ownFocusActive = undefined;
|
||||||
|
}
|
||||||
|
public async leaveRoomSession(timeout: number | undefined = undefined): Promise<boolean> {
|
||||||
|
if (timeout) {
|
||||||
|
// The sleep promise returns the string 'timeout' and the membership update void
|
||||||
|
// A success implies that the membership update was quicker then the timeout.
|
||||||
|
const raceResult = await Promise.race([this.triggerCallMembershipEventUpdate(), sleep(timeout, "timeout")]);
|
||||||
|
return raceResult !== "timeout";
|
||||||
|
} else {
|
||||||
|
await this.triggerCallMembershipEventUpdate();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public stop(): void {
|
||||||
|
if (this.memberEventTimeout) {
|
||||||
|
clearTimeout(this.memberEventTimeout);
|
||||||
|
this.memberEventTimeout = undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public triggerCallMembershipEventUpdate = async (): Promise<void> => {
|
||||||
|
// TODO: Should this await on a shared promise?
|
||||||
|
if (this.updateCallMembershipRunning) {
|
||||||
|
this.needCallMembershipUpdate = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.updateCallMembershipRunning = true;
|
||||||
|
try {
|
||||||
|
// if anything triggers an update while the update is running, do another update afterwards
|
||||||
|
do {
|
||||||
|
this.needCallMembershipUpdate = false;
|
||||||
|
await this.updateCallMembershipEvent();
|
||||||
|
} while (this.needCallMembershipUpdate);
|
||||||
|
} finally {
|
||||||
|
this.updateCallMembershipRunning = false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
private makeNewMembership(deviceId: string): SessionMembershipData | {} {
|
||||||
|
// If we're joined, add our own
|
||||||
|
if (this.isJoined()) {
|
||||||
|
return this.makeMyMembership(deviceId);
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* Returns true if we intend to be participating in the MatrixRTC session.
|
||||||
|
* This is determined by checking if the relativeExpiry has been set.
|
||||||
|
*/
|
||||||
|
public isJoined(): boolean {
|
||||||
|
return this.relativeExpiry !== undefined;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Constructs our own membership
|
||||||
|
*/
|
||||||
|
private makeMyMembership(deviceId: string): SessionMembershipData {
|
||||||
|
return {
|
||||||
|
call_id: "",
|
||||||
|
scope: "m.room",
|
||||||
|
application: "m.call",
|
||||||
|
device_id: deviceId,
|
||||||
|
expires: this.relativeExpiry,
|
||||||
|
focus_active: { type: "livekit", focus_selection: "oldest_membership" },
|
||||||
|
foci_preferred: this.ownFociPreferred ?? [],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public getActiveFocus(): Focus | undefined {
|
||||||
|
if (this.ownFocusActive && isLivekitFocusActive(this.ownFocusActive)) {
|
||||||
|
// A livekit active focus
|
||||||
|
if (this.ownFocusActive.focus_selection === "oldest_membership") {
|
||||||
|
const oldestMembership = this.getOldestMembership();
|
||||||
|
return oldestMembership?.getPreferredFoci()[0];
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// We do not understand the membership format (could be legacy). We default to oldestMembership
|
||||||
|
// Once there are other methods this is a hard error!
|
||||||
|
const oldestMembership = this.getOldestMembership();
|
||||||
|
return oldestMembership?.getPreferredFoci()[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public async updateCallMembershipEvent(): Promise<void> {
|
||||||
|
if (this.memberEventTimeout) {
|
||||||
|
clearTimeout(this.memberEventTimeout);
|
||||||
|
this.memberEventTimeout = undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
|
||||||
|
if (!roomState) throw new Error("Couldn't get room state for room " + this.room.roomId);
|
||||||
|
|
||||||
|
const localUserId = this.client.getUserId();
|
||||||
|
const localDeviceId = this.client.getDeviceId();
|
||||||
|
if (!localUserId || !localDeviceId) throw new Error("User ID or device ID was null!");
|
||||||
|
|
||||||
|
let newContent: {} | SessionMembershipData = {};
|
||||||
|
// TODO: add back expiary logic to non-legacy events
|
||||||
|
// previously we checked here if the event is timed out and scheduled a check if not.
|
||||||
|
// maybe there is a better way.
|
||||||
|
newContent = this.makeNewMembership(localDeviceId);
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (this.isJoined()) {
|
||||||
|
const stateKey = this.makeMembershipStateKey(localUserId, localDeviceId);
|
||||||
|
const prepareDelayedDisconnection = async (): Promise<void> => {
|
||||||
|
try {
|
||||||
|
const res = await resendIfRateLimited(() =>
|
||||||
|
this.client._unstable_sendDelayedStateEvent(
|
||||||
|
this.room.roomId,
|
||||||
|
{
|
||||||
|
delay: this.membershipServerSideExpiryTimeout,
|
||||||
|
},
|
||||||
|
EventType.GroupCallMemberPrefix,
|
||||||
|
{}, // leave event
|
||||||
|
stateKey,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
logger.log("BEFOER:", this.disconnectDelayId);
|
||||||
|
this.disconnectDelayId = res.delay_id;
|
||||||
|
logger.log("AFTER:", this.disconnectDelayId);
|
||||||
|
} catch (e) {
|
||||||
|
if (
|
||||||
|
e instanceof MatrixError &&
|
||||||
|
e.errcode === "M_UNKNOWN" &&
|
||||||
|
e.data["org.matrix.msc4140.errcode"] === "M_MAX_DELAY_EXCEEDED"
|
||||||
|
) {
|
||||||
|
const maxDelayAllowed = e.data["org.matrix.msc4140.max_delay"];
|
||||||
|
if (
|
||||||
|
typeof maxDelayAllowed === "number" &&
|
||||||
|
this.membershipServerSideExpiryTimeout > maxDelayAllowed
|
||||||
|
) {
|
||||||
|
this.membershipServerSideExpiryTimeoutOverride = maxDelayAllowed;
|
||||||
|
return prepareDelayedDisconnection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.error("Failed to prepare delayed disconnection event:", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
await prepareDelayedDisconnection();
|
||||||
|
// Send join event _after_ preparing the delayed disconnection event
|
||||||
|
await resendIfRateLimited(() =>
|
||||||
|
this.client.sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, newContent, stateKey),
|
||||||
|
);
|
||||||
|
// If sending state cancels your own delayed state, prepare another delayed state
|
||||||
|
// TODO: Remove this once MSC4140 is stable & doesn't cancel own delayed state
|
||||||
|
if (this.disconnectDelayId !== undefined) {
|
||||||
|
try {
|
||||||
|
const knownDisconnectDelayId = this.disconnectDelayId;
|
||||||
|
await resendIfRateLimited(() =>
|
||||||
|
this.client._unstable_updateDelayedEvent(
|
||||||
|
knownDisconnectDelayId,
|
||||||
|
UpdateDelayedEventAction.Restart,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
} catch (e) {
|
||||||
|
if (e instanceof MatrixError && e.errcode === "M_NOT_FOUND") {
|
||||||
|
// If we get a M_NOT_FOUND we prepare a new delayed event.
|
||||||
|
// In other error cases we do not want to prepare anything since we do not have the guarantee, that the
|
||||||
|
// future is not still running.
|
||||||
|
logger.warn("Failed to update delayed disconnection event, prepare it again:", e);
|
||||||
|
this.disconnectDelayId = undefined;
|
||||||
|
await prepareDelayedDisconnection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (this.disconnectDelayId !== undefined) {
|
||||||
|
this.scheduleDelayDisconnection();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Not joined
|
||||||
|
let sentDelayedDisconnect = false;
|
||||||
|
if (this.disconnectDelayId !== undefined) {
|
||||||
|
try {
|
||||||
|
const knownDisconnectDelayId = this.disconnectDelayId;
|
||||||
|
await resendIfRateLimited(() =>
|
||||||
|
this.client._unstable_updateDelayedEvent(
|
||||||
|
knownDisconnectDelayId,
|
||||||
|
UpdateDelayedEventAction.Send,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
sentDelayedDisconnect = true;
|
||||||
|
} catch (e) {
|
||||||
|
logger.error("Failed to send our delayed disconnection event:", e);
|
||||||
|
}
|
||||||
|
this.disconnectDelayId = undefined;
|
||||||
|
}
|
||||||
|
if (!sentDelayedDisconnect) {
|
||||||
|
await resendIfRateLimited(() =>
|
||||||
|
this.client.sendStateEvent(
|
||||||
|
this.room.roomId,
|
||||||
|
EventType.GroupCallMemberPrefix,
|
||||||
|
{},
|
||||||
|
this.makeMembershipStateKey(localUserId, localDeviceId),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.info("Sent updated call member event.");
|
||||||
|
} catch (e) {
|
||||||
|
const resendDelay = this.callMemberEventRetryDelayMinimum + Math.random() * this.callMemberEventRetryJitter;
|
||||||
|
logger.warn(`Failed to send call member event (retrying in ${resendDelay}): ${e}`);
|
||||||
|
await sleep(resendDelay);
|
||||||
|
await this.triggerCallMembershipEventUpdate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private scheduleDelayDisconnection(): void {
|
||||||
|
this.memberEventTimeout = setTimeout(this.delayDisconnection, this.membershipKeepAlivePeriod);
|
||||||
|
}
|
||||||
|
|
||||||
|
private readonly delayDisconnection = async (): Promise<void> => {
|
||||||
|
try {
|
||||||
|
const knownDisconnectDelayId = this.disconnectDelayId!;
|
||||||
|
await resendIfRateLimited(() =>
|
||||||
|
this.client._unstable_updateDelayedEvent(knownDisconnectDelayId, UpdateDelayedEventAction.Restart),
|
||||||
|
);
|
||||||
|
this.scheduleDelayDisconnection();
|
||||||
|
} catch (e) {
|
||||||
|
logger.error("Failed to delay our disconnection event:", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private makeMembershipStateKey(localUserId: string, localDeviceId: string): string {
|
||||||
|
const stateKey = `${localUserId}_${localDeviceId}`;
|
||||||
|
if (/^org\.matrix\.msc(3757|3779)\b/.exec(this.room.getVersion())) {
|
||||||
|
return stateKey;
|
||||||
|
} else {
|
||||||
|
return `_${stateKey}`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function resendIfRateLimited<T>(func: () => Promise<T>, numRetriesAllowed: number = 1): Promise<T> {
|
||||||
|
// eslint-disable-next-line no-constant-condition
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return await func();
|
||||||
|
} catch (e) {
|
||||||
|
if (numRetriesAllowed > 0 && e instanceof HTTPError && e.isRateLimitError()) {
|
||||||
|
numRetriesAllowed--;
|
||||||
|
let resendDelay: number;
|
||||||
|
const defaultMs = 5000;
|
||||||
|
try {
|
||||||
|
resendDelay = e.getRetryAfterMs() ?? defaultMs;
|
||||||
|
logger.info(`Rate limited by server, retrying in ${resendDelay}ms`);
|
||||||
|
} catch (e) {
|
||||||
|
logger.warn(
|
||||||
|
`Error while retrieving a rate-limit retry delay, retrying after default delay of ${defaultMs}`,
|
||||||
|
e,
|
||||||
|
);
|
||||||
|
resendDelay = defaultMs;
|
||||||
|
}
|
||||||
|
await sleep(resendDelay);
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user