You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-08-09 10:22:46 +03:00
MatrixRTC: New membership manager (#4726)
* WIP doodles on MembershipManager test cases
* .
* initial membership manager test setup.
* Updates from discussion
* revert renaming comments
* remove unused import
* fix leave delayed event resend test.
It was missing a flush.
* comment out and remove unused variables
* es lint
* use jsdom instead of node test environment
* remove unused variables
* remove unused export
* temp
* review
* fixup tests
* more review
* remove wait for expect dependency
* temp
* fix wrong mocked meberhsip template
* rename MembershipManager -> LegacyMembershipManager
And remove the IMembershipManager from it
* Add new memberhsip manager
* fix tests to be compatible with old and new membership manager
* Comment cleanup
* Allow join to throw
- Add tests for throwing cases
- Fixs based on tests
* introduce membershipExpiryTimeoutSlack
* more detailed comments and cleanup
* warn if slack is misconfigured and use default values instead
* fix action resets.
* flatten MembershipManager.spec.ts
* rename testEnvironment to memberManagerTestEnvironment
* allow configuring Legacy manager in the matrixRTC session
* deprecate LegacyMembershipManager
* remove usage of waitForExpect
* flatten tests and add comments
* clean up leave logic branch
* add more leave test cases
* use defer
* review ("Some minor tidying things for now.")
* add onError for join method and cleanup
* use pop instead of filter
* fixes
* simplify error handling and MembershipAction
Only use one membership action enum
* Add diagram
* fix new error api in rtc session
* fix up retry counter
* fix lints
* make unrecoverable errors more explicit
* fix tests
* Allow multiple retries on the rtc state event http requests.
* use then catch for startup
* no try catch 1
* update expire headroom logic
transition from try catch to .then .catch
* replace flushPromise with advanceTimersByTimeAsync
* fix leaving special cases
* more unrecoverable errors special cases
* move to MatrixRTCSessionManager logger
* add state reset and add another unhandleable error
The error occurs if we want to cancel the delayed event we still have an id for but get a non expected error.
* missed review fixes
* remove @jest/environment dependency
* Cleanup awaits and Make mock types more correct.
Make every mock return a Promise if the real implementation does return a pormise.
* remove flush promise dependency
* fix not recreating default state on reset
This broke all tests since we only created the state once and than passed by ref
* Use per action rate limit and retry counter
There can be multiple retries at once so we need to store counters per action
e.g. the send update membership and the restart delayed could be rate limited at the same time.
* add linting to matrixrtc tests
* Add fix async lints and use matrix rtc logger for test environment.
* prettier
* review step 1
* change to MatrixRTCSession logger
* review step 2
* make LoopHandler Private
* update config to use NewManager wording
* emit error on rtc session if the membership manager encounters one
* network error and throw refactor
* make accessing the full room deprecated
* remove deprecated usage of full room
* Clean up the deprecation
* add network error handler and cleanup
* better logging, another test, make maximumNetworkErrorRetryCount configurable
* more logging & refactor leave promise
* add ConnectionError as possible retry cause
* Make it work in embedded mode with a server that does not support delayed events
* review iteration 1
* review iteration 2
* first step in improving widget error handling
* make the embedded client throw ConnectionErrors where desired.
* fix tests
* delayed event sending widget mode stop gap fix.
* improve comment
* fix unrecoverable error joinState (and add JoinStateChanged) emission.
* check that we do not add multipe sendFirstDelayed Events
* also check insertions queue
* always log "Missing own membership: force re-join"
* Do not update the membership if we are in any (a later) state of sending our own state.
The scheduled states MembershipActionType.SendFirstDelayedEvent and MembershipActionType.SendJoinEvent both imply that we are already trying to send our own membership state event.
* make leave reset actually stop the manager.
The reset case was not covered properly. There are cases where it is not allowed to add additional events after a reset and cases where we want to add more events after the reset. We need to allow this as a reset property.
* fix tests (and implementation)
* Allow MembershipManger to be set at runtime via JoinConfig.membershipManagerFactory
* Map actions into status as a sanity check
* Log status change after applying actions
* Add todo
* Cleanup
* Log transition from earlier status
* remove redundant status implementation
also add TODO comment to not forget about this.
* More cleanup
* Consider insertions in status()
* Log duration for emitting MatrixRTCSessionEvent.MembershipsChanged
* add another valid condition for connected
* some TODO cleanup
* review add warning when using addAction while the scheduler is not running.
* es lint
* refactor to return based handler approach (remove insertions array)
* refactor: Move action scheduler
* refactor: move different handler cases into separate functions
* linter
* review: delayed events endpoint error
* review
* Suggestions from pair review
* resetState is actually only used internally
* Revert "resetState is actually only used internally"
This reverts commit 6af4730919
.
* refactor: running is part of the scheduler (not state)
* refactor: move everything state related from schduler to manager.
* review
* Update src/matrixrtc/NewMembershipManager.ts
Co-authored-by: Hugh Nimmo-Smith <hughns@users.noreply.github.com>
* review
* public -> private + missed review fiexes (comment typos)
---------
Co-authored-by: Hugh Nimmo-Smith <hughns@matrix.org>
Co-authored-by: Hugh Nimmo-Smith <hughns@users.noreply.github.com>
This commit is contained in:
@@ -49,26 +49,26 @@ const testOIDCToken = {
|
|||||||
token_type: "Bearer",
|
token_type: "Bearer",
|
||||||
};
|
};
|
||||||
class MockWidgetApi extends EventEmitter {
|
class MockWidgetApi extends EventEmitter {
|
||||||
public start = jest.fn();
|
public start = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestCapability = jest.fn();
|
public requestCapability = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestCapabilities = jest.fn();
|
public requestCapabilities = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestCapabilityForRoomTimeline = jest.fn();
|
public requestCapabilityForRoomTimeline = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestCapabilityToSendEvent = jest.fn();
|
public requestCapabilityToSendEvent = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestCapabilityToReceiveEvent = jest.fn();
|
public requestCapabilityToReceiveEvent = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestCapabilityToSendMessage = jest.fn();
|
public requestCapabilityToSendMessage = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestCapabilityToReceiveMessage = jest.fn();
|
public requestCapabilityToReceiveMessage = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestCapabilityToSendState = jest.fn();
|
public requestCapabilityToSendState = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestCapabilityToReceiveState = jest.fn();
|
public requestCapabilityToReceiveState = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestCapabilityToSendToDevice = jest.fn();
|
public requestCapabilityToSendToDevice = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestCapabilityToReceiveToDevice = jest.fn();
|
public requestCapabilityToReceiveToDevice = jest.fn().mockResolvedValue(undefined);
|
||||||
public sendRoomEvent = jest.fn(
|
public sendRoomEvent = jest.fn(
|
||||||
(eventType: string, content: unknown, roomId?: string, delay?: number, parentDelayId?: string) =>
|
async (eventType: string, content: unknown, roomId?: string, delay?: number, parentDelayId?: string) =>
|
||||||
delay === undefined && parentDelayId === undefined
|
delay === undefined && parentDelayId === undefined
|
||||||
? { event_id: `$${Math.random()}` }
|
? { event_id: `$${Math.random()}` }
|
||||||
: { delay_id: `id-${Math.random()}` },
|
: { delay_id: `id-${Math.random()}` },
|
||||||
);
|
);
|
||||||
public sendStateEvent = jest.fn(
|
public sendStateEvent = jest.fn(
|
||||||
(
|
async (
|
||||||
eventType: string,
|
eventType: string,
|
||||||
stateKey: string,
|
stateKey: string,
|
||||||
content: unknown,
|
content: unknown,
|
||||||
@@ -80,17 +80,17 @@ class MockWidgetApi extends EventEmitter {
|
|||||||
? { event_id: `$${Math.random()}` }
|
? { event_id: `$${Math.random()}` }
|
||||||
: { delay_id: `id-${Math.random()}` },
|
: { delay_id: `id-${Math.random()}` },
|
||||||
);
|
);
|
||||||
public updateDelayedEvent = jest.fn();
|
public updateDelayedEvent = jest.fn().mockResolvedValue(undefined);
|
||||||
public sendToDevice = jest.fn();
|
public sendToDevice = jest.fn().mockResolvedValue(undefined);
|
||||||
public requestOpenIDConnectToken = jest.fn(() => {
|
public requestOpenIDConnectToken = jest.fn(async () => {
|
||||||
return testOIDCToken;
|
return testOIDCToken;
|
||||||
return new Promise<IOpenIDCredentials>(() => {
|
return new Promise<IOpenIDCredentials>(() => {
|
||||||
return testOIDCToken;
|
return testOIDCToken;
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
public readStateEvents = jest.fn(() => []);
|
public readStateEvents = jest.fn(async () => []);
|
||||||
public getTurnServers = jest.fn(() => []);
|
public getTurnServers = jest.fn(async () => []);
|
||||||
public sendContentLoaded = jest.fn();
|
public sendContentLoaded = jest.fn().mockResolvedValue(undefined);
|
||||||
|
|
||||||
public transport = {
|
public transport = {
|
||||||
reply: jest.fn(),
|
reply: jest.fn(),
|
||||||
|
@@ -19,10 +19,11 @@ limitations under the License.
|
|||||||
|
|
||||||
import { type MockedFunction, type Mock } from "jest-mock";
|
import { type MockedFunction, type Mock } from "jest-mock";
|
||||||
|
|
||||||
import { EventType, HTTPError, MatrixError, type Room } from "../../../src";
|
import { EventType, HTTPError, MatrixError, UnsupportedDelayedEventsEndpointError, type Room } from "../../../src";
|
||||||
import { type Focus, type LivekitFocusActive, type SessionMembershipData } from "../../../src/matrixrtc";
|
import { type Focus, type LivekitFocusActive, type SessionMembershipData } from "../../../src/matrixrtc";
|
||||||
import { LegacyMembershipManager } from "../../../src/matrixrtc/MembershipManager";
|
import { LegacyMembershipManager } from "../../../src/matrixrtc/LegacyMembershipManager";
|
||||||
import { makeMockClient, makeMockRoom, membershipTemplate, mockCallMembership, type MockClient } from "./mocks";
|
import { makeMockClient, makeMockRoom, membershipTemplate, mockCallMembership, type MockClient } from "./mocks";
|
||||||
|
import { MembershipManager } from "../../../src/matrixrtc/NewMembershipManager";
|
||||||
import { defer } from "../../../src/utils";
|
import { defer } from "../../../src/utils";
|
||||||
|
|
||||||
function waitForMockCall(method: MockedFunction<any>, returnVal?: Promise<any>) {
|
function waitForMockCall(method: MockedFunction<any>, returnVal?: Promise<any>) {
|
||||||
@@ -44,9 +45,10 @@ function createAsyncHandle(method: MockedFunction<any>) {
|
|||||||
* Tests different MembershipManager implementations. Some tests don't apply to `LegacyMembershipManager`
|
* Tests different MembershipManager implementations. Some tests don't apply to `LegacyMembershipManager`
|
||||||
* use !FailsForLegacy to skip those. See: testEnvironment for more details.
|
* use !FailsForLegacy to skip those. See: testEnvironment for more details.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
describe.each([
|
describe.each([
|
||||||
{ TestMembershipManager: LegacyMembershipManager, description: "LegacyMembershipManager" },
|
{ TestMembershipManager: LegacyMembershipManager, description: "LegacyMembershipManager" },
|
||||||
// { TestMembershipManager: MembershipManager, description: "MembershipManager" },
|
{ TestMembershipManager: MembershipManager, description: "MembershipManager" },
|
||||||
])("$description", ({ TestMembershipManager }) => {
|
])("$description", ({ TestMembershipManager }) => {
|
||||||
let client: MockClient;
|
let client: MockClient;
|
||||||
let room: Room;
|
let room: Room;
|
||||||
@@ -244,7 +246,12 @@ describe.each([
|
|||||||
const delayedHandle = createAsyncHandle(client._unstable_sendDelayedStateEvent as Mock);
|
const delayedHandle = createAsyncHandle(client._unstable_sendDelayedStateEvent as Mock);
|
||||||
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||||
manager.join([focus], focusActive);
|
manager.join([focus], focusActive);
|
||||||
delayedHandle.reject?.(Error("Server does not support the delayed events API"));
|
delayedHandle.reject?.(
|
||||||
|
new UnsupportedDelayedEventsEndpointError(
|
||||||
|
"Server does not support the delayed events API",
|
||||||
|
"sendDelayedStateEvent",
|
||||||
|
),
|
||||||
|
);
|
||||||
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
|
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
it("does try to schedule a delayed leave event again if rate limited", async () => {
|
it("does try to schedule a delayed leave event again if rate limited", async () => {
|
||||||
@@ -328,6 +335,7 @@ describe.each([
|
|||||||
await jest.advanceTimersByTimeAsync(1);
|
await jest.advanceTimersByTimeAsync(1);
|
||||||
(client._unstable_updateDelayedEvent as Mock<any>).mockRejectedValue("unknown");
|
(client._unstable_updateDelayedEvent as Mock<any>).mockRejectedValue("unknown");
|
||||||
await manager.leave();
|
await manager.leave();
|
||||||
|
|
||||||
// We send a normal leave event since we failed using updateDelayedEvent with the "send" action.
|
// We send a normal leave event since we failed using updateDelayedEvent with the "send" action.
|
||||||
expect(client.sendStateEvent).toHaveBeenLastCalledWith(
|
expect(client.sendStateEvent).toHaveBeenLastCalledWith(
|
||||||
room.roomId,
|
room.roomId,
|
||||||
@@ -337,9 +345,9 @@ describe.each([
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
// FailsForLegacy because legacy implementation always sends the empty state event even though it isn't needed
|
// FailsForLegacy because legacy implementation always sends the empty state event even though it isn't needed
|
||||||
it("does nothing if not joined !FailsForLegacy", async () => {
|
it("does nothing if not joined !FailsForLegacy", () => {
|
||||||
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||||
await manager.leave();
|
expect(async () => await manager.leave()).not.toThrow();
|
||||||
expect(client._unstable_sendDelayedStateEvent).not.toHaveBeenCalled();
|
expect(client._unstable_sendDelayedStateEvent).not.toHaveBeenCalled();
|
||||||
expect(client.sendStateEvent).not.toHaveBeenCalled();
|
expect(client.sendStateEvent).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
@@ -470,10 +478,10 @@ describe.each([
|
|||||||
// !FailsForLegacy because the expires logic was removed for the legacy call manager.
|
// !FailsForLegacy because the expires logic was removed for the legacy call manager.
|
||||||
// Delayed events should replace it entirely but before they have wide adoption
|
// Delayed events should replace it entirely but before they have wide adoption
|
||||||
// the expiration logic still makes sense.
|
// the expiration logic still makes sense.
|
||||||
// TODO: add git commit when we removed it.
|
// TODO: Add git commit when we removed it.
|
||||||
it("extends `expires` when call still active !FailsForLegacy", async () => {
|
async function testExpires(expire: number, headroom?: number) {
|
||||||
const manager = new TestMembershipManager(
|
const manager = new TestMembershipManager(
|
||||||
{ membershipExpiryTimeout: 10_000 },
|
{ membershipExpiryTimeout: expire, membershipExpiryTimeoutHeadroom: headroom },
|
||||||
room,
|
room,
|
||||||
client,
|
client,
|
||||||
() => undefined,
|
() => undefined,
|
||||||
@@ -482,13 +490,19 @@ describe.each([
|
|||||||
await waitForMockCall(client.sendStateEvent);
|
await waitForMockCall(client.sendStateEvent);
|
||||||
expect(client.sendStateEvent).toHaveBeenCalledTimes(1);
|
expect(client.sendStateEvent).toHaveBeenCalledTimes(1);
|
||||||
const sentMembership = (client.sendStateEvent as Mock).mock.calls[0][2] as SessionMembershipData;
|
const sentMembership = (client.sendStateEvent as Mock).mock.calls[0][2] as SessionMembershipData;
|
||||||
expect(sentMembership.expires).toBe(10_000);
|
expect(sentMembership.expires).toBe(expire);
|
||||||
for (let i = 2; i <= 12; i++) {
|
for (let i = 2; i <= 12; i++) {
|
||||||
await jest.advanceTimersByTimeAsync(10_000);
|
await jest.advanceTimersByTimeAsync(expire);
|
||||||
expect(client.sendStateEvent).toHaveBeenCalledTimes(i);
|
expect(client.sendStateEvent).toHaveBeenCalledTimes(i);
|
||||||
const sentMembership = (client.sendStateEvent as Mock).mock.lastCall![2] as SessionMembershipData;
|
const sentMembership = (client.sendStateEvent as Mock).mock.lastCall![2] as SessionMembershipData;
|
||||||
expect(sentMembership.expires).toBe(10_000 * i);
|
expect(sentMembership.expires).toBe(expire * i);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
it("extends `expires` when call still active !FailsForLegacy", async () => {
|
||||||
|
await testExpires(10_000);
|
||||||
|
});
|
||||||
|
it("extends `expires` using headroom configuration !FailsForLegacy", async () => {
|
||||||
|
await testExpires(10_000, 1_000);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -544,7 +558,7 @@ describe.each([
|
|||||||
expect(client.sendStateEvent).toHaveBeenCalledTimes(1);
|
expect(client.sendStateEvent).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
// FailsForLegacy as implementation does not re-check membership before retrying.
|
// FailsForLegacy as implementation does not re-check membership before retrying.
|
||||||
it("abandons retry loop if leave() was called !FailsForLegacy", async () => {
|
it("abandons retry loop if leave() was called before sending state event !FailsForLegacy", async () => {
|
||||||
const handle = createAsyncHandle(client._unstable_sendDelayedStateEvent);
|
const handle = createAsyncHandle(client._unstable_sendDelayedStateEvent);
|
||||||
|
|
||||||
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||||
@@ -565,7 +579,6 @@ describe.each([
|
|||||||
await manager.leave();
|
await manager.leave();
|
||||||
|
|
||||||
// Wait for all timers to be setup
|
// Wait for all timers to be setup
|
||||||
// await flushPromises();
|
|
||||||
await jest.advanceTimersByTimeAsync(1000);
|
await jest.advanceTimersByTimeAsync(1000);
|
||||||
|
|
||||||
// No new events should have been sent:
|
// No new events should have been sent:
|
||||||
@@ -603,4 +616,87 @@ describe.each([
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
describe("unrecoverable errors", () => {
|
||||||
|
// !FailsForLegacy because legacy does not have a retry limit and no mechanism to communicate unrecoverable errors.
|
||||||
|
it("throws, when reaching maximum number of retries for initial delayed event creation !FailsForLegacy", async () => {
|
||||||
|
const delayEventSendError = jest.fn();
|
||||||
|
(client._unstable_sendDelayedStateEvent as Mock<any>).mockRejectedValue(
|
||||||
|
new MatrixError(
|
||||||
|
{ errcode: "M_LIMIT_EXCEEDED" },
|
||||||
|
429,
|
||||||
|
undefined,
|
||||||
|
undefined,
|
||||||
|
new Headers({ "Retry-After": "2" }),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||||
|
manager.join([focus], focusActive, delayEventSendError);
|
||||||
|
|
||||||
|
for (let i = 0; i < 10; i++) {
|
||||||
|
await jest.advanceTimersByTimeAsync(2000);
|
||||||
|
}
|
||||||
|
expect(delayEventSendError).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
// !FailsForLegacy because legacy does not have a retry limit and no mechanism to communicate unrecoverable errors.
|
||||||
|
it("throws, when reaching maximum number of retries !FailsForLegacy", async () => {
|
||||||
|
const delayEventRestartError = jest.fn();
|
||||||
|
(client._unstable_updateDelayedEvent as Mock<any>).mockRejectedValue(
|
||||||
|
new MatrixError(
|
||||||
|
{ errcode: "M_LIMIT_EXCEEDED" },
|
||||||
|
429,
|
||||||
|
undefined,
|
||||||
|
undefined,
|
||||||
|
new Headers({ "Retry-After": "1" }),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||||
|
manager.join([focus], focusActive, delayEventRestartError);
|
||||||
|
|
||||||
|
for (let i = 0; i < 10; i++) {
|
||||||
|
await jest.advanceTimersByTimeAsync(1000);
|
||||||
|
}
|
||||||
|
expect(delayEventRestartError).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
it("falls back to using pure state events when some error occurs while sending delayed events !FailsForLegacy", async () => {
|
||||||
|
const unrecoverableError = jest.fn();
|
||||||
|
(client._unstable_sendDelayedStateEvent as Mock<any>).mockRejectedValue(new HTTPError("unknown", 601));
|
||||||
|
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||||
|
manager.join([focus], focusActive, unrecoverableError);
|
||||||
|
await waitForMockCall(client.sendStateEvent);
|
||||||
|
expect(unrecoverableError).not.toHaveBeenCalledWith();
|
||||||
|
expect(client.sendStateEvent).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
it("retries before failing in case its a network error !FailsForLegacy", async () => {
|
||||||
|
const unrecoverableError = jest.fn();
|
||||||
|
(client._unstable_sendDelayedStateEvent as Mock<any>).mockRejectedValue(new HTTPError("unknown", 501));
|
||||||
|
const manager = new TestMembershipManager(
|
||||||
|
{ callMemberEventRetryDelayMinimum: 1000, maximumNetworkErrorRetryCount: 7 },
|
||||||
|
room,
|
||||||
|
client,
|
||||||
|
() => undefined,
|
||||||
|
);
|
||||||
|
manager.join([focus], focusActive, unrecoverableError);
|
||||||
|
for (let retries = 0; retries < 7; retries++) {
|
||||||
|
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(retries + 1);
|
||||||
|
await jest.advanceTimersByTimeAsync(1000);
|
||||||
|
}
|
||||||
|
expect(unrecoverableError).toHaveBeenCalled();
|
||||||
|
expect(unrecoverableError.mock.lastCall![0].message).toMatch(
|
||||||
|
"The MembershipManager shut down because of the end condition",
|
||||||
|
);
|
||||||
|
expect(client.sendStateEvent).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
it("falls back to using pure state events when UnsupportedDelayedEventsEndpointError encountered for delayed events !FailsForLegacy", async () => {
|
||||||
|
const unrecoverableError = jest.fn();
|
||||||
|
(client._unstable_sendDelayedStateEvent as Mock<any>).mockRejectedValue(
|
||||||
|
new UnsupportedDelayedEventsEndpointError("not supported", "sendDelayedStateEvent"),
|
||||||
|
);
|
||||||
|
const manager = new TestMembershipManager({}, room, client, () => undefined);
|
||||||
|
manager.join([focus], focusActive, unrecoverableError);
|
||||||
|
await jest.advanceTimersByTimeAsync(1);
|
||||||
|
|
||||||
|
expect(unrecoverableError).not.toHaveBeenCalled();
|
||||||
|
expect(client.sendStateEvent).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
@@ -25,7 +25,7 @@ export const membershipTemplate: SessionMembershipData = {
|
|||||||
call_id: "",
|
call_id: "",
|
||||||
device_id: "AAAAAAA",
|
device_id: "AAAAAAA",
|
||||||
scope: "m.room",
|
scope: "m.room",
|
||||||
focus_active: { type: "livekit", livekit_service_url: "https://lk.url" },
|
focus_active: { type: "livekit", focus_selection: "oldest_membership" },
|
||||||
foci_preferred: [
|
foci_preferred: [
|
||||||
{
|
{
|
||||||
livekit_alias: "!alias:something.org",
|
livekit_alias: "!alias:something.org",
|
||||||
|
@@ -240,6 +240,7 @@ import {
|
|||||||
validateAuthMetadataAndKeys,
|
validateAuthMetadataAndKeys,
|
||||||
} from "./oidc/index.ts";
|
} from "./oidc/index.ts";
|
||||||
import { type EmptyObject } from "./@types/common.ts";
|
import { type EmptyObject } from "./@types/common.ts";
|
||||||
|
import { UnsupportedDelayedEventsEndpointError } from "./errors.ts";
|
||||||
|
|
||||||
export type Store = IStore;
|
export type Store = IStore;
|
||||||
|
|
||||||
@@ -3351,7 +3352,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
txnId?: string,
|
txnId?: string,
|
||||||
): Promise<SendDelayedEventResponse> {
|
): Promise<SendDelayedEventResponse> {
|
||||||
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
||||||
throw Error("Server does not support the delayed events API");
|
throw new UnsupportedDelayedEventsEndpointError(
|
||||||
|
"Server does not support the delayed events API",
|
||||||
|
"sendDelayedEvent",
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.addThreadRelationIfNeeded(content, threadId, roomId);
|
this.addThreadRelationIfNeeded(content, threadId, roomId);
|
||||||
@@ -3374,7 +3378,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
opts: IRequestOpts = {},
|
opts: IRequestOpts = {},
|
||||||
): Promise<SendDelayedEventResponse> {
|
): Promise<SendDelayedEventResponse> {
|
||||||
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
||||||
throw Error("Server does not support the delayed events API");
|
throw new UnsupportedDelayedEventsEndpointError(
|
||||||
|
"Server does not support the delayed events API",
|
||||||
|
"sendDelayedStateEvent",
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const pathParams = {
|
const pathParams = {
|
||||||
@@ -3398,7 +3405,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
// eslint-disable-next-line
|
// eslint-disable-next-line
|
||||||
public async _unstable_getDelayedEvents(fromToken?: string): Promise<DelayedEventInfo> {
|
public async _unstable_getDelayedEvents(fromToken?: string): Promise<DelayedEventInfo> {
|
||||||
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
||||||
throw Error("Server does not support the delayed events API");
|
throw new UnsupportedDelayedEventsEndpointError(
|
||||||
|
"Server does not support the delayed events API",
|
||||||
|
"getDelayedEvents",
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const queryDict = fromToken ? { from: fromToken } : undefined;
|
const queryDict = fromToken ? { from: fromToken } : undefined;
|
||||||
@@ -3420,7 +3430,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
requestOptions: IRequestOpts = {},
|
requestOptions: IRequestOpts = {},
|
||||||
): Promise<EmptyObject> {
|
): Promise<EmptyObject> {
|
||||||
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
||||||
throw Error("Server does not support the delayed events API");
|
throw new UnsupportedDelayedEventsEndpointError(
|
||||||
|
"Server does not support the delayed events API",
|
||||||
|
"updateDelayedEvent",
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const path = utils.encodeUri("/delayed_events/$delayId", {
|
const path = utils.encodeUri("/delayed_events/$delayId", {
|
||||||
|
@@ -50,12 +50,12 @@ import {
|
|||||||
} from "./client.ts";
|
} from "./client.ts";
|
||||||
import { SyncApi, SyncState } from "./sync.ts";
|
import { SyncApi, SyncState } from "./sync.ts";
|
||||||
import { SlidingSyncSdk } from "./sliding-sync-sdk.ts";
|
import { SlidingSyncSdk } from "./sliding-sync-sdk.ts";
|
||||||
import { MatrixError } from "./http-api/errors.ts";
|
import { ConnectionError, MatrixError } from "./http-api/errors.ts";
|
||||||
import { User } from "./models/user.ts";
|
import { User } from "./models/user.ts";
|
||||||
import { type Room } from "./models/room.ts";
|
import { type Room } from "./models/room.ts";
|
||||||
import { type ToDeviceBatch, type ToDevicePayload } from "./models/ToDeviceMessage.ts";
|
import { type ToDeviceBatch, type ToDevicePayload } from "./models/ToDeviceMessage.ts";
|
||||||
import { MapWithDefault, recursiveMapToObject } from "./utils.ts";
|
import { MapWithDefault, recursiveMapToObject } from "./utils.ts";
|
||||||
import { type EmptyObject, TypedEventEmitter } from "./matrix.ts";
|
import { type EmptyObject, TypedEventEmitter, UnsupportedDelayedEventsEndpointError } from "./matrix.ts";
|
||||||
|
|
||||||
interface IStateEventRequest {
|
interface IStateEventRequest {
|
||||||
eventType: string;
|
eventType: string;
|
||||||
@@ -358,13 +358,15 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
// Delayed event special case.
|
// Delayed event special case.
|
||||||
if (delayOpts) {
|
if (delayOpts) {
|
||||||
// TODO: updatePendingEvent for delayed events?
|
// TODO: updatePendingEvent for delayed events?
|
||||||
const response = await this.widgetApi.sendRoomEvent(
|
const response = await this.widgetApi
|
||||||
event.getType(),
|
.sendRoomEvent(
|
||||||
content,
|
event.getType(),
|
||||||
room.roomId,
|
content,
|
||||||
"delay" in delayOpts ? delayOpts.delay : undefined,
|
room.roomId,
|
||||||
"parent_delay_id" in delayOpts ? delayOpts.parent_delay_id : undefined,
|
"delay" in delayOpts ? delayOpts.delay : undefined,
|
||||||
);
|
"parent_delay_id" in delayOpts ? delayOpts.parent_delay_id : undefined,
|
||||||
|
)
|
||||||
|
.catch(timeoutToConnectionError);
|
||||||
return this.validateSendDelayedEventResponse(response);
|
return this.validateSendDelayedEventResponse(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -374,7 +376,9 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
|
|
||||||
let response: ISendEventFromWidgetResponseData;
|
let response: ISendEventFromWidgetResponseData;
|
||||||
try {
|
try {
|
||||||
response = await this.widgetApi.sendRoomEvent(event.getType(), content, room.roomId);
|
response = await this.widgetApi
|
||||||
|
.sendRoomEvent(event.getType(), content, room.roomId)
|
||||||
|
.catch(timeoutToConnectionError);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
this.updatePendingEventStatus(room, event, EventStatus.NOT_SENT);
|
this.updatePendingEventStatus(room, event, EventStatus.NOT_SENT);
|
||||||
throw e;
|
throw e;
|
||||||
@@ -397,7 +401,9 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
content: any,
|
content: any,
|
||||||
stateKey = "",
|
stateKey = "",
|
||||||
): Promise<ISendEventResponse> {
|
): Promise<ISendEventResponse> {
|
||||||
const response = await this.widgetApi.sendStateEvent(eventType, stateKey, content, roomId);
|
const response = await this.widgetApi
|
||||||
|
.sendStateEvent(eventType, stateKey, content, roomId)
|
||||||
|
.catch(timeoutToConnectionError);
|
||||||
if (response.event_id === undefined) {
|
if (response.event_id === undefined) {
|
||||||
throw new Error("'event_id' absent from response to an event request");
|
throw new Error("'event_id' absent from response to an event request");
|
||||||
}
|
}
|
||||||
@@ -416,17 +422,22 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
stateKey = "",
|
stateKey = "",
|
||||||
): Promise<SendDelayedEventResponse> {
|
): Promise<SendDelayedEventResponse> {
|
||||||
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
||||||
throw Error("Server does not support the delayed events API");
|
throw new UnsupportedDelayedEventsEndpointError(
|
||||||
|
"Server does not support the delayed events API",
|
||||||
|
"sendDelayedStateEvent",
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const response = await this.widgetApi.sendStateEvent(
|
const response = await this.widgetApi
|
||||||
eventType,
|
.sendStateEvent(
|
||||||
stateKey,
|
eventType,
|
||||||
content,
|
stateKey,
|
||||||
roomId,
|
content,
|
||||||
"delay" in delayOpts ? delayOpts.delay : undefined,
|
roomId,
|
||||||
"parent_delay_id" in delayOpts ? delayOpts.parent_delay_id : undefined,
|
"delay" in delayOpts ? delayOpts.delay : undefined,
|
||||||
);
|
"parent_delay_id" in delayOpts ? delayOpts.parent_delay_id : undefined,
|
||||||
|
)
|
||||||
|
.catch(timeoutToConnectionError);
|
||||||
return this.validateSendDelayedEventResponse(response);
|
return this.validateSendDelayedEventResponse(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -443,20 +454,25 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
// eslint-disable-next-line
|
// eslint-disable-next-line
|
||||||
public async _unstable_updateDelayedEvent(delayId: string, action: UpdateDelayedEventAction): Promise<EmptyObject> {
|
public async _unstable_updateDelayedEvent(delayId: string, action: UpdateDelayedEventAction): Promise<EmptyObject> {
|
||||||
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
|
||||||
throw Error("Server does not support the delayed events API");
|
throw new UnsupportedDelayedEventsEndpointError(
|
||||||
|
"Server does not support the delayed events API",
|
||||||
|
"updateDelayedEvent",
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.widgetApi.updateDelayedEvent(delayId, action);
|
await this.widgetApi.updateDelayedEvent(delayId, action).catch(timeoutToConnectionError);
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
public async sendToDevice(eventType: string, contentMap: SendToDeviceContentMap): Promise<EmptyObject> {
|
public async sendToDevice(eventType: string, contentMap: SendToDeviceContentMap): Promise<EmptyObject> {
|
||||||
await this.widgetApi.sendToDevice(eventType, false, recursiveMapToObject(contentMap));
|
await this.widgetApi
|
||||||
|
.sendToDevice(eventType, false, recursiveMapToObject(contentMap))
|
||||||
|
.catch(timeoutToConnectionError);
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
public async getOpenIdToken(): Promise<IOpenIDToken> {
|
public async getOpenIdToken(): Promise<IOpenIDToken> {
|
||||||
const token = await this.widgetApi.requestOpenIDConnectToken();
|
const token = await this.widgetApi.requestOpenIDConnectToken().catch(timeoutToConnectionError);
|
||||||
// the IOpenIDCredentials from the widget-api and IOpenIDToken form the matrix-js-sdk are compatible.
|
// the IOpenIDCredentials from the widget-api and IOpenIDToken form the matrix-js-sdk are compatible.
|
||||||
// we still recreate the token to make this transparent and catch'able by the linter in case the types change in the future.
|
// we still recreate the token to make this transparent and catch'able by the linter in case the types change in the future.
|
||||||
return <IOpenIDToken>{
|
return <IOpenIDToken>{
|
||||||
@@ -474,7 +490,9 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
contentMap.getOrCreate(userId).set(deviceId, payload);
|
contentMap.getOrCreate(userId).set(deviceId, payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.widgetApi.sendToDevice(eventType, false, recursiveMapToObject(contentMap));
|
await this.widgetApi
|
||||||
|
.sendToDevice(eventType, false, recursiveMapToObject(contentMap))
|
||||||
|
.catch(timeoutToConnectionError);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async encryptAndSendToDevices(userDeviceInfoArr: OlmDevice[], payload: object): Promise<void> {
|
public async encryptAndSendToDevices(userDeviceInfoArr: OlmDevice[], payload: object): Promise<void> {
|
||||||
@@ -484,7 +502,9 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
contentMap.getOrCreate(userId).set(deviceId, payload);
|
contentMap.getOrCreate(userId).set(deviceId, payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.widgetApi.sendToDevice((payload as { type: string }).type, true, recursiveMapToObject(contentMap));
|
await this.widgetApi
|
||||||
|
.sendToDevice((payload as { type: string }).type, true, recursiveMapToObject(contentMap))
|
||||||
|
.catch(timeoutToConnectionError);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -505,7 +525,9 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
encrypted: boolean,
|
encrypted: boolean,
|
||||||
contentMap: SendToDeviceContentMap,
|
contentMap: SendToDeviceContentMap,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
await this.widgetApi.sendToDevice(eventType, encrypted, recursiveMapToObject(contentMap));
|
await this.widgetApi
|
||||||
|
.sendToDevice(eventType, encrypted, recursiveMapToObject(contentMap))
|
||||||
|
.catch(timeoutToConnectionError);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Overridden since we get TURN servers automatically over the widget API,
|
// Overridden since we get TURN servers automatically over the widget API,
|
||||||
@@ -670,3 +692,16 @@ function processAndThrow(error: unknown): never {
|
|||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This converts an "Request timed out" error from the PostmessageTransport into a ConnectionError.
|
||||||
|
* It either throws the original error or a new ConnectionError.
|
||||||
|
**/
|
||||||
|
function timeoutToConnectionError(error: unknown): never {
|
||||||
|
// TODO: this should not check on error.message but instead it should be a specific type
|
||||||
|
// error instanceof WidgetTimeoutError
|
||||||
|
if (error instanceof Error && error.message === "Request timed out") {
|
||||||
|
throw new ConnectionError("widget api timeout");
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
@@ -52,3 +52,16 @@ export class ClientStoppedError extends Error {
|
|||||||
super("MatrixClient has been stopped");
|
super("MatrixClient has been stopped");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This error is thrown when the Homeserver does not support the delayed events enpdpoints.
|
||||||
|
*/
|
||||||
|
export class UnsupportedDelayedEventsEndpointError extends Error {
|
||||||
|
public constructor(
|
||||||
|
message: string,
|
||||||
|
public clientEndpoint: "sendDelayedEvent" | "updateDelayedEvent" | "sendDelayedStateEvent" | "getDelayedEvents",
|
||||||
|
) {
|
||||||
|
super(message);
|
||||||
|
this.name = "UnsupportedDelayedEventsEndpointError";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -1,3 +1,19 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2025 The Matrix.org Foundation C.I.C.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
import { EventType } from "../@types/event.ts";
|
import { EventType } from "../@types/event.ts";
|
||||||
import { UpdateDelayedEventAction } from "../@types/requests.ts";
|
import { UpdateDelayedEventAction } from "../@types/requests.ts";
|
||||||
import type { MatrixClient } from "../client.ts";
|
import type { MatrixClient } from "../client.ts";
|
||||||
@@ -11,44 +27,7 @@ import { type Focus } from "./focus.ts";
|
|||||||
import { isLivekitFocusActive } from "./LivekitFocus.ts";
|
import { isLivekitFocusActive } from "./LivekitFocus.ts";
|
||||||
import { type MembershipConfig } from "./MatrixRTCSession.ts";
|
import { type MembershipConfig } from "./MatrixRTCSession.ts";
|
||||||
import { type EmptyObject } from "../@types/common.ts";
|
import { type EmptyObject } from "../@types/common.ts";
|
||||||
/**
|
import { type IMembershipManager } from "./NewMembershipManager.ts";
|
||||||
* This interface defines what a MembershipManager uses and exposes.
|
|
||||||
* This interface is what we use to write tests and allows to change the actual implementation
|
|
||||||
* Without breaking tests because of some internal method renaming.
|
|
||||||
*
|
|
||||||
* @internal
|
|
||||||
*/
|
|
||||||
export interface IMembershipManager {
|
|
||||||
/**
|
|
||||||
* If we are trying to join the session.
|
|
||||||
* It does not reflect if the room state is already configures to represent us being joined.
|
|
||||||
* It only means that the Manager is running.
|
|
||||||
* @returns true if we intend to be participating in the MatrixRTC session
|
|
||||||
*/
|
|
||||||
isJoined(): boolean;
|
|
||||||
/**
|
|
||||||
* Start sending all necessary events to make this user participant in the RTC session.
|
|
||||||
* @param fociPreferred the list of preferred foci to use in the joined RTC membership event.
|
|
||||||
* @param fociActive the active focus to use in the joined RTC membership event.
|
|
||||||
*/
|
|
||||||
join(fociPreferred: Focus[], fociActive?: Focus): void;
|
|
||||||
/**
|
|
||||||
* Send all necessary events to make this user leave the RTC session.
|
|
||||||
* @param timeout the maximum duration in ms until the promise is forced to resolve.
|
|
||||||
* @returns It resolves with true in case the leave was sent successfully.
|
|
||||||
* It resolves with false in case we hit the timeout before sending successfully.
|
|
||||||
*/
|
|
||||||
leave(timeout?: number): Promise<boolean>;
|
|
||||||
/**
|
|
||||||
* Call this if the MatrixRTC session members have changed.
|
|
||||||
*/
|
|
||||||
onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise<void>;
|
|
||||||
/**
|
|
||||||
* The used active focus in the currently joined session.
|
|
||||||
* @returns the used active focus in the currently joined session or undefined if not joined.
|
|
||||||
*/
|
|
||||||
getActiveFocus(): Focus | undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This internal class is used by the MatrixRTCSession to manage the local user's own membership of the session.
|
* This internal class is used by the MatrixRTCSession to manage the local user's own membership of the session.
|
||||||
@@ -64,7 +43,8 @@ export interface IMembershipManager {
|
|||||||
*
|
*
|
||||||
* It is recommended to only use this interface for testing to allow replacing this class.
|
* It is recommended to only use this interface for testing to allow replacing this class.
|
||||||
*
|
*
|
||||||
* @internal
|
* @internal
|
||||||
|
* @deprecated Use {@link MembershipManager} instead
|
||||||
*/
|
*/
|
||||||
export class LegacyMembershipManager implements IMembershipManager {
|
export class LegacyMembershipManager implements IMembershipManager {
|
||||||
private relativeExpiry: number | undefined;
|
private relativeExpiry: number | undefined;
|
@@ -25,8 +25,10 @@ import { RoomStateEvent } from "../models/room-state.ts";
|
|||||||
import { type Focus } from "./focus.ts";
|
import { type Focus } from "./focus.ts";
|
||||||
import { KnownMembership } from "../@types/membership.ts";
|
import { KnownMembership } from "../@types/membership.ts";
|
||||||
import { type MatrixEvent } from "../models/event.ts";
|
import { type MatrixEvent } from "../models/event.ts";
|
||||||
import { LegacyMembershipManager, type IMembershipManager } from "./MembershipManager.ts";
|
import { MembershipManager, type IMembershipManager } from "./NewMembershipManager.ts";
|
||||||
import { EncryptionManager, type IEncryptionManager, type Statistics } from "./EncryptionManager.ts";
|
import { EncryptionManager, type IEncryptionManager, type Statistics } from "./EncryptionManager.ts";
|
||||||
|
import { LegacyMembershipManager } from "./LegacyMembershipManager.ts";
|
||||||
|
import { logDurationSync } from "../utils.ts";
|
||||||
|
|
||||||
const logger = rootLogger.getChild("MatrixRTCSession");
|
const logger = rootLogger.getChild("MatrixRTCSession");
|
||||||
|
|
||||||
@@ -39,6 +41,8 @@ export enum MatrixRTCSessionEvent {
|
|||||||
JoinStateChanged = "join_state_changed",
|
JoinStateChanged = "join_state_changed",
|
||||||
// The key used to encrypt media has changed
|
// The key used to encrypt media has changed
|
||||||
EncryptionKeyChanged = "encryption_key_changed",
|
EncryptionKeyChanged = "encryption_key_changed",
|
||||||
|
/** The membership manager had to shut down caused by an unrecoverable error */
|
||||||
|
MembershipManagerError = "membership_manager_error",
|
||||||
}
|
}
|
||||||
|
|
||||||
export type MatrixRTCSessionEventHandlerMap = {
|
export type MatrixRTCSessionEventHandlerMap = {
|
||||||
@@ -52,9 +56,17 @@ export type MatrixRTCSessionEventHandlerMap = {
|
|||||||
encryptionKeyIndex: number,
|
encryptionKeyIndex: number,
|
||||||
participantId: string,
|
participantId: string,
|
||||||
) => void;
|
) => void;
|
||||||
|
[MatrixRTCSessionEvent.MembershipManagerError]: (error: unknown) => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
export interface MembershipConfig {
|
export interface MembershipConfig {
|
||||||
|
/**
|
||||||
|
* Use the new Manager.
|
||||||
|
*
|
||||||
|
* Default: `false`.
|
||||||
|
*/
|
||||||
|
useNewMembershipManager?: boolean;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The timeout (in milliseconds) after we joined the call, that our membership should expire
|
* The timeout (in milliseconds) after we joined the call, that our membership should expire
|
||||||
* unless we have explicitly updated it.
|
* unless we have explicitly updated it.
|
||||||
@@ -63,6 +75,17 @@ export interface MembershipConfig {
|
|||||||
*/
|
*/
|
||||||
membershipExpiryTimeout?: number;
|
membershipExpiryTimeout?: number;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The time in (in milliseconds) which the manager will prematurely send the updated state event before the membership `expires` time to make sure it
|
||||||
|
* sends the updated state event early enough.
|
||||||
|
*
|
||||||
|
* A headroom of 1000ms and a `membershipExpiryTimeout` of 10000ms would result in the first membership event update after 9s and
|
||||||
|
* a membership event that would be considered expired after 10s.
|
||||||
|
*
|
||||||
|
* This value does not have an effect on the value of `SessionMembershipData.expires`.
|
||||||
|
*/
|
||||||
|
membershipExpiryTimeoutHeadroom?: number;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The period (in milliseconds) with which we check that our membership event still exists on the
|
* The period (in milliseconds) with which we check that our membership event still exists on the
|
||||||
* server. If it is not found we create it again.
|
* server. If it is not found we create it again.
|
||||||
@@ -90,6 +113,16 @@ export interface MembershipConfig {
|
|||||||
* @deprecated It should be possible to make it stable without this.
|
* @deprecated It should be possible to make it stable without this.
|
||||||
*/
|
*/
|
||||||
callMemberEventRetryJitter?: number;
|
callMemberEventRetryJitter?: number;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of retries that the manager will do for delayed event sending/updating and state event sending when a server rate limit has been hit.
|
||||||
|
*/
|
||||||
|
maximumRateLimitRetryCount?: number;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of retries that the manager will do for delayed event sending/updating and state event sending when a network error occurs.
|
||||||
|
*/
|
||||||
|
maximumNetworkErrorRetryCount?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface EncryptionConfig {
|
export interface EncryptionConfig {
|
||||||
@@ -318,18 +351,28 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
|
|||||||
* @param joinConfig - Additional configuration for the joined session.
|
* @param joinConfig - Additional configuration for the joined session.
|
||||||
*/
|
*/
|
||||||
public joinRoomSession(fociPreferred: Focus[], fociActive?: Focus, joinConfig?: JoinSessionConfig): void {
|
public joinRoomSession(fociPreferred: Focus[], fociActive?: Focus, joinConfig?: JoinSessionConfig): void {
|
||||||
// create MembershipManager
|
|
||||||
if (this.isJoined()) {
|
if (this.isJoined()) {
|
||||||
logger.info(`Already joined to session in room ${this.roomSubset.roomId}: ignoring join call`);
|
logger.info(`Already joined to session in room ${this.roomSubset.roomId}: ignoring join call`);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
this.membershipManager = new LegacyMembershipManager(joinConfig, this.roomSubset, this.client, () =>
|
// Create MembershipManager
|
||||||
this.getOldestMembership(),
|
if (joinConfig?.useNewMembershipManager ?? false) {
|
||||||
);
|
this.membershipManager = new MembershipManager(joinConfig, this.roomSubset, this.client, () =>
|
||||||
|
this.getOldestMembership(),
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
this.membershipManager = new LegacyMembershipManager(joinConfig, this.roomSubset, this.client, () =>
|
||||||
|
this.getOldestMembership(),
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Join!
|
// Join!
|
||||||
this.membershipManager!.join(fociPreferred, fociActive);
|
this.membershipManager!.join(fociPreferred, fociActive, (e) => {
|
||||||
|
logger.error("MembershipManager encountered an unrecoverable error: ", e);
|
||||||
|
this.emit(MatrixRTCSessionEvent.MembershipManagerError, e);
|
||||||
|
this.emit(MatrixRTCSessionEvent.JoinStateChanged, this.isJoined());
|
||||||
|
});
|
||||||
this.encryptionManager!.join(joinConfig);
|
this.encryptionManager!.join(joinConfig);
|
||||||
|
|
||||||
this.emit(MatrixRTCSessionEvent.JoinStateChanged, true);
|
this.emit(MatrixRTCSessionEvent.JoinStateChanged, true);
|
||||||
@@ -492,7 +535,9 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
|
|||||||
|
|
||||||
if (changed) {
|
if (changed) {
|
||||||
logger.info(`Memberships for call in room ${this.roomSubset.roomId} have changed: emitting`);
|
logger.info(`Memberships for call in room ${this.roomSubset.roomId} have changed: emitting`);
|
||||||
this.emit(MatrixRTCSessionEvent.MembershipsChanged, oldMemberships, this.memberships);
|
logDurationSync(logger, "emit MatrixRTCSessionEvent.MembershipsChanged", () => {
|
||||||
|
this.emit(MatrixRTCSessionEvent.MembershipsChanged, oldMemberships, this.memberships);
|
||||||
|
});
|
||||||
|
|
||||||
void this.membershipManager?.onRTCSessionMemberUpdate(this.memberships);
|
void this.membershipManager?.onRTCSessionMemberUpdate(this.memberships);
|
||||||
}
|
}
|
||||||
|
931
src/matrixrtc/NewMembershipManager.ts
Normal file
931
src/matrixrtc/NewMembershipManager.ts
Normal file
@@ -0,0 +1,931 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2025 The Matrix.org Foundation C.I.C.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { EventType } from "../@types/event.ts";
|
||||||
|
import { UpdateDelayedEventAction } from "../@types/requests.ts";
|
||||||
|
import { type MatrixClient } from "../client.ts";
|
||||||
|
import { UnsupportedDelayedEventsEndpointError } from "../errors.ts";
|
||||||
|
import { ConnectionError, HTTPError, MatrixError } from "../http-api/errors.ts";
|
||||||
|
import { logger as rootLogger } from "../logger.ts";
|
||||||
|
import { type Room } from "../models/room.ts";
|
||||||
|
import { defer, type IDeferred } from "../utils.ts";
|
||||||
|
import { type CallMembership, DEFAULT_EXPIRE_DURATION, type SessionMembershipData } from "./CallMembership.ts";
|
||||||
|
import { type Focus } from "./focus.ts";
|
||||||
|
import { isLivekitFocusActive } from "./LivekitFocus.ts";
|
||||||
|
import { type MembershipConfig } from "./MatrixRTCSession.ts";
|
||||||
|
import { ActionScheduler, type ActionUpdate } from "./NewMembershipManagerActionScheduler.ts";
|
||||||
|
|
||||||
|
const logger = rootLogger.getChild("MatrixRTCSession");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This interface defines what a MembershipManager uses and exposes.
|
||||||
|
* This interface is what we use to write tests and allows changing the actual implementation
|
||||||
|
* without breaking tests because of some internal method renaming.
|
||||||
|
*
|
||||||
|
* @internal
|
||||||
|
*/
|
||||||
|
export interface IMembershipManager {
|
||||||
|
/**
|
||||||
|
* If we are trying to join, or have successfully joined the session.
|
||||||
|
* It does not reflect if the room state is already configured to represent us being joined.
|
||||||
|
* It only means that the Manager is running.
|
||||||
|
* @returns true if we intend to be participating in the MatrixRTC session
|
||||||
|
*/
|
||||||
|
isJoined(): boolean;
|
||||||
|
/**
|
||||||
|
* Start sending all necessary events to make this user participate in the RTC session.
|
||||||
|
* @param fociPreferred the list of preferred foci to use in the joined RTC membership event.
|
||||||
|
* @param fociActive the active focus to use in the joined RTC membership event.
|
||||||
|
* @throws can throw if it exceeds a configured maximum retry.
|
||||||
|
*/
|
||||||
|
join(fociPreferred: Focus[], fociActive?: Focus, onError?: (error: unknown) => void): void;
|
||||||
|
/**
|
||||||
|
* Send all necessary events to make this user leave the RTC session.
|
||||||
|
* @param timeout the maximum duration in ms until the promise is forced to resolve.
|
||||||
|
* @returns It resolves with true in case the leave was sent successfully.
|
||||||
|
* It resolves with false in case we hit the timeout before sending successfully.
|
||||||
|
*/
|
||||||
|
leave(timeout?: number): Promise<boolean>;
|
||||||
|
/**
|
||||||
|
* Call this if the MatrixRTC session members have changed.
|
||||||
|
*/
|
||||||
|
onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise<void>;
|
||||||
|
/**
|
||||||
|
* The used active focus in the currently joined session.
|
||||||
|
* @returns the used active focus in the currently joined session or undefined if not joined.
|
||||||
|
*/
|
||||||
|
getActiveFocus(): Focus | undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* MembershipActionTypes:
|
||||||
|
▼
|
||||||
|
┌─────────────────────┐
|
||||||
|
│SendFirstDelayedEvent│
|
||||||
|
└─────────────────────┘
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
┌─────────────┐
|
||||||
|
┌────────────│SendJoinEvent│────────────┐
|
||||||
|
│ └─────────────┘ │
|
||||||
|
│ ┌─────┐ ┌──────┐ │ ┌──────┐
|
||||||
|
▼ ▼ │ │ ▼ ▼ ▼ │
|
||||||
|
┌────────────┐ │ │ ┌───────────────────┐ │
|
||||||
|
│UpdateExpiry│ │ │ │RestartDelayedEvent│ │
|
||||||
|
└────────────┘ │ │ └───────────────────┘ │
|
||||||
|
│ │ │ │ │ │
|
||||||
|
└─────┘ └──────┘ │ │
|
||||||
|
│ │
|
||||||
|
┌────────────────────┐ │ │
|
||||||
|
│SendMainDelayedEvent│◄───────┘ │
|
||||||
|
└───────────────────┬┘ │
|
||||||
|
│ │
|
||||||
|
└─────────────────────┘
|
||||||
|
STOP ALL ABOVE
|
||||||
|
▼
|
||||||
|
┌───────────────────────────────┐
|
||||||
|
│ SendScheduledDelayedLeaveEvent│
|
||||||
|
└───────────────────────────────┘
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
┌──────────────┐
|
||||||
|
│SendLeaveEvent│
|
||||||
|
└──────────────┘
|
||||||
|
|
||||||
|
*/
|
||||||
|
/**
|
||||||
|
* The different types of actions the MembershipManager can take.
|
||||||
|
* @internal
|
||||||
|
*/
|
||||||
|
export enum MembershipActionType {
|
||||||
|
SendFirstDelayedEvent = "SendFirstDelayedEvent",
|
||||||
|
// -> MembershipActionType.SendJoinEvent if successful
|
||||||
|
// -> DelayedLeaveActionType.SendFirstDelayedEvent on error, retry sending the first delayed event.
|
||||||
|
SendJoinEvent = "SendJoinEvent",
|
||||||
|
// -> MembershipActionType.SendJoinEvent if we run into a rate limit and need to retry
|
||||||
|
// -> MembershipActionType.Update if we successfully send the join event then schedule the expire event update
|
||||||
|
// -> DelayedLeaveActionType.RestartDelayedEvent to recheck the delayed event
|
||||||
|
RestartDelayedEvent = "RestartDelayedEvent",
|
||||||
|
// -> DelayedLeaveActionType.SendMainDelayedEvent on missing delay id but there is a rtc state event
|
||||||
|
// -> DelayedLeaveActionType.SendFirstDelayedEvent on missing delay id and there is no state event
|
||||||
|
// -> DelayedLeaveActionType.RestartDelayedEvent on success we schedule the next restart
|
||||||
|
UpdateExpiry = "UpdateExpiry",
|
||||||
|
// -> MembershipActionType.Update if the timeout has passed so the next update is required.
|
||||||
|
SendMainDelayedEvent = "SendMainDelayedEvent",
|
||||||
|
// -> DelayedLeaveActionType.RestartDelayedEvent on success start updating the delayed event
|
||||||
|
// -> DelayedLeaveActionType.SendMainDelayedEvent on error try again
|
||||||
|
SendScheduledDelayedLeaveEvent = "SendScheduledDelayedLeaveEvent",
|
||||||
|
// -> MembershipActionType.SendLeaveEvent on failiour (not found) we need to send the leave manually and cannot use the scheduled delayed event
|
||||||
|
// -> DelayedLeaveActionType.SendScheduledDelayedLeaveEvent on error we try again.
|
||||||
|
SendLeaveEvent = "SendLeaveEvent",
|
||||||
|
// -> MembershipActionType.SendLeaveEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @internal
|
||||||
|
*/
|
||||||
|
export interface ActionSchedulerState {
|
||||||
|
/** The delayId we got when successfully sending the delayed leave event.
|
||||||
|
* Gets set to undefined if the server claims it cannot find the delayed event anymore. */
|
||||||
|
delayId?: string;
|
||||||
|
/** Stores how often we have update the `expires` field.
|
||||||
|
* `expireUpdateIterations` * `membershipEventExpiryTimeout` resolves to the value the expires field should contain next */
|
||||||
|
expireUpdateIterations: number;
|
||||||
|
/** The time at which we send the first state event. The time the call started from the DAG point of view.
|
||||||
|
* This is used to compute the local sleep timestamps when to next update the member event with a new expires value. */
|
||||||
|
startTime: number;
|
||||||
|
/** The manager is in the state where its actually connected to the session. */
|
||||||
|
hasMemberStateEvent: boolean;
|
||||||
|
// There can be multiple retries at once so we need to store counters per action
|
||||||
|
// e.g. the send update membership and the restart delayed could be rate limited at the same time.
|
||||||
|
/** Retry counter for rate limits */
|
||||||
|
rateLimitRetries: Map<MembershipActionType, number>;
|
||||||
|
/** Retry counter for other errors */
|
||||||
|
networkErrorRetries: Map<MembershipActionType, number>;
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Status {
|
||||||
|
Disconnected = "Disconnected",
|
||||||
|
Connecting = "Connecting",
|
||||||
|
ConnectingFailed = "ConnectingFailed",
|
||||||
|
Connected = "Connected",
|
||||||
|
Reconnecting = "Reconnecting",
|
||||||
|
Disconnecting = "Disconnecting",
|
||||||
|
Stuck = "Stuck",
|
||||||
|
Unknown = "Unknown",
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is responsible for sending all events relating to the own membership of a matrixRTC call.
|
||||||
|
* It has the following tasks:
|
||||||
|
* - Send the users leave delayed event before sending the membership
|
||||||
|
* - Send the users membership if the state machine is started
|
||||||
|
* - Check if the delayed event was canceled due to sending the membership
|
||||||
|
* - update the delayed event (`restart`)
|
||||||
|
* - Update the state event every ~5h = `DEFAULT_EXPIRE_DURATION` (so it does not get treated as expired)
|
||||||
|
* - When the state machine is stopped:
|
||||||
|
* - Disconnect the member
|
||||||
|
* - Stop the timer for the delay refresh
|
||||||
|
* - Stop the timer for updating the state event
|
||||||
|
*/
|
||||||
|
export class MembershipManager implements IMembershipManager {
|
||||||
|
public isJoined(): boolean {
|
||||||
|
return this.scheduler.running;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Puts the MembershipManager in a state where it tries to be joined.
|
||||||
|
* It will send delayed events and membership events
|
||||||
|
* @param fociPreferred
|
||||||
|
* @param focusActive
|
||||||
|
* @param onError This will be called once the membership manager encounters an unrecoverable error.
|
||||||
|
* This should bubble up the the frontend to communicate that the call does not work in the current environment.
|
||||||
|
*/
|
||||||
|
public join(fociPreferred: Focus[], focusActive?: Focus, onError?: (error: unknown) => void): void {
|
||||||
|
if (this.isJoined()) {
|
||||||
|
logger.error("MembershipManager is already running. Ignoring join request.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.fociPreferred = fociPreferred;
|
||||||
|
this.focusActive = focusActive;
|
||||||
|
this.leavePromiseDefer = undefined;
|
||||||
|
|
||||||
|
this.state = MembershipManager.defaultState;
|
||||||
|
|
||||||
|
this.scheduler
|
||||||
|
.startWithJoin()
|
||||||
|
.then(() => {
|
||||||
|
this.leavePromiseDefer?.resolve(true);
|
||||||
|
this.leavePromiseDefer = undefined;
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
logger.error("MembershipManager stopped because: ", e);
|
||||||
|
onError?.(e);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Leave from the call (Send an rtc session event with content: `{}`)
|
||||||
|
* @param timeout the maximum duration this promise will take to resolve
|
||||||
|
* @returns true if it managed to leave and false if the timeout condition happened.
|
||||||
|
*/
|
||||||
|
public leave(timeout?: number): Promise<boolean> {
|
||||||
|
if (!this.scheduler.running) return Promise.resolve(true);
|
||||||
|
|
||||||
|
// We use the promise to track if we already scheduled a leave event
|
||||||
|
// So we do not check scheduler.actions/scheduler.insertions
|
||||||
|
if (!this.leavePromiseDefer) {
|
||||||
|
// reset scheduled actions so we will not do any new actions.
|
||||||
|
this.leavePromiseDefer = defer<boolean>();
|
||||||
|
this.scheduler.initiateLeave();
|
||||||
|
if (timeout) setTimeout(() => this.leavePromiseDefer?.resolve(false), timeout);
|
||||||
|
}
|
||||||
|
return this.leavePromiseDefer.promise;
|
||||||
|
}
|
||||||
|
private leavePromiseDefer?: IDeferred<boolean>;
|
||||||
|
|
||||||
|
public async onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise<void> {
|
||||||
|
const isMyMembership = (m: CallMembership): boolean =>
|
||||||
|
m.sender === this.client.getUserId() && m.deviceId === this.client.getDeviceId();
|
||||||
|
|
||||||
|
if (this.isJoined() && !memberships.some(isMyMembership)) {
|
||||||
|
// If one of these actions are scheduled or are getting inserted in the next iteration, we should already
|
||||||
|
// take care of our missing membership.
|
||||||
|
const sendingMembershipActions = [
|
||||||
|
MembershipActionType.SendFirstDelayedEvent,
|
||||||
|
MembershipActionType.SendJoinEvent,
|
||||||
|
];
|
||||||
|
logger.warn("Missing own membership: force re-join");
|
||||||
|
if (this.scheduler.actions.find((a) => sendingMembershipActions.includes(a.type as MembershipActionType))) {
|
||||||
|
logger.error(
|
||||||
|
"NewMembershipManger tried adding another `SendFirstDelayedEvent` actions even though we already have one in the Queue\nActionQueueOnMemberUpdate:",
|
||||||
|
this.scheduler.actions,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
// Only react to our own membership missing if we have not already scheduled sending a new membership DirectMembershipManagerAction.Join
|
||||||
|
this.state.hasMemberStateEvent = false;
|
||||||
|
this.scheduler.initiateJoin();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
|
|
||||||
|
public getActiveFocus(): Focus | undefined {
|
||||||
|
if (this.focusActive) {
|
||||||
|
// A livekit active focus
|
||||||
|
if (isLivekitFocusActive(this.focusActive)) {
|
||||||
|
if (this.focusActive.focus_selection === "oldest_membership") {
|
||||||
|
const oldestMembership = this.getOldestMembership();
|
||||||
|
return oldestMembership?.getPreferredFoci()[0];
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.warn("Unknown own ActiveFocus type. This makes it impossible to connect to an SFU.");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// We do not understand the membership format (could be legacy). We default to oldestMembership
|
||||||
|
// Once there are other methods this is a hard error!
|
||||||
|
const oldestMembership = this.getOldestMembership();
|
||||||
|
return oldestMembership?.getPreferredFoci()[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws if the client does not return user or device id.
|
||||||
|
* @param joinConfig
|
||||||
|
* @param room
|
||||||
|
* @param client
|
||||||
|
* @param getOldestMembership
|
||||||
|
*/
|
||||||
|
public constructor(
|
||||||
|
private joinConfig: MembershipConfig | undefined,
|
||||||
|
private room: Pick<Room, "getLiveTimeline" | "roomId" | "getVersion">,
|
||||||
|
private client: Pick<
|
||||||
|
MatrixClient,
|
||||||
|
| "getUserId"
|
||||||
|
| "getDeviceId"
|
||||||
|
| "sendStateEvent"
|
||||||
|
| "_unstable_sendDelayedStateEvent"
|
||||||
|
| "_unstable_updateDelayedEvent"
|
||||||
|
>,
|
||||||
|
private getOldestMembership: () => CallMembership | undefined,
|
||||||
|
) {
|
||||||
|
const [userId, deviceId] = [this.client.getUserId(), this.client.getDeviceId()];
|
||||||
|
if (userId === null) throw Error("Missing userId in client");
|
||||||
|
if (deviceId === null) throw Error("Missing deviceId in client");
|
||||||
|
this.deviceId = deviceId;
|
||||||
|
this.stateKey = this.makeMembershipStateKey(userId, deviceId);
|
||||||
|
this.state = MembershipManager.defaultState;
|
||||||
|
}
|
||||||
|
|
||||||
|
// MembershipManager mutable state.
|
||||||
|
private state: ActionSchedulerState;
|
||||||
|
private static get defaultState(): ActionSchedulerState {
|
||||||
|
return {
|
||||||
|
hasMemberStateEvent: false,
|
||||||
|
delayId: undefined,
|
||||||
|
|
||||||
|
startTime: 0,
|
||||||
|
rateLimitRetries: new Map(),
|
||||||
|
networkErrorRetries: new Map(),
|
||||||
|
expireUpdateIterations: 1,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
// Membership Event static parameters:
|
||||||
|
private deviceId: string;
|
||||||
|
private stateKey: string;
|
||||||
|
private fociPreferred?: Focus[];
|
||||||
|
private focusActive?: Focus;
|
||||||
|
|
||||||
|
// Config:
|
||||||
|
private membershipServerSideExpiryTimeoutOverride?: number;
|
||||||
|
|
||||||
|
private get callMemberEventRetryDelayMinimum(): number {
|
||||||
|
return this.joinConfig?.callMemberEventRetryDelayMinimum ?? 3_000;
|
||||||
|
}
|
||||||
|
private get membershipEventExpiryTimeout(): number {
|
||||||
|
return this.joinConfig?.membershipExpiryTimeout ?? DEFAULT_EXPIRE_DURATION;
|
||||||
|
}
|
||||||
|
private get membershipEventExpiryTimeoutHeadroom(): number {
|
||||||
|
return this.joinConfig?.membershipExpiryTimeoutHeadroom ?? 5_000;
|
||||||
|
}
|
||||||
|
private computeNextExpiryActionTs(iteration: number): number {
|
||||||
|
return (
|
||||||
|
this.state.startTime +
|
||||||
|
this.membershipEventExpiryTimeout * iteration -
|
||||||
|
this.membershipEventExpiryTimeoutHeadroom
|
||||||
|
);
|
||||||
|
}
|
||||||
|
private get membershipServerSideExpiryTimeout(): number {
|
||||||
|
return (
|
||||||
|
this.membershipServerSideExpiryTimeoutOverride ??
|
||||||
|
this.joinConfig?.membershipServerSideExpiryTimeout ??
|
||||||
|
8_000
|
||||||
|
);
|
||||||
|
}
|
||||||
|
private get membershipKeepAlivePeriod(): number {
|
||||||
|
return this.joinConfig?.membershipKeepAlivePeriod ?? 5_000;
|
||||||
|
}
|
||||||
|
private get maximumRateLimitRetryCount(): number {
|
||||||
|
return this.joinConfig?.maximumRateLimitRetryCount ?? 10;
|
||||||
|
}
|
||||||
|
private get maximumNetworkErrorRetryCount(): number {
|
||||||
|
return this.joinConfig?.maximumNetworkErrorRetryCount ?? 10;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scheduler:
|
||||||
|
private oldStatus?: Status;
|
||||||
|
private scheduler = new ActionScheduler((type): Promise<ActionUpdate> => {
|
||||||
|
if (this.oldStatus) {
|
||||||
|
// we put this at the beginning of the actions scheduler loop handle callback since it is a loop this
|
||||||
|
// is equivalent to running it at the end of the loop. (just after applying the status/action list changes)
|
||||||
|
logger.debug(`MembershipManager applied action changes. Status: ${this.oldStatus} -> ${this.status}`);
|
||||||
|
}
|
||||||
|
this.oldStatus = this.status;
|
||||||
|
logger.debug(`MembershipManager before processing action. status=${this.oldStatus}`);
|
||||||
|
return this.membershipLoopHandler(type);
|
||||||
|
});
|
||||||
|
|
||||||
|
// LOOP HANDLER:
|
||||||
|
private async membershipLoopHandler(type: MembershipActionType): Promise<ActionUpdate> {
|
||||||
|
this.oldStatus = this.status;
|
||||||
|
switch (type) {
|
||||||
|
case MembershipActionType.SendFirstDelayedEvent: {
|
||||||
|
// Before we start we check if we come from a state where we have a delay id.
|
||||||
|
if (!this.state.delayId) {
|
||||||
|
return this.sendFirstDelayedLeaveEvent(); // Normal case without any previous delayed id.
|
||||||
|
} else {
|
||||||
|
// This can happen if someone else (or another client) removes our own membership event.
|
||||||
|
// It will trigger `onRTCSessionMemberUpdate` queue `MembershipActionType.SendFirstDelayedEvent`.
|
||||||
|
// We might still have our delayed event from the previous participation and dependent on the server this might not
|
||||||
|
// get automatically removed if the state changes. Hence It would remove our membership unexpectedly shortly after the rejoin.
|
||||||
|
//
|
||||||
|
// In this block we will try to cancel this delayed event before setting up a new one.
|
||||||
|
|
||||||
|
return this.cancelKnownDelayIdBeforeSendFirstDelayedEvent(this.state.delayId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case MembershipActionType.RestartDelayedEvent: {
|
||||||
|
if (!this.state.delayId) {
|
||||||
|
// Delay id got reset. This action was used to check if the hs canceled the delayed event when the join state got sent.
|
||||||
|
return createInsertActionUpdate(
|
||||||
|
this.state.hasMemberStateEvent
|
||||||
|
? MembershipActionType.SendMainDelayedEvent
|
||||||
|
: MembershipActionType.SendFirstDelayedEvent,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return this.restartDelayedEvent(this.state.delayId);
|
||||||
|
}
|
||||||
|
case MembershipActionType.SendMainDelayedEvent: {
|
||||||
|
return this.sendMainDelayedEvent();
|
||||||
|
}
|
||||||
|
case MembershipActionType.SendScheduledDelayedLeaveEvent: {
|
||||||
|
// We are already good
|
||||||
|
if (!this.state.hasMemberStateEvent) {
|
||||||
|
return { replace: [] };
|
||||||
|
}
|
||||||
|
if (this.state.delayId) {
|
||||||
|
return this.sendScheduledDelayedLeaveEventOrFallbackToSendLeaveEvent(this.state.delayId);
|
||||||
|
} else {
|
||||||
|
return createInsertActionUpdate(MembershipActionType.SendLeaveEvent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case MembershipActionType.SendJoinEvent: {
|
||||||
|
return this.sendJoinEvent();
|
||||||
|
}
|
||||||
|
case MembershipActionType.UpdateExpiry: {
|
||||||
|
return this.updateExpiryOnJoinedEvent();
|
||||||
|
}
|
||||||
|
case MembershipActionType.SendLeaveEvent: {
|
||||||
|
// We are good already
|
||||||
|
if (!this.state.hasMemberStateEvent) {
|
||||||
|
return { replace: [] };
|
||||||
|
}
|
||||||
|
// This is only a fallback in case we do not have working delayed events support.
|
||||||
|
// first we should try to just send the scheduled leave event
|
||||||
|
return this.sendFallbackLeaveEvent();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// HANDLERS (used in the membershipLoopHandler)
|
||||||
|
private async sendFirstDelayedLeaveEvent(): Promise<ActionUpdate> {
|
||||||
|
return await this.client
|
||||||
|
._unstable_sendDelayedStateEvent(
|
||||||
|
this.room.roomId,
|
||||||
|
{
|
||||||
|
delay: this.membershipServerSideExpiryTimeout,
|
||||||
|
},
|
||||||
|
EventType.GroupCallMemberPrefix,
|
||||||
|
{}, // leave event
|
||||||
|
this.stateKey,
|
||||||
|
)
|
||||||
|
.then((response) => {
|
||||||
|
// On success we reset retries and set delayId.
|
||||||
|
this.state.rateLimitRetries.set(MembershipActionType.SendFirstDelayedEvent, 0);
|
||||||
|
this.state.networkErrorRetries.set(MembershipActionType.SendFirstDelayedEvent, 0);
|
||||||
|
this.state.delayId = response.delay_id;
|
||||||
|
return createInsertActionUpdate(MembershipActionType.SendJoinEvent);
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
const repeatActionType = MembershipActionType.SendFirstDelayedEvent;
|
||||||
|
if (this.manageMaxDelayExceededSituation(e)) {
|
||||||
|
return createInsertActionUpdate(repeatActionType);
|
||||||
|
}
|
||||||
|
const update = this.actionUpdateFromErrors(e, repeatActionType, "sendDelayedStateEvent");
|
||||||
|
if (update) return update;
|
||||||
|
|
||||||
|
// log and fall through
|
||||||
|
if (this.isUnsupportedDelayedEndpoint(e)) {
|
||||||
|
logger.info("Not using delayed event because the endpoint is not supported");
|
||||||
|
} else {
|
||||||
|
logger.info("Not using delayed event because: " + e);
|
||||||
|
}
|
||||||
|
// On any other error we fall back to not using delayed events and send the join state event immediately
|
||||||
|
return createInsertActionUpdate(MembershipActionType.SendJoinEvent);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async cancelKnownDelayIdBeforeSendFirstDelayedEvent(delayId: string): Promise<ActionUpdate> {
|
||||||
|
// Remove all running updates and restarts
|
||||||
|
return await this.client
|
||||||
|
._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Cancel)
|
||||||
|
.then(() => {
|
||||||
|
this.state.delayId = undefined;
|
||||||
|
this.resetRateLimitCounter(MembershipActionType.SendFirstDelayedEvent);
|
||||||
|
return createReplaceActionUpdate(MembershipActionType.SendFirstDelayedEvent);
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
const repeatActionType = MembershipActionType.SendFirstDelayedEvent;
|
||||||
|
const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent");
|
||||||
|
if (update) return update;
|
||||||
|
|
||||||
|
if (this.isNotFoundError(e)) {
|
||||||
|
// If we get a M_NOT_FOUND we know that the delayed event got already removed.
|
||||||
|
// This means we are good and can set it to undefined and run this again.
|
||||||
|
this.state.delayId = undefined;
|
||||||
|
return createReplaceActionUpdate(repeatActionType);
|
||||||
|
}
|
||||||
|
if (this.isUnsupportedDelayedEndpoint(e)) {
|
||||||
|
return createReplaceActionUpdate(MembershipActionType.SendJoinEvent);
|
||||||
|
}
|
||||||
|
// We do not just ignore and log this error since we would also need to reset the delayId.
|
||||||
|
|
||||||
|
// This becomes an unrecoverable error case since something is significantly off if we don't hit any of the above cases
|
||||||
|
// when state.delayId !== undefined
|
||||||
|
// We do not just ignore and log this error since we would also need to reset the delayId.
|
||||||
|
// It is cleaner if we, the frontend, rejoins instead of resetting the delayId here and behaving like in the success case.
|
||||||
|
throw Error(
|
||||||
|
"We failed to cancel a delayed event where we already had a delay id with an error we cannot automatically handle",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async restartDelayedEvent(delayId: string): Promise<ActionUpdate> {
|
||||||
|
return await this.client
|
||||||
|
._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Restart)
|
||||||
|
.then(() => {
|
||||||
|
this.resetRateLimitCounter(MembershipActionType.RestartDelayedEvent);
|
||||||
|
return createInsertActionUpdate(
|
||||||
|
MembershipActionType.RestartDelayedEvent,
|
||||||
|
this.membershipKeepAlivePeriod,
|
||||||
|
);
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
const repeatActionType = MembershipActionType.RestartDelayedEvent;
|
||||||
|
if (this.isNotFoundError(e)) {
|
||||||
|
this.state.delayId = undefined;
|
||||||
|
return createInsertActionUpdate(MembershipActionType.SendMainDelayedEvent);
|
||||||
|
}
|
||||||
|
// If the HS does not support delayed events we wont reschedule.
|
||||||
|
if (this.isUnsupportedDelayedEndpoint(e)) return {};
|
||||||
|
|
||||||
|
// TODO this also needs a test: get rate limit while checking id delayed event is scheduled
|
||||||
|
const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent");
|
||||||
|
if (update) return update;
|
||||||
|
|
||||||
|
// In other error cases we have no idea what is happening
|
||||||
|
throw Error("Could not restart delayed event, even though delayed events are supported. " + e);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async sendMainDelayedEvent(): Promise<ActionUpdate> {
|
||||||
|
return await this.client
|
||||||
|
._unstable_sendDelayedStateEvent(
|
||||||
|
this.room.roomId,
|
||||||
|
{
|
||||||
|
delay: this.membershipServerSideExpiryTimeout,
|
||||||
|
},
|
||||||
|
EventType.GroupCallMemberPrefix,
|
||||||
|
{}, // leave event
|
||||||
|
this.stateKey,
|
||||||
|
)
|
||||||
|
.then((response) => {
|
||||||
|
this.state.delayId = response.delay_id;
|
||||||
|
this.resetRateLimitCounter(MembershipActionType.SendMainDelayedEvent);
|
||||||
|
return createInsertActionUpdate(
|
||||||
|
MembershipActionType.RestartDelayedEvent,
|
||||||
|
this.membershipKeepAlivePeriod,
|
||||||
|
);
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
const repeatActionType = MembershipActionType.SendMainDelayedEvent;
|
||||||
|
// Don't do any other delayed event work if its not supported.
|
||||||
|
if (this.isUnsupportedDelayedEndpoint(e)) return {};
|
||||||
|
|
||||||
|
if (this.manageMaxDelayExceededSituation(e)) {
|
||||||
|
return createInsertActionUpdate(repeatActionType);
|
||||||
|
}
|
||||||
|
const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent");
|
||||||
|
if (update) return update;
|
||||||
|
|
||||||
|
throw Error("Could not send delayed event, even though delayed events are supported. " + e);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async sendScheduledDelayedLeaveEventOrFallbackToSendLeaveEvent(delayId: string): Promise<ActionUpdate> {
|
||||||
|
return await this.client
|
||||||
|
._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Send)
|
||||||
|
.then(() => {
|
||||||
|
this.state.hasMemberStateEvent = false;
|
||||||
|
this.resetRateLimitCounter(MembershipActionType.SendScheduledDelayedLeaveEvent);
|
||||||
|
|
||||||
|
return { replace: [] };
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
const repeatActionType = MembershipActionType.SendLeaveEvent;
|
||||||
|
if (this.isUnsupportedDelayedEndpoint(e)) return {};
|
||||||
|
if (this.isNotFoundError(e)) {
|
||||||
|
this.state.delayId = undefined;
|
||||||
|
return createInsertActionUpdate(repeatActionType);
|
||||||
|
}
|
||||||
|
const update = this.actionUpdateFromErrors(e, repeatActionType, "updateDelayedEvent");
|
||||||
|
if (update) return update;
|
||||||
|
|
||||||
|
// On any other error we fall back to SendLeaveEvent (this includes hard errors from rate limiting)
|
||||||
|
logger.warn(
|
||||||
|
"Encountered unexpected error during SendScheduledDelayedLeaveEvent. Falling back to SendLeaveEvent",
|
||||||
|
e,
|
||||||
|
);
|
||||||
|
return createInsertActionUpdate(repeatActionType);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async sendJoinEvent(): Promise<ActionUpdate> {
|
||||||
|
return await this.client
|
||||||
|
.sendStateEvent(
|
||||||
|
this.room.roomId,
|
||||||
|
EventType.GroupCallMemberPrefix,
|
||||||
|
this.makeMyMembership(this.membershipEventExpiryTimeout),
|
||||||
|
this.stateKey,
|
||||||
|
)
|
||||||
|
.then(() => {
|
||||||
|
this.state.startTime = Date.now();
|
||||||
|
// The next update should already use twice the membershipEventExpiryTimeout
|
||||||
|
this.state.expireUpdateIterations = 1;
|
||||||
|
this.state.hasMemberStateEvent = true;
|
||||||
|
this.resetRateLimitCounter(MembershipActionType.SendJoinEvent);
|
||||||
|
return {
|
||||||
|
insert: [
|
||||||
|
{ ts: Date.now(), type: MembershipActionType.RestartDelayedEvent },
|
||||||
|
{
|
||||||
|
ts: this.computeNextExpiryActionTs(this.state.expireUpdateIterations),
|
||||||
|
type: MembershipActionType.UpdateExpiry,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
const update = this.actionUpdateFromErrors(e, MembershipActionType.SendJoinEvent, "sendStateEvent");
|
||||||
|
if (update) return update;
|
||||||
|
throw e;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async updateExpiryOnJoinedEvent(): Promise<ActionUpdate> {
|
||||||
|
const nextExpireUpdateIteration = this.state.expireUpdateIterations + 1;
|
||||||
|
return await this.client
|
||||||
|
.sendStateEvent(
|
||||||
|
this.room.roomId,
|
||||||
|
EventType.GroupCallMemberPrefix,
|
||||||
|
this.makeMyMembership(this.membershipEventExpiryTimeout * nextExpireUpdateIteration),
|
||||||
|
this.stateKey,
|
||||||
|
)
|
||||||
|
.then(() => {
|
||||||
|
// Success, we reset retries and schedule update.
|
||||||
|
this.resetRateLimitCounter(MembershipActionType.UpdateExpiry);
|
||||||
|
this.state.expireUpdateIterations = nextExpireUpdateIteration;
|
||||||
|
return {
|
||||||
|
insert: [
|
||||||
|
{
|
||||||
|
ts: this.computeNextExpiryActionTs(nextExpireUpdateIteration),
|
||||||
|
type: MembershipActionType.UpdateExpiry,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
const update = this.actionUpdateFromErrors(e, MembershipActionType.UpdateExpiry, "sendStateEvent");
|
||||||
|
if (update) return update;
|
||||||
|
|
||||||
|
throw e;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
private async sendFallbackLeaveEvent(): Promise<ActionUpdate> {
|
||||||
|
return await this.client
|
||||||
|
.sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, {}, this.stateKey)
|
||||||
|
.then(() => {
|
||||||
|
this.resetRateLimitCounter(MembershipActionType.SendLeaveEvent);
|
||||||
|
this.state.hasMemberStateEvent = false;
|
||||||
|
return { replace: [] };
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
const update = this.actionUpdateFromErrors(e, MembershipActionType.SendLeaveEvent, "sendStateEvent");
|
||||||
|
if (update) return update;
|
||||||
|
throw e;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// HELPERS
|
||||||
|
private makeMembershipStateKey(localUserId: string, localDeviceId: string): string {
|
||||||
|
const stateKey = `${localUserId}_${localDeviceId}`;
|
||||||
|
if (/^org\.matrix\.msc(3757|3779)\b/.exec(this.room.getVersion())) {
|
||||||
|
return stateKey;
|
||||||
|
} else {
|
||||||
|
return `_${stateKey}`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs our own membership
|
||||||
|
*/
|
||||||
|
private makeMyMembership(expires: number): SessionMembershipData {
|
||||||
|
return {
|
||||||
|
call_id: "",
|
||||||
|
scope: "m.room",
|
||||||
|
application: "m.call",
|
||||||
|
device_id: this.deviceId,
|
||||||
|
expires,
|
||||||
|
focus_active: { type: "livekit", focus_selection: "oldest_membership" },
|
||||||
|
foci_preferred: this.fociPreferred ?? [],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error checks and handlers
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if its a NOT_FOUND error
|
||||||
|
* @param error the error causing this handler check/execution
|
||||||
|
* @returns true if its a not found error
|
||||||
|
*/
|
||||||
|
private isNotFoundError(error: unknown): boolean {
|
||||||
|
return error instanceof MatrixError && error.errcode === "M_NOT_FOUND";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if this is a DelayExceeded timeout and update the TimeoutOverride for the next try
|
||||||
|
* @param error the error causing this handler check/execution
|
||||||
|
* @returns true if its a delay exceeded error and we updated the local TimeoutOverride
|
||||||
|
*/
|
||||||
|
private manageMaxDelayExceededSituation(error: unknown): boolean {
|
||||||
|
if (
|
||||||
|
error instanceof MatrixError &&
|
||||||
|
error.errcode === "M_UNKNOWN" &&
|
||||||
|
error.data["org.matrix.msc4140.errcode"] === "M_MAX_DELAY_EXCEEDED"
|
||||||
|
) {
|
||||||
|
const maxDelayAllowed = error.data["org.matrix.msc4140.max_delay"];
|
||||||
|
if (typeof maxDelayAllowed === "number" && this.membershipServerSideExpiryTimeout > maxDelayAllowed) {
|
||||||
|
this.membershipServerSideExpiryTimeoutOverride = maxDelayAllowed;
|
||||||
|
}
|
||||||
|
logger.warn("Retry sending delayed disconnection event due to server timeout limitations:", error);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private actionUpdateFromErrors(
|
||||||
|
error: unknown,
|
||||||
|
type: MembershipActionType,
|
||||||
|
method: string,
|
||||||
|
): ActionUpdate | undefined {
|
||||||
|
const updateLimit = this.actionUpdateFromRateLimitError(error, method, type);
|
||||||
|
if (updateLimit) return updateLimit;
|
||||||
|
const updateNetwork = this.actionUpdateFromNetworkErrorRetry(error, type);
|
||||||
|
if (updateNetwork) return updateNetwork;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Check if we have a rate limit error and schedule the same action again if we dont exceed the rate limit retry count yet.
|
||||||
|
* @param error the error causing this handler check/execution
|
||||||
|
* @param method the method used for the throw message
|
||||||
|
* @param type which MembershipActionType we reschedule because of a rate limit.
|
||||||
|
* @throws If it is a rate limit error and the retry count got exceeded
|
||||||
|
* @returns Returns true if we handled the error by rescheduling the correct next action.
|
||||||
|
* Returns false if it is not a network error.
|
||||||
|
*/
|
||||||
|
private actionUpdateFromRateLimitError(
|
||||||
|
error: unknown,
|
||||||
|
method: string,
|
||||||
|
type: MembershipActionType,
|
||||||
|
): ActionUpdate | undefined {
|
||||||
|
// "Is rate limit"-boundary
|
||||||
|
if (!((error instanceof HTTPError || error instanceof MatrixError) && error.isRateLimitError())) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
// retry boundary
|
||||||
|
const rateLimitRetries = this.state.rateLimitRetries.get(type) ?? 0;
|
||||||
|
if (rateLimitRetries < this.maximumRateLimitRetryCount) {
|
||||||
|
let resendDelay: number;
|
||||||
|
const defaultMs = 5000;
|
||||||
|
try {
|
||||||
|
resendDelay = error.getRetryAfterMs() ?? defaultMs;
|
||||||
|
logger.info(`Rate limited by server, retrying in ${resendDelay}ms`);
|
||||||
|
} catch (e) {
|
||||||
|
logger.warn(
|
||||||
|
`Error while retrieving a rate-limit retry delay, retrying after default delay of ${defaultMs}`,
|
||||||
|
e,
|
||||||
|
);
|
||||||
|
resendDelay = defaultMs;
|
||||||
|
}
|
||||||
|
this.state.rateLimitRetries.set(type, rateLimitRetries + 1);
|
||||||
|
return createInsertActionUpdate(type, resendDelay);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw Error("Exceeded maximum retries for " + type + " attempts (client." + method + "): " + (error as Error));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* FIXME Don't Check the error and retry the same MembershipAction again in the configured time and for the configured retry count.
|
||||||
|
* @param error the error causing this handler check/execution
|
||||||
|
* @param type the action type that we need to repeat because of the error
|
||||||
|
* @throws If it is a network error and the retry count got exceeded
|
||||||
|
* @returns
|
||||||
|
* Returns true if we handled the error by rescheduling the correct next action.
|
||||||
|
* Returns false if it is not a network error.
|
||||||
|
*/
|
||||||
|
private actionUpdateFromNetworkErrorRetry(error: unknown, type: MembershipActionType): ActionUpdate | undefined {
|
||||||
|
// "Is a network error"-boundary
|
||||||
|
const retries = this.state.networkErrorRetries.get(type) ?? 0;
|
||||||
|
const retryDurationString = this.callMemberEventRetryDelayMinimum / 1000 + "s";
|
||||||
|
const retryCounterString = "(" + retries + "/" + this.maximumNetworkErrorRetryCount + ")";
|
||||||
|
if (error instanceof Error && error.name === "AbortError") {
|
||||||
|
logger.warn(
|
||||||
|
"Network local timeout error while sending event, retrying in " +
|
||||||
|
retryDurationString +
|
||||||
|
" " +
|
||||||
|
retryCounterString,
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
} else if (error instanceof Error && error.message.includes("updating delayed event")) {
|
||||||
|
// TODO: We do not want error message matching here but instead the error should be a typed HTTPError
|
||||||
|
// and be handled below automatically (the same as in the SPA case).
|
||||||
|
//
|
||||||
|
// The error originates because of https://github.com/matrix-org/matrix-widget-api/blob/5d81d4a26ff69e4bd3ddc79a884c9527999fb2f4/src/ClientWidgetApi.ts#L698-L701
|
||||||
|
// uses `e` instance of HttpError (and not MatrixError)
|
||||||
|
// The element web widget driver (only checks for MatrixError) is then failing to process (`processError`) it as a typed error: https://github.com/element-hq/element-web/blob/471712cbf06a067e5499bd5d2d7a75f693d9a12d/src/stores/widgets/StopGapWidgetDriver.ts#L711-L715
|
||||||
|
// So it will not call: `error.asWidgetApiErrorData()` which is also missing for `HttpError`
|
||||||
|
//
|
||||||
|
// A proper fix would be to either find a place to convert the `HttpError` into a `MatrixError` and the `processError`
|
||||||
|
// method to handle it as expected or to adjust `processError` to also process `HttpError`'s.
|
||||||
|
logger.warn(
|
||||||
|
"delayed event update timeout error, retrying in " + retryDurationString + " " + retryCounterString,
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
} else if (error instanceof ConnectionError) {
|
||||||
|
logger.warn(
|
||||||
|
"Network connection error while sending event, retrying in " +
|
||||||
|
retryDurationString +
|
||||||
|
" " +
|
||||||
|
retryCounterString,
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
} else if (
|
||||||
|
(error instanceof HTTPError || error instanceof MatrixError) &&
|
||||||
|
typeof error.httpStatus === "number" &&
|
||||||
|
error.httpStatus >= 500 &&
|
||||||
|
error.httpStatus < 600
|
||||||
|
) {
|
||||||
|
logger.warn(
|
||||||
|
"Server error while sending event, retrying in " + retryDurationString + " " + retryCounterString,
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
// retry boundary
|
||||||
|
if (retries < this.maximumNetworkErrorRetryCount) {
|
||||||
|
this.state.networkErrorRetries.set(type, retries + 1);
|
||||||
|
return createInsertActionUpdate(type, this.callMemberEventRetryDelayMinimum);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Failure
|
||||||
|
throw Error(
|
||||||
|
"Reached maximum (" + this.maximumNetworkErrorRetryCount + ") retries cause by: " + (error as Error),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if its an UnsupportedDelayedEventsEndpointError and which implies that we cannot do any delayed event logic
|
||||||
|
* @param error The error to check
|
||||||
|
* @returns true it its an UnsupportedDelayedEventsEndpointError
|
||||||
|
*/
|
||||||
|
private isUnsupportedDelayedEndpoint(error: unknown): boolean {
|
||||||
|
return error instanceof UnsupportedDelayedEventsEndpointError;
|
||||||
|
}
|
||||||
|
|
||||||
|
private resetRateLimitCounter(type: MembershipActionType): void {
|
||||||
|
this.state.rateLimitRetries.set(type, 0);
|
||||||
|
this.state.networkErrorRetries.set(type, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public get status(): Status {
|
||||||
|
const actions = this.scheduler.actions;
|
||||||
|
if (actions.length === 1) {
|
||||||
|
const { type } = actions[0];
|
||||||
|
switch (type) {
|
||||||
|
case MembershipActionType.SendFirstDelayedEvent:
|
||||||
|
case MembershipActionType.SendJoinEvent:
|
||||||
|
case MembershipActionType.SendMainDelayedEvent:
|
||||||
|
return Status.Connecting;
|
||||||
|
case MembershipActionType.UpdateExpiry: // where no delayed events
|
||||||
|
return Status.Connected;
|
||||||
|
case MembershipActionType.SendScheduledDelayedLeaveEvent:
|
||||||
|
case MembershipActionType.SendLeaveEvent:
|
||||||
|
return Status.Disconnecting;
|
||||||
|
default:
|
||||||
|
// pass through as not expected
|
||||||
|
}
|
||||||
|
} else if (actions.length === 2) {
|
||||||
|
const types = actions.map((a) => a.type);
|
||||||
|
// normal state for connected with delayed events
|
||||||
|
if (
|
||||||
|
(types.includes(MembershipActionType.RestartDelayedEvent) ||
|
||||||
|
types.includes(MembershipActionType.SendMainDelayedEvent)) &&
|
||||||
|
types.includes(MembershipActionType.UpdateExpiry)
|
||||||
|
) {
|
||||||
|
return Status.Connected;
|
||||||
|
}
|
||||||
|
} else if (actions.length === 3) {
|
||||||
|
const types = actions.map((a) => a.type);
|
||||||
|
// It is a correct connected state if we already schedule the next Restart but have not yet cleaned up
|
||||||
|
// the current restart.
|
||||||
|
if (
|
||||||
|
types.filter((t) => t === MembershipActionType.RestartDelayedEvent).length === 2 &&
|
||||||
|
types.includes(MembershipActionType.UpdateExpiry)
|
||||||
|
) {
|
||||||
|
return Status.Connected;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.scheduler.running) {
|
||||||
|
return Status.Disconnected;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.error("MembershipManager has an unknown state. Actions: ", actions);
|
||||||
|
return Status.Unknown;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function createInsertActionUpdate(type: MembershipActionType, offset?: number): ActionUpdate {
|
||||||
|
return {
|
||||||
|
insert: [{ ts: Date.now() + (offset ?? 0), type }],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function createReplaceActionUpdate(type: MembershipActionType, offset?: number): ActionUpdate {
|
||||||
|
return {
|
||||||
|
replace: [{ ts: Date.now() + (offset ?? 0), type }],
|
||||||
|
};
|
||||||
|
}
|
131
src/matrixrtc/NewMembershipManagerActionScheduler.ts
Normal file
131
src/matrixrtc/NewMembershipManagerActionScheduler.ts
Normal file
@@ -0,0 +1,131 @@
|
|||||||
|
import { logger as rootLogger } from "../logger.ts";
|
||||||
|
import { type EmptyObject } from "../matrix.ts";
|
||||||
|
import { sleep } from "../utils.ts";
|
||||||
|
import { MembershipActionType } from "./NewMembershipManager.ts";
|
||||||
|
|
||||||
|
const logger = rootLogger.getChild("MatrixRTCSession");
|
||||||
|
|
||||||
|
/** @internal */
|
||||||
|
export interface Action {
|
||||||
|
/**
|
||||||
|
* When this action should be executed
|
||||||
|
*/
|
||||||
|
ts: number;
|
||||||
|
/**
|
||||||
|
* The state of the different loops
|
||||||
|
* can also be thought of as the type of the action
|
||||||
|
*/
|
||||||
|
type: MembershipActionType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @internal */
|
||||||
|
export type ActionUpdate =
|
||||||
|
| {
|
||||||
|
/** Replace all existing scheduled actions with this new array */
|
||||||
|
replace: Action[];
|
||||||
|
}
|
||||||
|
| {
|
||||||
|
/** Add these actions to the existing scheduled actions */
|
||||||
|
insert: Action[];
|
||||||
|
}
|
||||||
|
| EmptyObject;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This scheduler tracks the state of the current membership participation
|
||||||
|
* and runs one central timer that wakes up a handler callback with the correct action + state
|
||||||
|
* whenever necessary.
|
||||||
|
*
|
||||||
|
* It can also be awakened whenever a new action is added which is
|
||||||
|
* earlier then the current "next awake".
|
||||||
|
* @internal
|
||||||
|
*/
|
||||||
|
export class ActionScheduler {
|
||||||
|
public running = false;
|
||||||
|
|
||||||
|
public constructor(
|
||||||
|
/** This is the callback called for each scheduled action (`this.addAction()`) */
|
||||||
|
private membershipLoopHandler: (type: MembershipActionType) => Promise<ActionUpdate>,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
// function for the wakeup mechanism (in case we add an action externally and need to leave the current sleep)
|
||||||
|
private wakeup: (update: ActionUpdate) => void = (update: ActionUpdate): void => {
|
||||||
|
logger.error("Cannot call wakeup before calling `startWithJoin()`");
|
||||||
|
};
|
||||||
|
private _actions: Action[] = [];
|
||||||
|
public get actions(): Action[] {
|
||||||
|
return this._actions;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This starts the main loop of the membership manager that handles event sending, delayed event sending and delayed event restarting.
|
||||||
|
* @param initialActions The initial actions the manager will start with. It should be enough to pass: DelayedLeaveActionType.Initial
|
||||||
|
* @returns Promise that resolves once all actions have run and no more are scheduled.
|
||||||
|
* @throws This throws an error if one of the actions throws.
|
||||||
|
* In most other error cases the manager will try to handle any server errors by itself.
|
||||||
|
*/
|
||||||
|
public async startWithJoin(): Promise<void> {
|
||||||
|
if (this.running) {
|
||||||
|
logger.error("Cannot call startWithJoin() on NewMembershipActionScheduler while already running");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.running = true;
|
||||||
|
this._actions = [{ ts: Date.now(), type: MembershipActionType.SendFirstDelayedEvent }];
|
||||||
|
try {
|
||||||
|
while (this._actions.length > 0) {
|
||||||
|
// Sort so next (smallest ts) action is at the beginning
|
||||||
|
this._actions.sort((a, b) => a.ts - b.ts);
|
||||||
|
const nextAction = this._actions[0];
|
||||||
|
let wakeupUpdate: ActionUpdate | undefined = undefined;
|
||||||
|
|
||||||
|
// while we await for the next action, wakeup has to resolve the wakeupPromise
|
||||||
|
const wakeupPromise = new Promise<void>((resolve) => {
|
||||||
|
this.wakeup = (update: ActionUpdate): void => {
|
||||||
|
wakeupUpdate = update;
|
||||||
|
resolve();
|
||||||
|
};
|
||||||
|
});
|
||||||
|
if (nextAction.ts > Date.now()) await Promise.race([wakeupPromise, sleep(nextAction.ts - Date.now())]);
|
||||||
|
|
||||||
|
let handlerResult: ActionUpdate = {};
|
||||||
|
if (!wakeupUpdate) {
|
||||||
|
logger.debug(
|
||||||
|
`Current MembershipManager processing: ${nextAction.type}\nQueue:`,
|
||||||
|
this._actions,
|
||||||
|
`\nDate.now: "${Date.now()}`,
|
||||||
|
);
|
||||||
|
try {
|
||||||
|
// `this.wakeup` can also be called and sets the `wakupUpdate` object while we are in the handler.
|
||||||
|
handlerResult = await this.membershipLoopHandler(nextAction.type as MembershipActionType);
|
||||||
|
} catch (e) {
|
||||||
|
throw Error(`The MembershipManager shut down because of the end condition: ${e}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// remove the processed action only after we are done processing
|
||||||
|
this._actions.splice(0, 1);
|
||||||
|
// The wakeupUpdate always wins since that is a direct external update.
|
||||||
|
const actionUpdate = wakeupUpdate ?? handlerResult;
|
||||||
|
|
||||||
|
if ("replace" in actionUpdate) {
|
||||||
|
this._actions = actionUpdate.replace;
|
||||||
|
} else if ("insert" in actionUpdate) {
|
||||||
|
this._actions.push(...actionUpdate.insert);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
// Set the rtc session "not running" state since we cannot recover from here and the consumer user of the
|
||||||
|
// MatrixRTCSession class needs to manually rejoin.
|
||||||
|
this.running = false;
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
this.running = false;
|
||||||
|
|
||||||
|
logger.debug("Leave MembershipManager ActionScheduler loop (no more actions)");
|
||||||
|
}
|
||||||
|
|
||||||
|
public initiateJoin(): void {
|
||||||
|
this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendFirstDelayedEvent }] });
|
||||||
|
}
|
||||||
|
public initiateLeave(): void {
|
||||||
|
this.wakeup?.({ replace: [{ ts: Date.now(), type: MembershipActionType.SendScheduledDelayedLeaveEvent }] });
|
||||||
|
}
|
||||||
|
}
|
17
src/utils.ts
17
src/utils.ts
@@ -405,6 +405,23 @@ export async function logDuration<T>(logger: BaseLogger, name: string, block: ()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility to log the duration of a synchronous block.
|
||||||
|
*
|
||||||
|
* @param logger - The logger to log to.
|
||||||
|
* @param name - The name of the operation.
|
||||||
|
* @param block - The block to execute.
|
||||||
|
*/
|
||||||
|
export function logDurationSync<T>(logger: BaseLogger, name: string, block: () => T): T {
|
||||||
|
const start = Date.now();
|
||||||
|
try {
|
||||||
|
return block();
|
||||||
|
} finally {
|
||||||
|
const end = Date.now();
|
||||||
|
logger.debug(`[Perf]: ${name} took ${end - start}ms`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Promise/async version of {@link setImmediate}.
|
* Promise/async version of {@link setImmediate}.
|
||||||
*
|
*
|
||||||
|
Reference in New Issue
Block a user