You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-11-25 05:23:13 +03:00
MatrixRTC MembershipManger: remove redundant sendDelayedEventAction and expose status (#4747)
* Remove redundant sendDelayedEventAction We do already have the state `hasMemberEvent` that allows to distinguish the two cases. No need to create two dedicated actions. * fix missing return * Make membership manager an event emitter to inform about status updates. - deprecate isJoined (replaced by isActivated) - move Interface types to types.ts * add tests for status updates. * lint * test "reschedules delayed leave event" in case the delayed event gets canceled * review * fix types * prettier * fix legacy membership manager
This commit is contained in:
@@ -20,7 +20,13 @@ limitations under the License.
|
||||
import { type MockedFunction, type Mock } from "jest-mock";
|
||||
|
||||
import { EventType, HTTPError, MatrixError, UnsupportedDelayedEventsEndpointError, type Room } from "../../../src";
|
||||
import { type Focus, type LivekitFocusActive, type SessionMembershipData } from "../../../src/matrixrtc";
|
||||
import {
|
||||
MembershipManagerEvent,
|
||||
Status,
|
||||
type Focus,
|
||||
type LivekitFocusActive,
|
||||
type SessionMembershipData,
|
||||
} from "../../../src/matrixrtc";
|
||||
import { LegacyMembershipManager } from "../../../src/matrixrtc/LegacyMembershipManager";
|
||||
import { makeMockClient, makeMockRoom, membershipTemplate, mockCallMembership, type MockClient } from "./mocks";
|
||||
import { MembershipManager } from "../../../src/matrixrtc/NewMembershipManager";
|
||||
@@ -34,6 +40,14 @@ function waitForMockCall(method: MockedFunction<any>, returnVal?: Promise<any>)
|
||||
});
|
||||
});
|
||||
}
|
||||
function waitForMockCallOnce(method: MockedFunction<any>, returnVal?: Promise<any>) {
|
||||
return new Promise<void>((resolve) => {
|
||||
method.mockImplementationOnce(() => {
|
||||
resolve();
|
||||
return returnVal ?? Promise.resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function createAsyncHandle(method: MockedFunction<any>) {
|
||||
const { reject, resolve, promise } = defer();
|
||||
@@ -78,16 +92,16 @@ describe.each([
|
||||
// There is no need to clean up mocks since we will recreate the client.
|
||||
});
|
||||
|
||||
describe("isJoined()", () => {
|
||||
describe("isActivated()", () => {
|
||||
it("defaults to false", () => {
|
||||
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||
expect(manager.isJoined()).toEqual(false);
|
||||
expect(manager.isActivated()).toEqual(false);
|
||||
});
|
||||
|
||||
it("returns true after join()", () => {
|
||||
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||
manager.join([]);
|
||||
expect(manager.isJoined()).toEqual(true);
|
||||
expect(manager.isActivated()).toEqual(true);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -125,6 +139,23 @@ describe.each([
|
||||
{},
|
||||
"_@alice:example.org_AAAAAAA",
|
||||
);
|
||||
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("reschedules delayed leave event if sending state cancels it", async () => {
|
||||
const memberManager = new TestMembershipManager(undefined, room, client, () => undefined);
|
||||
const waitForSendState = waitForMockCall(client.sendStateEvent);
|
||||
const waitForUpdateDelaye = waitForMockCallOnce(
|
||||
client._unstable_updateDelayedEvent,
|
||||
Promise.reject(new MatrixError({ errcode: "M_NOT_FOUND" })),
|
||||
);
|
||||
memberManager.join([focus], focusActive);
|
||||
await waitForSendState;
|
||||
await waitForUpdateDelaye;
|
||||
await jest.advanceTimersByTimeAsync(1);
|
||||
// Once for the initial event and once because of the errcode: "M_NOT_FOUND"
|
||||
// Different to "sends a membership event and schedules delayed leave when joining a call" where its only called once (1)
|
||||
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
describe("does not prefix the state key with _ for rooms that support user-owned state events", () => {
|
||||
@@ -505,7 +536,39 @@ describe.each([
|
||||
await testExpires(10_000, 1_000);
|
||||
});
|
||||
});
|
||||
describe("status updates", () => {
|
||||
it("starts 'Disconnected' !FailsForLegacy", () => {
|
||||
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||
expect(manager.status).toBe(Status.Disconnected);
|
||||
});
|
||||
it("emits 'Connection' and 'Connected' after join !FailsForLegacy", async () => {
|
||||
const handleDelayedEvent = createAsyncHandle(client._unstable_sendDelayedStateEvent);
|
||||
const handleStateEvent = createAsyncHandle(client.sendStateEvent);
|
||||
|
||||
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||
expect(manager.status).toBe(Status.Disconnected);
|
||||
const connectEmit = jest.fn();
|
||||
manager.on(MembershipManagerEvent.StatusChanged, connectEmit);
|
||||
manager.join([focus], focusActive);
|
||||
expect(manager.status).toBe(Status.Connecting);
|
||||
handleDelayedEvent.resolve();
|
||||
await jest.advanceTimersByTimeAsync(1);
|
||||
expect(connectEmit).toHaveBeenCalledWith(Status.Disconnected, Status.Connecting);
|
||||
handleStateEvent.resolve();
|
||||
await jest.advanceTimersByTimeAsync(1);
|
||||
expect(connectEmit).toHaveBeenCalledWith(Status.Connecting, Status.Connected);
|
||||
});
|
||||
it("emits 'Disconnecting' and 'Disconnected' after leave !FailsForLegacy", async () => {
|
||||
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||
const connectEmit = jest.fn();
|
||||
manager.on(MembershipManagerEvent.StatusChanged, connectEmit);
|
||||
manager.join([focus], focusActive);
|
||||
await jest.advanceTimersByTimeAsync(1);
|
||||
await manager.leave();
|
||||
expect(connectEmit).toHaveBeenCalledWith(Status.Connected, Status.Disconnecting);
|
||||
expect(connectEmit).toHaveBeenCalledWith(Status.Disconnecting, Status.Disconnected);
|
||||
});
|
||||
});
|
||||
describe("server error handling", () => {
|
||||
// Types of server error: 429 rate limit with no retry-after header, 429 with retry-after, 50x server error (maybe retry every second), connection/socket timeout
|
||||
describe("retries sending delayed leave event", () => {
|
||||
|
||||
@@ -27,7 +27,7 @@ import { type Focus } from "./focus.ts";
|
||||
import { isLivekitFocusActive } from "./LivekitFocus.ts";
|
||||
import { type MembershipConfig } from "./MatrixRTCSession.ts";
|
||||
import { type EmptyObject } from "../@types/common.ts";
|
||||
import { type IMembershipManager } from "./NewMembershipManager.ts";
|
||||
import { type IMembershipManager, type MembershipManagerEvent, Status } from "./types.ts";
|
||||
|
||||
/**
|
||||
* This internal class is used by the MatrixRTCSession to manage the local user's own membership of the session.
|
||||
@@ -103,9 +103,35 @@ export class LegacyMembershipManager implements IMembershipManager {
|
||||
private getOldestMembership: () => CallMembership | undefined,
|
||||
) {}
|
||||
|
||||
public off(
|
||||
event: MembershipManagerEvent.StatusChanged,
|
||||
listener: (oldStatus: Status, newStatus: Status) => void,
|
||||
): this {
|
||||
logger.error("off is not implemented on LegacyMembershipManager");
|
||||
return this;
|
||||
}
|
||||
|
||||
public on(
|
||||
event: MembershipManagerEvent.StatusChanged,
|
||||
listener: (oldStatus: Status, newStatus: Status) => void,
|
||||
): this {
|
||||
logger.error("on is not implemented on LegacyMembershipManager");
|
||||
return this;
|
||||
}
|
||||
|
||||
public isJoined(): boolean {
|
||||
return this.relativeExpiry !== undefined;
|
||||
}
|
||||
public isActivated(): boolean {
|
||||
return this.isJoined();
|
||||
}
|
||||
/**
|
||||
* Unimplemented
|
||||
* @returns Status.Unknown
|
||||
*/
|
||||
public get status(): Status {
|
||||
return Status.Unknown;
|
||||
}
|
||||
|
||||
public join(fociPreferred: Focus[], fociActive?: Focus): void {
|
||||
this.ownFocusActive = fociActive;
|
||||
|
||||
@@ -25,10 +25,11 @@ import { RoomStateEvent } from "../models/room-state.ts";
|
||||
import { type Focus } from "./focus.ts";
|
||||
import { KnownMembership } from "../@types/membership.ts";
|
||||
import { type MatrixEvent } from "../models/event.ts";
|
||||
import { MembershipManager, type IMembershipManager } from "./NewMembershipManager.ts";
|
||||
import { MembershipManager } from "./NewMembershipManager.ts";
|
||||
import { EncryptionManager, type IEncryptionManager, type Statistics } from "./EncryptionManager.ts";
|
||||
import { LegacyMembershipManager } from "./LegacyMembershipManager.ts";
|
||||
import { logDurationSync } from "../utils.ts";
|
||||
import type { IMembershipManager } from "./types.ts";
|
||||
|
||||
const logger = rootLogger.getChild("MatrixRTCSession");
|
||||
|
||||
|
||||
@@ -24,110 +24,80 @@ import { type Room } from "../models/room.ts";
|
||||
import { defer, type IDeferred } from "../utils.ts";
|
||||
import { type CallMembership, DEFAULT_EXPIRE_DURATION, type SessionMembershipData } from "./CallMembership.ts";
|
||||
import { type Focus } from "./focus.ts";
|
||||
import {
|
||||
type IMembershipManager,
|
||||
type MembershipManagerEventHandlerMap,
|
||||
MembershipManagerEvent,
|
||||
Status,
|
||||
} from "./types.ts";
|
||||
import { isLivekitFocusActive } from "./LivekitFocus.ts";
|
||||
import { type MembershipConfig } from "./MatrixRTCSession.ts";
|
||||
import { ActionScheduler, type ActionUpdate } from "./NewMembershipManagerActionScheduler.ts";
|
||||
import { TypedEventEmitter } from "../models/typed-event-emitter.ts";
|
||||
|
||||
const logger = rootLogger.getChild("MatrixRTCSession");
|
||||
|
||||
/**
|
||||
* This interface defines what a MembershipManager uses and exposes.
|
||||
* This interface is what we use to write tests and allows changing the actual implementation
|
||||
* without breaking tests because of some internal method renaming.
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
export interface IMembershipManager {
|
||||
/**
|
||||
* If we are trying to join, or have successfully joined the session.
|
||||
* It does not reflect if the room state is already configured to represent us being joined.
|
||||
* It only means that the Manager should be trying to connect or to disconnect running.
|
||||
* The Manager is still running right after isJoined becomes false to send the disconnect events.
|
||||
* (A more accurate name would be `isActivated`)
|
||||
* @returns true if we intend to be participating in the MatrixRTC session
|
||||
*/
|
||||
isJoined(): boolean;
|
||||
/**
|
||||
* Start sending all necessary events to make this user participate in the RTC session.
|
||||
* @param fociPreferred the list of preferred foci to use in the joined RTC membership event.
|
||||
* @param fociActive the active focus to use in the joined RTC membership event.
|
||||
* @throws can throw if it exceeds a configured maximum retry.
|
||||
*/
|
||||
join(fociPreferred: Focus[], fociActive?: Focus, onError?: (error: unknown) => void): void;
|
||||
/**
|
||||
* Send all necessary events to make this user leave the RTC session.
|
||||
* @param timeout the maximum duration in ms until the promise is forced to resolve.
|
||||
* @returns It resolves with true in case the leave was sent successfully.
|
||||
* It resolves with false in case we hit the timeout before sending successfully.
|
||||
*/
|
||||
leave(timeout?: number): Promise<boolean>;
|
||||
/**
|
||||
* Call this if the MatrixRTC session members have changed.
|
||||
*/
|
||||
onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise<void>;
|
||||
/**
|
||||
* The used active focus in the currently joined session.
|
||||
* @returns the used active focus in the currently joined session or undefined if not joined.
|
||||
*/
|
||||
getActiveFocus(): Focus | undefined;
|
||||
}
|
||||
|
||||
/* MembershipActionTypes:
|
||||
▼
|
||||
┌─────────────────────┐
|
||||
│SendFirstDelayedEvent│
|
||||
└─────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────┐
|
||||
┌────────────│SendJoinEvent│────────────┐
|
||||
│ └─────────────┘ │
|
||||
│ ┌─────┐ ┌──────┐ │ ┌──────┐
|
||||
|
||||
On Join: ───────────────┐ ┌───────────────(1)───────────┐
|
||||
▼ ▼ │
|
||||
┌────────────────┐ │
|
||||
│SendDelayedEvent│ ──────(2)───┐ │
|
||||
└────────────────┘ │ │
|
||||
│(3) │ │
|
||||
▼ │ │
|
||||
┌─────────────┐ │ │
|
||||
┌──────(4)───│SendJoinEvent│────(4)─────┐ │ │
|
||||
│ └─────────────┘ │ │ │
|
||||
│ ┌─────┐ ┌──────┐ │ │ │
|
||||
▼ ▼ │ │ ▼ ▼ ▼ │
|
||||
┌────────────┐ │ │ ┌───────────────────┐ │
|
||||
│UpdateExpiry│ │ │ │RestartDelayedEvent│ │
|
||||
│UpdateExpiry│ (s) (s)|RestartDelayedEvent│ │
|
||||
└────────────┘ │ │ └───────────────────┘ │
|
||||
│ │ │ │ │ │
|
||||
└─────┘ └──────┘ │ │
|
||||
│ │
|
||||
┌────────────────────┐ │ │
|
||||
│SendMainDelayedEvent│◄───────┘ │
|
||||
└───────────────────┬┘ │
|
||||
│ │
|
||||
└─────────────────────┘
|
||||
STOP ALL ABOVE
|
||||
└─────┘ └──────┘ └───────┘
|
||||
|
||||
On Leave: ───────── STOP ALL ABOVE
|
||||
▼
|
||||
┌───────────────────────────────┐
|
||||
┌────────────────────────────────┐
|
||||
│ SendScheduledDelayedLeaveEvent │
|
||||
└───────────────────────────────┘
|
||||
│
|
||||
└────────────────────────────────┘
|
||||
│(5)
|
||||
▼
|
||||
┌──────────────┐
|
||||
│SendLeaveEvent│
|
||||
└──────────────┘
|
||||
|
||||
(1) [Not found error] results in resending the delayed event
|
||||
(2) [hasMemberEvent = true] Sending the delayed event if we
|
||||
already have a call member event results jumping to the
|
||||
RestartDelayedEvent loop directly
|
||||
(3) [hasMemberEvent = false] if there is not call member event
|
||||
sending it is the next step
|
||||
(4) Both (UpdateExpiry and RestartDelayedEvent) actions are
|
||||
scheduled when successfully sending the state event
|
||||
(5) Only if delayed event sending failed (fallback)
|
||||
(s) Successful restart/resend
|
||||
*/
|
||||
|
||||
/**
|
||||
* The different types of actions the MembershipManager can take.
|
||||
* @internal
|
||||
*/
|
||||
export enum MembershipActionType {
|
||||
SendFirstDelayedEvent = "SendFirstDelayedEvent",
|
||||
SendDelayedEvent = "SendDelayedEvent",
|
||||
// -> MembershipActionType.SendJoinEvent if successful
|
||||
// -> DelayedLeaveActionType.SendFirstDelayedEvent on error, retry sending the first delayed event.
|
||||
// -> DelayedLeaveActionType.SendDelayedEvent on error, retry sending the first delayed event.
|
||||
// -> DelayedLeaveActionType.RestartDelayedEvent on success start updating the delayed event
|
||||
SendJoinEvent = "SendJoinEvent",
|
||||
// -> MembershipActionType.SendJoinEvent if we run into a rate limit and need to retry
|
||||
// -> MembershipActionType.Update if we successfully send the join event then schedule the expire event update
|
||||
// -> DelayedLeaveActionType.RestartDelayedEvent to recheck the delayed event
|
||||
RestartDelayedEvent = "RestartDelayedEvent",
|
||||
// -> DelayedLeaveActionType.SendMainDelayedEvent on missing delay id but there is a rtc state event
|
||||
// -> DelayedLeaveActionType.SendFirstDelayedEvent on missing delay id and there is no state event
|
||||
// -> DelayedLeaveActionType.SendDelayedEvent on missing delay id and there is no state event
|
||||
// -> DelayedLeaveActionType.RestartDelayedEvent on success we schedule the next restart
|
||||
UpdateExpiry = "UpdateExpiry",
|
||||
// -> MembershipActionType.Update if the timeout has passed so the next update is required.
|
||||
SendMainDelayedEvent = "SendMainDelayedEvent",
|
||||
// -> DelayedLeaveActionType.RestartDelayedEvent on success start updating the delayed event
|
||||
// -> DelayedLeaveActionType.SendMainDelayedEvent on error try again
|
||||
SendScheduledDelayedLeaveEvent = "SendScheduledDelayedLeaveEvent",
|
||||
// -> MembershipActionType.SendLeaveEvent on failiour (not found) we need to send the leave manually and cannot use the scheduled delayed event
|
||||
// -> DelayedLeaveActionType.SendScheduledDelayedLeaveEvent on error we try again.
|
||||
@@ -138,7 +108,7 @@ export enum MembershipActionType {
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
export interface ActionSchedulerState {
|
||||
export interface MembershipManagerState {
|
||||
/** The delayId we got when successfully sending the delayed leave event.
|
||||
* Gets set to undefined if the server claims it cannot find the delayed event anymore. */
|
||||
delayId?: string;
|
||||
@@ -158,17 +128,6 @@ export interface ActionSchedulerState {
|
||||
networkErrorRetries: Map<MembershipActionType, number>;
|
||||
}
|
||||
|
||||
enum Status {
|
||||
Disconnected = "Disconnected",
|
||||
Connecting = "Connecting",
|
||||
ConnectingFailed = "ConnectingFailed",
|
||||
Connected = "Connected",
|
||||
Reconnecting = "Reconnecting",
|
||||
Disconnecting = "Disconnecting",
|
||||
Stuck = "Stuck",
|
||||
Unknown = "Unknown",
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is responsible for sending all events relating to the own membership of a matrixRTC call.
|
||||
* It has the following tasks:
|
||||
@@ -182,11 +141,18 @@ enum Status {
|
||||
* - Stop the timer for the delay refresh
|
||||
* - Stop the timer for updating the state event
|
||||
*/
|
||||
export class MembershipManager implements IMembershipManager {
|
||||
export class MembershipManager
|
||||
extends TypedEventEmitter<MembershipManagerEvent, MembershipManagerEventHandlerMap>
|
||||
implements IMembershipManager
|
||||
{
|
||||
private activated = false;
|
||||
public isJoined(): boolean {
|
||||
public isActivated(): boolean {
|
||||
return this.activated;
|
||||
}
|
||||
// DEPRECATED use isActivated
|
||||
public isJoined(): boolean {
|
||||
return this.isActivated();
|
||||
}
|
||||
|
||||
/**
|
||||
* Puts the MembershipManager in a state where it tries to be joined.
|
||||
@@ -205,23 +171,27 @@ export class MembershipManager implements IMembershipManager {
|
||||
this.focusActive = focusActive;
|
||||
this.leavePromiseDefer = undefined;
|
||||
this.activated = true;
|
||||
|
||||
this.oldStatus = this.status;
|
||||
this.state = MembershipManager.defaultState;
|
||||
|
||||
this.scheduler
|
||||
.startWithJoin()
|
||||
.then(() => {
|
||||
if (!this.scheduler.running) {
|
||||
this.leavePromiseDefer?.resolve(true);
|
||||
this.leavePromiseDefer = undefined;
|
||||
}
|
||||
})
|
||||
.catch((e) => {
|
||||
logger.error("MembershipManager stopped because: ", e);
|
||||
onError?.(e);
|
||||
})
|
||||
.finally(() => {
|
||||
// Should already be set to false when calling `leave` in non error cases.
|
||||
.finally(() => (this.activated = false));
|
||||
this.activated = false;
|
||||
// Here the scheduler is not running anymore so we the `membershipLoopHandler` is not called to emit.
|
||||
if (this.oldStatus && this.oldStatus !== this.status) {
|
||||
this.emit(MembershipManagerEvent.StatusChanged, this.oldStatus, this.status);
|
||||
}
|
||||
if (!this.scheduler.running) {
|
||||
this.leavePromiseDefer?.resolve(true);
|
||||
this.leavePromiseDefer = undefined;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -256,7 +226,7 @@ export class MembershipManager implements IMembershipManager {
|
||||
// If one of these actions are scheduled or are getting inserted in the next iteration, we should already
|
||||
// take care of our missing membership.
|
||||
const sendingMembershipActions = [
|
||||
MembershipActionType.SendFirstDelayedEvent,
|
||||
MembershipActionType.SendDelayedEvent,
|
||||
MembershipActionType.SendJoinEvent,
|
||||
];
|
||||
logger.warn("Missing own membership: force re-join");
|
||||
@@ -313,6 +283,7 @@ export class MembershipManager implements IMembershipManager {
|
||||
>,
|
||||
private getOldestMembership: () => CallMembership | undefined,
|
||||
) {
|
||||
super();
|
||||
const [userId, deviceId] = [this.client.getUserId(), this.client.getDeviceId()];
|
||||
if (userId === null) throw Error("Missing userId in client");
|
||||
if (deviceId === null) throw Error("Missing deviceId in client");
|
||||
@@ -322,8 +293,8 @@ export class MembershipManager implements IMembershipManager {
|
||||
}
|
||||
|
||||
// MembershipManager mutable state.
|
||||
private state: ActionSchedulerState;
|
||||
private static get defaultState(): ActionSchedulerState {
|
||||
private state: MembershipManagerState;
|
||||
private static get defaultState(): MembershipManagerState {
|
||||
return {
|
||||
hasMemberStateEvent: false,
|
||||
delayId: undefined,
|
||||
@@ -382,7 +353,11 @@ export class MembershipManager implements IMembershipManager {
|
||||
if (this.oldStatus) {
|
||||
// we put this at the beginning of the actions scheduler loop handle callback since it is a loop this
|
||||
// is equivalent to running it at the end of the loop. (just after applying the status/action list changes)
|
||||
// This order is required because this method needs to return the action updates.
|
||||
logger.debug(`MembershipManager applied action changes. Status: ${this.oldStatus} -> ${this.status}`);
|
||||
if (this.oldStatus !== this.status) {
|
||||
this.emit(MembershipManagerEvent.StatusChanged, this.oldStatus, this.status);
|
||||
}
|
||||
}
|
||||
this.oldStatus = this.status;
|
||||
logger.debug(`MembershipManager before processing action. status=${this.oldStatus}`);
|
||||
@@ -391,17 +366,16 @@ export class MembershipManager implements IMembershipManager {
|
||||
|
||||
// LOOP HANDLER:
|
||||
private async membershipLoopHandler(type: MembershipActionType): Promise<ActionUpdate> {
|
||||
this.oldStatus = this.status;
|
||||
switch (type) {
|
||||
case MembershipActionType.SendFirstDelayedEvent: {
|
||||
case MembershipActionType.SendDelayedEvent: {
|
||||
// Before we start we check if we come from a state where we have a delay id.
|
||||
if (!this.state.delayId) {
|
||||
return this.sendFirstDelayedLeaveEvent(); // Normal case without any previous delayed id.
|
||||
return this.sendOrResendDelayedLeaveEvent(); // Normal case without any previous delayed id.
|
||||
} else {
|
||||
// This can happen if someone else (or another client) removes our own membership event.
|
||||
// It will trigger `onRTCSessionMemberUpdate` queue `MembershipActionType.SendFirstDelayedEvent`.
|
||||
// We might still have our delayed event from the previous participation and dependent on the server this might not
|
||||
// get automatically removed if the state changes. Hence It would remove our membership unexpectedly shortly after the rejoin.
|
||||
// get removed automatically if the state changes. Hence, it would remove our membership unexpectedly shortly after the rejoin.
|
||||
//
|
||||
// In this block we will try to cancel this delayed event before setting up a new one.
|
||||
|
||||
@@ -411,17 +385,10 @@ export class MembershipManager implements IMembershipManager {
|
||||
case MembershipActionType.RestartDelayedEvent: {
|
||||
if (!this.state.delayId) {
|
||||
// Delay id got reset. This action was used to check if the hs canceled the delayed event when the join state got sent.
|
||||
return createInsertActionUpdate(
|
||||
this.state.hasMemberStateEvent
|
||||
? MembershipActionType.SendMainDelayedEvent
|
||||
: MembershipActionType.SendFirstDelayedEvent,
|
||||
);
|
||||
return createInsertActionUpdate(MembershipActionType.SendDelayedEvent);
|
||||
}
|
||||
return this.restartDelayedEvent(this.state.delayId);
|
||||
}
|
||||
case MembershipActionType.SendMainDelayedEvent: {
|
||||
return this.sendMainDelayedEvent();
|
||||
}
|
||||
case MembershipActionType.SendScheduledDelayedLeaveEvent: {
|
||||
// We are already good
|
||||
if (!this.state.hasMemberStateEvent) {
|
||||
@@ -452,7 +419,11 @@ export class MembershipManager implements IMembershipManager {
|
||||
}
|
||||
|
||||
// HANDLERS (used in the membershipLoopHandler)
|
||||
private async sendFirstDelayedLeaveEvent(): Promise<ActionUpdate> {
|
||||
private async sendOrResendDelayedLeaveEvent(): Promise<ActionUpdate> {
|
||||
// We can reach this at the start of a call (where we do not yet have a membership: state.hasMemberStateEvent=false)
|
||||
// or during a call if the state event canceled our delayed event or caused by an unexpected error that removed our delayed event.
|
||||
// (Another client could have canceled it, the homeserver might have removed/lost it due to a restart, ...)
|
||||
// In the `then` and `catch` block we treat both cases differently. "if (this.state.hasMemberStateEvent) {} else {}"
|
||||
return await this.client
|
||||
._unstable_sendDelayedStateEvent(
|
||||
this.room.roomId,
|
||||
@@ -465,19 +436,37 @@ export class MembershipManager implements IMembershipManager {
|
||||
)
|
||||
.then((response) => {
|
||||
// On success we reset retries and set delayId.
|
||||
this.state.rateLimitRetries.set(MembershipActionType.SendFirstDelayedEvent, 0);
|
||||
this.state.networkErrorRetries.set(MembershipActionType.SendFirstDelayedEvent, 0);
|
||||
this.resetRateLimitCounter(MembershipActionType.SendDelayedEvent);
|
||||
this.state.delayId = response.delay_id;
|
||||
if (this.state.hasMemberStateEvent) {
|
||||
// This action was scheduled because the previous delayed event was cancelled
|
||||
// due to lack of https://github.com/element-hq/synapse/pull/17810
|
||||
return createInsertActionUpdate(
|
||||
MembershipActionType.RestartDelayedEvent,
|
||||
this.membershipKeepAlivePeriod,
|
||||
);
|
||||
} else {
|
||||
// This action was scheduled because we are in the process of joining
|
||||
return createInsertActionUpdate(MembershipActionType.SendJoinEvent);
|
||||
}
|
||||
})
|
||||
.catch((e) => {
|
||||
const repeatActionType = MembershipActionType.SendFirstDelayedEvent;
|
||||
const repeatActionType = MembershipActionType.SendDelayedEvent;
|
||||
if (this.manageMaxDelayExceededSituation(e)) {
|
||||
return createInsertActionUpdate(repeatActionType);
|
||||
}
|
||||
const update = this.actionUpdateFromErrors(e, repeatActionType, "sendDelayedStateEvent");
|
||||
if (update) return update;
|
||||
|
||||
if (this.state.hasMemberStateEvent) {
|
||||
// This action was scheduled because the previous delayed event was cancelled
|
||||
// due to lack of https://github.com/element-hq/synapse/pull/17810
|
||||
|
||||
// Don't do any other delayed event work if its not supported.
|
||||
if (this.isUnsupportedDelayedEndpoint(e)) return {};
|
||||
throw Error("Could not send delayed event, even though delayed events are supported. " + e);
|
||||
} else {
|
||||
// This action was scheduled because we are in the process of joining
|
||||
// log and fall through
|
||||
if (this.isUnsupportedDelayedEndpoint(e)) {
|
||||
logger.info("Not using delayed event because the endpoint is not supported");
|
||||
@@ -486,6 +475,7 @@ export class MembershipManager implements IMembershipManager {
|
||||
}
|
||||
// On any other error we fall back to not using delayed events and send the join state event immediately
|
||||
return createInsertActionUpdate(MembershipActionType.SendJoinEvent);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -495,11 +485,11 @@ export class MembershipManager implements IMembershipManager {
|
||||
._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Cancel)
|
||||
.then(() => {
|
||||
this.state.delayId = undefined;
|
||||
this.resetRateLimitCounter(MembershipActionType.SendFirstDelayedEvent);
|
||||
return createReplaceActionUpdate(MembershipActionType.SendFirstDelayedEvent);
|
||||
this.resetRateLimitCounter(MembershipActionType.SendDelayedEvent);
|
||||
return createReplaceActionUpdate(MembershipActionType.SendDelayedEvent);
|
||||
})
|
||||
.catch((e) => {
|
||||
const repeatActionType = MembershipActionType.SendFirstDelayedEvent;
|
||||
const repeatActionType = MembershipActionType.SendDelayedEvent;
|
||||
const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent");
|
||||
if (update) return update;
|
||||
|
||||
@@ -538,7 +528,7 @@ export class MembershipManager implements IMembershipManager {
|
||||
const repeatActionType = MembershipActionType.RestartDelayedEvent;
|
||||
if (this.isNotFoundError(e)) {
|
||||
this.state.delayId = undefined;
|
||||
return createInsertActionUpdate(MembershipActionType.SendMainDelayedEvent);
|
||||
return createInsertActionUpdate(MembershipActionType.SendDelayedEvent);
|
||||
}
|
||||
// If the HS does not support delayed events we wont reschedule.
|
||||
if (this.isUnsupportedDelayedEndpoint(e)) return {};
|
||||
@@ -552,40 +542,6 @@ export class MembershipManager implements IMembershipManager {
|
||||
});
|
||||
}
|
||||
|
||||
private async sendMainDelayedEvent(): Promise<ActionUpdate> {
|
||||
return await this.client
|
||||
._unstable_sendDelayedStateEvent(
|
||||
this.room.roomId,
|
||||
{
|
||||
delay: this.membershipServerSideExpiryTimeout,
|
||||
},
|
||||
EventType.GroupCallMemberPrefix,
|
||||
{}, // leave event
|
||||
this.stateKey,
|
||||
)
|
||||
.then((response) => {
|
||||
this.state.delayId = response.delay_id;
|
||||
this.resetRateLimitCounter(MembershipActionType.SendMainDelayedEvent);
|
||||
return createInsertActionUpdate(
|
||||
MembershipActionType.RestartDelayedEvent,
|
||||
this.membershipKeepAlivePeriod,
|
||||
);
|
||||
})
|
||||
.catch((e) => {
|
||||
const repeatActionType = MembershipActionType.SendMainDelayedEvent;
|
||||
// Don't do any other delayed event work if its not supported.
|
||||
if (this.isUnsupportedDelayedEndpoint(e)) return {};
|
||||
|
||||
if (this.manageMaxDelayExceededSituation(e)) {
|
||||
return createInsertActionUpdate(repeatActionType);
|
||||
}
|
||||
const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent");
|
||||
if (update) return update;
|
||||
|
||||
throw Error("Could not send delayed event, even though delayed events are supported. " + e);
|
||||
});
|
||||
}
|
||||
|
||||
private async sendScheduledDelayedLeaveEventOrFallbackToSendLeaveEvent(delayId: string): Promise<ActionUpdate> {
|
||||
return await this.client
|
||||
._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Send)
|
||||
@@ -887,9 +843,8 @@ export class MembershipManager implements IMembershipManager {
|
||||
if (actions.length === 1) {
|
||||
const { type } = actions[0];
|
||||
switch (type) {
|
||||
case MembershipActionType.SendFirstDelayedEvent:
|
||||
case MembershipActionType.SendDelayedEvent:
|
||||
case MembershipActionType.SendJoinEvent:
|
||||
case MembershipActionType.SendMainDelayedEvent:
|
||||
return Status.Connecting;
|
||||
case MembershipActionType.UpdateExpiry: // where no delayed events
|
||||
return Status.Connected;
|
||||
@@ -904,7 +859,7 @@ export class MembershipManager implements IMembershipManager {
|
||||
// normal state for connected with delayed events
|
||||
if (
|
||||
(types.includes(MembershipActionType.RestartDelayedEvent) ||
|
||||
types.includes(MembershipActionType.SendMainDelayedEvent)) &&
|
||||
(types.includes(MembershipActionType.SendDelayedEvent) && this.state.hasMemberStateEvent)) &&
|
||||
types.includes(MembershipActionType.UpdateExpiry)
|
||||
) {
|
||||
return Status.Connected;
|
||||
|
||||
@@ -73,7 +73,7 @@ export class ActionScheduler {
|
||||
return;
|
||||
}
|
||||
this.running = true;
|
||||
this._actions = [{ ts: Date.now(), type: MembershipActionType.SendFirstDelayedEvent }];
|
||||
this._actions = [{ ts: Date.now(), type: MembershipActionType.SendDelayedEvent }];
|
||||
try {
|
||||
while (this._actions.length > 0) {
|
||||
// Sort so next (smallest ts) action is at the beginning
|
||||
@@ -98,7 +98,7 @@ export class ActionScheduler {
|
||||
`\nDate.now: "${Date.now()}`,
|
||||
);
|
||||
try {
|
||||
// `this.wakeup` can also be called and sets the `wakupUpdate` object while we are in the handler.
|
||||
// `this.wakeup` can also be called and sets the `wakeupUpdate` object while we are in the handler.
|
||||
handlerResult = await this.membershipLoopHandler(nextAction.type as MembershipActionType);
|
||||
} catch (e) {
|
||||
throw Error(`The MembershipManager shut down because of the end condition: ${e}`);
|
||||
@@ -125,7 +125,7 @@ export class ActionScheduler {
|
||||
}
|
||||
|
||||
public initiateJoin(): void {
|
||||
this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendFirstDelayedEvent }] });
|
||||
this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendDelayedEvent }] });
|
||||
}
|
||||
public initiateLeave(): void {
|
||||
this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendScheduledDelayedLeaveEvent }] });
|
||||
|
||||
@@ -20,3 +20,4 @@ export * from "./LivekitFocus.ts";
|
||||
export * from "./MatrixRTCSession.ts";
|
||||
export * from "./MatrixRTCSessionManager.ts";
|
||||
export type * from "./types.ts";
|
||||
export { Status, MembershipManagerEvent } from "./types.ts";
|
||||
|
||||
@@ -13,7 +13,10 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
import { type IMentions } from "../matrix.ts";
|
||||
import type { IMentions } from "../matrix.ts";
|
||||
import type { CallMembership } from "./CallMembership.ts";
|
||||
import type { Focus } from "./focus.ts";
|
||||
|
||||
export interface EncryptionKeyEntry {
|
||||
index: number;
|
||||
key: string;
|
||||
@@ -34,3 +37,83 @@ export interface ICallNotifyContent {
|
||||
"notify_type": CallNotifyType;
|
||||
"call_id": string;
|
||||
}
|
||||
|
||||
export enum Status {
|
||||
Disconnected = "Disconnected",
|
||||
Connecting = "Connecting",
|
||||
ConnectingFailed = "ConnectingFailed",
|
||||
Connected = "Connected",
|
||||
Reconnecting = "Reconnecting",
|
||||
Disconnecting = "Disconnecting",
|
||||
Stuck = "Stuck",
|
||||
Unknown = "Unknown",
|
||||
}
|
||||
|
||||
export enum MembershipManagerEvent {
|
||||
StatusChanged = "StatusChanged",
|
||||
}
|
||||
|
||||
export type MembershipManagerEventHandlerMap = {
|
||||
[MembershipManagerEvent.StatusChanged]: (prefStatus: Status, newStatus: Status) => void;
|
||||
};
|
||||
|
||||
/**
|
||||
* This interface defines what a MembershipManager uses and exposes.
|
||||
* This interface is what we use to write tests and allows changing the actual implementation
|
||||
* without breaking tests because of some internal method renaming.
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
export interface IMembershipManager {
|
||||
/**
|
||||
* If we are trying to join, or have successfully joined the session.
|
||||
* It does not reflect if the room state is already configured to represent us being joined.
|
||||
* It only means that the Manager should be trying to connect or to disconnect running.
|
||||
* The Manager is still running right after isJoined becomes false to send the disconnect events.
|
||||
* @returns true if we intend to be participating in the MatrixRTC session
|
||||
* @deprecated This name is confusing and replaced by `isActivated()`. (Returns the same as `isActivated()`)
|
||||
*/
|
||||
isJoined(): boolean;
|
||||
/**
|
||||
* If the manager is activated. This means it tries to do its job to join the call, resend state events...
|
||||
* It does not imply that the room state is already configured to represent being joined.
|
||||
* It means that the Manager tries to connect or is connected. ("the manager is still active")
|
||||
* Once `leave()` is called the manager is not activated anymore but still running until `leave()` resolves.
|
||||
* @returns `true` if we intend to be participating in the MatrixRTC session
|
||||
*/
|
||||
isActivated(): boolean;
|
||||
/**
|
||||
* Get the actual connection status of the manager.
|
||||
*/
|
||||
get status(): Status;
|
||||
/**
|
||||
* The current status while the manager is activated
|
||||
*/
|
||||
/**
|
||||
* Start sending all necessary events to make this user participate in the RTC session.
|
||||
* @param fociPreferred the list of preferred foci to use in the joined RTC membership event.
|
||||
* @param fociActive the active focus to use in the joined RTC membership event.
|
||||
* @throws can throw if it exceeds a configured maximum retry.
|
||||
*/
|
||||
join(fociPreferred: Focus[], fociActive?: Focus, onError?: (error: unknown) => void): void;
|
||||
/**
|
||||
* Send all necessary events to make this user leave the RTC session.
|
||||
* @param timeout the maximum duration in ms until the promise is forced to resolve.
|
||||
* @returns It resolves with true in case the leave was sent successfully.
|
||||
* It resolves with false in case we hit the timeout before sending successfully.
|
||||
*/
|
||||
leave(timeout?: number): Promise<boolean>;
|
||||
/**
|
||||
* Call this if the MatrixRTC session members have changed.
|
||||
*/
|
||||
onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise<void>;
|
||||
/**
|
||||
* The used active focus in the currently joined session.
|
||||
* @returns the used active focus in the currently joined session or undefined if not joined.
|
||||
*/
|
||||
getActiveFocus(): Focus | undefined;
|
||||
|
||||
// TypedEventEmitter methods:
|
||||
on(event: MembershipManagerEvent.StatusChanged, listener: (oldStatus: Status, newStatus: Status) => void): this;
|
||||
off(event: MembershipManagerEvent.StatusChanged, listener: (oldStatus: Status, newStatus: Status) => void): this;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user