From 29879e83842cc6428fd24b95bf3edffb066c116f Mon Sep 17 00:00:00 2001 From: Timo K Date: Tue, 30 Sep 2025 14:14:31 +0200 Subject: [PATCH] incorporate CallMembership changes - rename Focus -> Transport - add RtcMembershipData (next to `sessionMembershipData`) - make `new CallMembership` initializable with both - move oldest member calculation into CallMembership Signed-off-by: Timo K --- spec/unit/matrixrtc/CallMembership.spec.ts | 140 ++++++++- spec/unit/matrixrtc/LivekitFocus.spec.ts | 24 +- spec/unit/matrixrtc/MatrixRTCSession.spec.ts | 10 +- spec/unit/matrixrtc/MembershipManager.spec.ts | 189 ++++++------ src/@types/event.ts | 8 +- src/matrixrtc/CallMembership.ts | 279 +++++++++++++++--- src/matrixrtc/IMembershipManager.ts | 12 +- src/matrixrtc/LivekitFocus.ts | 23 +- src/matrixrtc/MatrixRTCSession.ts | 114 ++++--- src/matrixrtc/MatrixRTCSessionManager.ts | 4 +- src/matrixrtc/MembershipManager.ts | 120 ++++---- src/matrixrtc/focus.ts | 25 -- src/matrixrtc/index.ts | 1 - src/matrixrtc/types.ts | 8 + src/models/room-member.ts | 2 +- 15 files changed, 651 insertions(+), 308 deletions(-) delete mode 100644 src/matrixrtc/focus.ts diff --git a/spec/unit/matrixrtc/CallMembership.spec.ts b/spec/unit/matrixrtc/CallMembership.spec.ts index cfa98ebd2..528f46cac 100644 --- a/spec/unit/matrixrtc/CallMembership.spec.ts +++ b/spec/unit/matrixrtc/CallMembership.spec.ts @@ -19,6 +19,7 @@ import { CallMembership, type SessionMembershipData, DEFAULT_EXPIRE_DURATION, + type RtcMembershipData, } from "../../../src/matrixrtc/CallMembership"; import { membershipTemplate } from "./mocks"; @@ -44,7 +45,7 @@ describe("CallMembership", () => { scope: "m.room", application: "m.call", device_id: "AAAAAAA", - focus_active: { type: "livekit" }, + focus_active: { type: "livekit", focus_selection: "oldest_membership" }, foci_preferred: [{ type: "livekit" }], }; @@ -94,11 +95,138 @@ describe("CallMembership", () => { it("returns preferred foci", () => { const fakeEvent = makeMockEvent(); const mockFocus = { type: "this_is_a_mock_focus" }; - const membership = new CallMembership( - fakeEvent, - Object.assign({}, membershipTemplate, { foci_preferred: [mockFocus] }), - ); - expect(membership.getPreferredFoci()).toEqual([mockFocus]); + const membership = new CallMembership(fakeEvent, { ...membershipTemplate, foci_preferred: [mockFocus] }); + expect(membership.transports).toEqual([mockFocus]); + }); + describe("getTransport", () => { + const mockFocus = { type: "this_is_a_mock_focus" }; + const oldestMembership = new CallMembership(makeMockEvent(), membershipTemplate); + it("gets the correct active transport with oldest_membership", () => { + const membership = new CallMembership(makeMockEvent(), { + ...membershipTemplate, + foci_preferred: [mockFocus], + focus_active: { type: "livekit", focus_selection: "oldest_membership" }, + }); + + // if we are the oldest member we use our focus. + expect(membership.getTransport(membership)).toStrictEqual(mockFocus); + + // If there is an older member we use its focus. + expect(membership.getTransport(oldestMembership)).toBe(membershipTemplate.foci_preferred[0]); + }); + + it("does not provide focus if the selection method is unknown", () => { + const membership = new CallMembership(makeMockEvent(), { + ...membershipTemplate, + foci_preferred: [mockFocus], + focus_active: { type: "livekit", focus_selection: "multi_sfu" }, + }); + + // if we are the oldest member we use our focus. + expect(membership.getTransport(membership)).toStrictEqual(mockFocus); + + // If there is an older member we still use our own focus in multi sfu. + expect(membership.getTransport(oldestMembership)).toBe(mockFocus); + }); + }); + }); + + describe("RtcMembershipData", () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + const membershipTemplate: RtcMembershipData = { + slot_id: "m.call#1", + application: { type: "m.call" }, + member: { user_id: "@alice:example.org", device_id: "AAAAAAA", id: "xyzHASHxyz" }, + rtc_transports: [{ type: "livekit" }], + versions: [], + }; + + it("rejects membership with no slot_id", () => { + expect(() => { + new CallMembership(makeMockEvent(), { ...membershipTemplate, slot_id: undefined }); + }).toThrow(); + }); + + it("rejects membership with no application", () => { + expect(() => { + new CallMembership(makeMockEvent(), { ...membershipTemplate, application: undefined }); + }).toThrow(); + }); + + it("rejects membership with incorrect application", () => { + expect(() => { + new CallMembership(makeMockEvent(), { + ...membershipTemplate, + application: { wrong_type_key: "unknown" }, + }); + }).toThrow(); + }); + + it("rejects membership with no member", () => { + expect(() => { + new CallMembership(makeMockEvent(), { ...membershipTemplate, member: undefined }); + }).toThrow(); + }); + + it("rejects membership with incorrect member", () => { + expect(() => { + new CallMembership(makeMockEvent(), { ...membershipTemplate, member: { i: "test" } }); + }).toThrow(); + expect(() => { + new CallMembership(makeMockEvent(), { + ...membershipTemplate, + member: { id: "test", device_id: "test", user_id_wrong: "test" }, + }); + }).toThrow(); + expect(() => { + new CallMembership(makeMockEvent(), { + ...membershipTemplate, + member: { id: "test", device_id_wrong: "test", user_id_wrong: "test" }, + }); + }).toThrow(); + expect(() => { + new CallMembership(makeMockEvent(), { + ...membershipTemplate, + member: { id: "test", device_id: "test", user_id: "@@test" }, + }); + }).toThrow(); + expect(() => { + new CallMembership(makeMockEvent(), { + ...membershipTemplate, + member: { id: "test", device_id: "test", user_id: "@test:user.id" }, + }); + }).not.toThrow(); + }); + + it("considers memberships unexpired if local age low enough", () => { + // TODO link prev event + }); + + it("considers memberships expired if local age large enough", () => { + // TODO link prev event + }); + + describe("getTransport", () => { + it("gets the correct active transport with oldest_membership", () => { + const oldestMembership = new CallMembership(makeMockEvent(), { + ...membershipTemplate, + rtc_transports: [{ type: "oldest_transport" }], + }); + const membership = new CallMembership(makeMockEvent(), membershipTemplate); + + // if we are the oldest member we use our focus. + expect(membership.getTransport(membership)).toStrictEqual({ type: "livekit" }); + + // If there is an older member we use our own focus focus. (RtcMembershipData always uses multi sfu) + expect(membership.getTransport(oldestMembership)).toStrictEqual({ type: "livekit" }); + }); }); }); diff --git a/spec/unit/matrixrtc/LivekitFocus.spec.ts b/spec/unit/matrixrtc/LivekitFocus.spec.ts index 265351163..7da0aebb5 100644 --- a/spec/unit/matrixrtc/LivekitFocus.spec.ts +++ b/spec/unit/matrixrtc/LivekitFocus.spec.ts @@ -14,26 +14,30 @@ See the License for the specific language governing permissions and limitations under the License. */ -import { isLivekitFocus, isLivekitFocusSelection, isLivekitFocusConfig } from "../../../src/matrixrtc/LivekitFocus"; +import { + isLivekitTransport, + isLivekitFocusSelection, + isLivekitTransportConfig, +} from "../../../src/matrixrtc/LivekitFocus"; describe("LivekitFocus", () => { it("isLivekitFocus", () => { expect( - isLivekitFocus({ + isLivekitTransport({ type: "livekit", livekit_service_url: "http://test.com", livekit_alias: "test", }), ).toBeTruthy(); - expect(isLivekitFocus({ type: "livekit" })).toBeFalsy(); + expect(isLivekitTransport({ type: "livekit" })).toBeFalsy(); expect( - isLivekitFocus({ type: "not-livekit", livekit_service_url: "http://test.com", livekit_alias: "test" }), + isLivekitTransport({ type: "not-livekit", livekit_service_url: "http://test.com", livekit_alias: "test" }), ).toBeFalsy(); expect( - isLivekitFocus({ type: "livekit", other_service_url: "http://test.com", livekit_alias: "test" }), + isLivekitTransport({ type: "livekit", other_service_url: "http://test.com", livekit_alias: "test" }), ).toBeFalsy(); expect( - isLivekitFocus({ type: "livekit", livekit_service_url: "http://test.com", other_alias: "test" }), + isLivekitTransport({ type: "livekit", livekit_service_url: "http://test.com", other_alias: "test" }), ).toBeFalsy(); }); it("isLivekitFocusActive", () => { @@ -48,13 +52,13 @@ describe("LivekitFocus", () => { }); it("isLivekitFocusConfig", () => { expect( - isLivekitFocusConfig({ + isLivekitTransportConfig({ type: "livekit", livekit_service_url: "http://test.com", }), ).toBeTruthy(); - expect(isLivekitFocusConfig({ type: "livekit" })).toBeFalsy(); - expect(isLivekitFocusConfig({ type: "not-livekit", livekit_service_url: "http://test.com" })).toBeFalsy(); - expect(isLivekitFocusConfig({ type: "livekit", other_service_url: "oldest_membership" })).toBeFalsy(); + expect(isLivekitTransportConfig({ type: "livekit" })).toBeFalsy(); + expect(isLivekitTransportConfig({ type: "not-livekit", livekit_service_url: "http://test.com" })).toBeFalsy(); + expect(isLivekitTransportConfig({ type: "livekit", other_service_url: "oldest_membership" })).toBeFalsy(); }); }); diff --git a/spec/unit/matrixrtc/MatrixRTCSession.spec.ts b/spec/unit/matrixrtc/MatrixRTCSession.spec.ts index 71d465483..390a201fe 100644 --- a/spec/unit/matrixrtc/MatrixRTCSession.spec.ts +++ b/spec/unit/matrixrtc/MatrixRTCSession.spec.ts @@ -53,12 +53,12 @@ describe("MatrixRTCSession", () => { sess = MatrixRTCSession.sessionForRoom(client, mockRoom, callSession); expect(sess?.memberships.length).toEqual(1); - expect(sess?.memberships[0].sessionDescription.id).toEqual(""); + expect(sess?.memberships[0].slotDescription.id).toEqual(""); expect(sess?.memberships[0].scope).toEqual("m.room"); expect(sess?.memberships[0].application).toEqual("m.call"); expect(sess?.memberships[0].deviceId).toEqual("AAAAAAA"); expect(sess?.memberships[0].isExpired()).toEqual(false); - expect(sess?.sessionDescription.id).toEqual(""); + expect(sess?.slotDescription.id).toEqual(""); }); it("ignores memberships where application is not m.call", () => { @@ -268,7 +268,9 @@ describe("MatrixRTCSession", () => { type: "livekit", focus_selection: "oldest_membership", }); - expect(sess.resolveActiveFocus()).toBe(firstPreferredFocus); + expect(sess.resolveActiveFocus(sess.memberships.find((m) => m.deviceId === "old"))).toBe( + firstPreferredFocus, + ); jest.useRealTimers(); }); it("does not provide focus if the selection method is unknown", () => { @@ -288,7 +290,7 @@ describe("MatrixRTCSession", () => { type: "livekit", focus_selection: "unknown", }); - expect(sess.resolveActiveFocus()).toBe(undefined); + expect(sess.resolveActiveFocus(sess.memberships.find((m) => m.deviceId === "old"))).toBe(undefined); }); }); diff --git a/spec/unit/matrixrtc/MembershipManager.spec.ts b/spec/unit/matrixrtc/MembershipManager.spec.ts index c22ab1839..01eb5856b 100644 --- a/spec/unit/matrixrtc/MembershipManager.spec.ts +++ b/spec/unit/matrixrtc/MembershipManager.spec.ts @@ -27,12 +27,11 @@ import { import { MembershipManagerEvent, Status, - type Focus, - type LivekitFocusActive, + type Transport, type SessionMembershipData, + type LivekitFocusSelection, } from "../../../src/matrixrtc"; import { makeMockClient, makeMockRoom, membershipTemplate, mockCallMembership, type MockClient } from "./mocks"; -import { logger } from "../../../src/logger.ts"; import { MembershipManager } from "../../../src/matrixrtc/MembershipManager.ts"; /** @@ -76,11 +75,11 @@ const callSession = { id: "", application: "m.call" }; describe("MembershipManager", () => { let client: MockClient; let room: Room; - const focusActive: LivekitFocusActive = { + const focusActive: LivekitFocusSelection = { focus_selection: "oldest_membership", type: "livekit", }; - const focus: Focus = { + const focus: Transport = { type: "livekit", livekit_service_url: "https://active.url", livekit_alias: "!active:active.url", @@ -104,12 +103,12 @@ describe("MembershipManager", () => { describe("isActivated()", () => { it("defaults to false", () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); expect(manager.isActivated()).toEqual(false); }); it("returns true after join()", () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([]); expect(manager.isActivated()).toEqual(true); }); @@ -123,8 +122,8 @@ describe("MembershipManager", () => { const updateDelayedEventHandle = createAsyncHandle(client._unstable_updateDelayedEvent as Mock); // Test - const memberManager = new MembershipManager(undefined, room, client, () => undefined, callSession); - memberManager.join([focus], focusActive); + const memberManager = new MembershipManager(undefined, room, client, callSession); + memberManager.join([focus], undefined); // expects await waitForMockCall(client.sendStateEvent, Promise.resolve({ event_id: "id" })); expect(client.sendStateEvent).toHaveBeenCalledWith( @@ -152,8 +151,45 @@ describe("MembershipManager", () => { expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1); }); + it("sends a rtc membership event when using `useRtcMemberFormat`", async () => { + // Spys/Mocks + + const updateDelayedEventHandle = createAsyncHandle(client._unstable_updateDelayedEvent as Mock); + + // Test + const memberManager = new MembershipManager({ useRtcMemberFormat: true }, room, client, callSession); + memberManager.join([], focus); + // expects + await waitForMockCall(client.sendStateEvent, Promise.resolve({ event_id: "id" })); + expect(client.sendStateEvent).toHaveBeenCalledWith( + room.roomId, + "org.matrix.msc3401.call.member", + { + application: { type: "m.call", id: "" }, + member: { + user_id: "@alice:example.org", + id: "_@alice:example.org_AAAAAAA_m.call", + device_id: "AAAAAAA", + }, + slot_id: "m.call#", + rtc_transports: [focus], + versions: [], + }, + "_@alice:example.org_AAAAAAA_m.call", + ); + updateDelayedEventHandle.resolve?.(); + expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledWith( + room.roomId, + { delay: 8000 }, + "org.matrix.msc3401.call.member", + {}, + "_@alice:example.org_AAAAAAA_m.call", + ); + expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1); + }); + it("reschedules delayed leave event if sending state cancels it", async () => { - const memberManager = new MembershipManager(undefined, room, client, () => undefined, callSession); + const memberManager = new MembershipManager(undefined, room, client, callSession); const waitForSendState = waitForMockCall(client.sendStateEvent); const waitForUpdateDelaye = waitForMockCallOnce( client._unstable_updateDelayedEvent, @@ -228,10 +264,9 @@ describe("MembershipManager", () => { }, room, client, - () => undefined, callSession, ); - manager.join([focus], focusActive); + manager.join([focus]); await sendDelayedStateExceedAttempt.then(); // needed to resolve after the send attempt catches await sendDelayedStateAttempt; @@ -286,8 +321,8 @@ describe("MembershipManager", () => { describe("delayed leave event", () => { it("does not try again to schedule a delayed leave event if not supported", () => { const delayedHandle = createAsyncHandle(client._unstable_sendDelayedStateEvent as Mock); - const manager = new MembershipManager({}, room, client, () => undefined, callSession); - manager.join([focus], focusActive); + const manager = new MembershipManager({}, room, client, callSession); + manager.join([focus]); delayedHandle.reject?.( new UnsupportedDelayedEventsEndpointError( "Server does not support the delayed events API", @@ -298,21 +333,15 @@ describe("MembershipManager", () => { }); it("does try to schedule a delayed leave event again if rate limited", async () => { const delayedHandle = createAsyncHandle(client._unstable_sendDelayedStateEvent as Mock); - const manager = new MembershipManager({}, room, client, () => undefined, callSession); - manager.join([focus], focusActive); + const manager = new MembershipManager({}, room, client, callSession); + manager.join([focus]); delayedHandle.reject?.(new HTTPError("rate limited", 429, undefined)); await jest.advanceTimersByTimeAsync(5000); expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(2); }); it("uses delayedLeaveEventDelayMs from config", () => { - const manager = new MembershipManager( - { delayedLeaveEventDelayMs: 123456 }, - room, - client, - () => undefined, - callSession, - ); - manager.join([focus], focusActive); + const manager = new MembershipManager({ delayedLeaveEventDelayMs: 123456 }, room, client, callSession); + manager.join([focus]); expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledWith( room.roomId, { delay: 123456 }, @@ -329,11 +358,11 @@ describe("MembershipManager", () => { { delayedLeaveEventRestartMs: RESTART_DELAY }, room, client, - () => undefined, + callSession, ); // Join with the membership manager - manager.join([focus], focusActive); + manager.join([focus]); expect(manager.status).toBe(Status.Connecting); // Let the scheduler run one iteration so that we can send the join state event await jest.runOnlyPendingTimersAsync(); @@ -367,11 +396,11 @@ describe("MembershipManager", () => { { membershipEventExpiryMs: 1234567 }, room, client, - () => undefined, + callSession, ); - manager.join([focus], focusActive); + manager.join([focus]); await waitForMockCall(client.sendStateEvent); expect(client.sendStateEvent).toHaveBeenCalledWith( room.roomId, @@ -393,11 +422,11 @@ describe("MembershipManager", () => { }); it("does nothing if join called when already joined", async () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); - manager.join([focus], focusActive); + const manager = new MembershipManager({}, room, client, callSession); + manager.join([focus]); await waitForMockCall(client.sendStateEvent); expect(client.sendStateEvent).toHaveBeenCalledTimes(1); - manager.join([focus], focusActive); + manager.join([focus]); expect(client.sendStateEvent).toHaveBeenCalledTimes(1); }); }); @@ -405,16 +434,16 @@ describe("MembershipManager", () => { describe("leave()", () => { // TODO add rate limit cases. it("resolves delayed leave event when leave is called", async () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); - manager.join([focus], focusActive); + const manager = new MembershipManager({}, room, client, callSession); + manager.join([focus]); await jest.advanceTimersByTimeAsync(1); await manager.leave(); expect(client._unstable_updateDelayedEvent).toHaveBeenLastCalledWith("id", "send"); expect(client.sendStateEvent).toHaveBeenCalled(); }); it("send leave event when leave is called and resolving delayed leave fails", async () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); - manager.join([focus], focusActive); + const manager = new MembershipManager({}, room, client, callSession); + manager.join([focus]); await jest.advanceTimersByTimeAsync(1); (client._unstable_updateDelayedEvent as Mock).mockRejectedValue("unknown"); await manager.leave(); @@ -428,60 +457,16 @@ describe("MembershipManager", () => { ); }); it("does nothing if not joined", () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); expect(async () => await manager.leave()).not.toThrow(); expect(client._unstable_sendDelayedStateEvent).not.toHaveBeenCalled(); expect(client.sendStateEvent).not.toHaveBeenCalled(); }); }); - describe("getsActiveFocus", () => { - it("gets the correct active focus with oldest_membership", () => { - const getOldestMembership = jest.fn(); - const manager = new MembershipManager({}, room, client, getOldestMembership, callSession); - // Before joining the active focus should be undefined (see FocusInUse on MatrixRTCSession) - expect(manager.getActiveFocus()).toBe(undefined); - manager.join([focus], focusActive); - // After joining we want our own focus to be the one we select. - getOldestMembership.mockReturnValue( - mockCallMembership( - { - ...membershipTemplate, - foci_preferred: [ - { - livekit_alias: "!active:active.url", - livekit_service_url: "https://active.url", - type: "livekit", - }, - ], - user_id: client.getUserId()!, - device_id: client.getDeviceId()!, - created_ts: 1000, - }, - room.roomId, - ), - ); - expect(manager.getActiveFocus()).toStrictEqual(focus); - getOldestMembership.mockReturnValue( - mockCallMembership( - Object.assign({}, membershipTemplate, { device_id: "old", created_ts: 1000 }), - room.roomId, - ), - ); - // If there is an older member we use its focus. - expect(manager.getActiveFocus()).toBe(membershipTemplate.foci_preferred[0]); - }); - - it("does not provide focus if the selection method is unknown", () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); - manager.join([focus], Object.assign(focusActive, { type: "unknown_type" })); - expect(manager.getActiveFocus()).toBe(undefined); - }); - }); - describe("onRTCSessionMemberUpdate()", () => { it("does nothing if not joined", async () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); await manager.onRTCSessionMemberUpdate([mockCallMembership(membershipTemplate, room.roomId)]); await jest.advanceTimersToNextTimerAsync(); expect(client.sendStateEvent).not.toHaveBeenCalled(); @@ -489,7 +474,7 @@ describe("MembershipManager", () => { expect(client._unstable_updateDelayedEvent).not.toHaveBeenCalled(); }); it("does nothing if own membership still present", async () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([focus], focusActive); await jest.advanceTimersByTimeAsync(1); const myMembership = (client.sendStateEvent as Mock).mock.calls[0][2]; @@ -513,7 +498,7 @@ describe("MembershipManager", () => { expect(client._unstable_updateDelayedEvent).not.toHaveBeenCalled(); }); it("recreates membership if it is missing", async () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([focus], focusActive); await jest.advanceTimersByTimeAsync(1); // clearing all mocks before checking what happens when calling: `onRTCSessionMemberUpdate` @@ -531,7 +516,7 @@ describe("MembershipManager", () => { }); it("updates the UpdateExpiry entry in the action scheduler", async () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([focus], focusActive); await jest.advanceTimersByTimeAsync(1); // clearing all mocks before checking what happens when calling: `onRTCSessionMemberUpdate` @@ -564,7 +549,6 @@ describe("MembershipManager", () => { { delayedLeaveEventRestartMs: 10_000, delayedLeaveEventDelayMs: 30_000 }, room, client, - () => undefined, { id: "", application: "m.call" }, ); manager.join([focus], focusActive); @@ -596,7 +580,7 @@ describe("MembershipManager", () => { { membershipEventExpiryMs: expire, membershipEventExpiryHeadroomMs: headroom }, room, client, - () => undefined, + { id: "", application: "m.call" }, ); manager.join([focus], focusActive); @@ -621,14 +605,14 @@ describe("MembershipManager", () => { describe("status updates", () => { it("starts 'Disconnected'", () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); expect(manager.status).toBe(Status.Disconnected); }); it("emits 'Connection' and 'Connected' after join", async () => { const handleDelayedEvent = createAsyncHandle(client._unstable_sendDelayedStateEvent); const handleStateEvent = createAsyncHandle(client.sendStateEvent); - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); expect(manager.status).toBe(Status.Disconnected); const connectEmit = jest.fn(); manager.on(MembershipManagerEvent.StatusChanged, connectEmit); @@ -642,7 +626,7 @@ describe("MembershipManager", () => { expect(connectEmit).toHaveBeenCalledWith(Status.Connecting, Status.Connected); }); it("emits 'Disconnecting' and 'Disconnected' after leave", async () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); const connectEmit = jest.fn(); manager.on(MembershipManagerEvent.StatusChanged, connectEmit); manager.join([focus], focusActive); @@ -658,7 +642,7 @@ describe("MembershipManager", () => { it("sends retry if call membership event is still valid at time of retry", async () => { const handle = createAsyncHandle(client._unstable_sendDelayedStateEvent); - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([focus], focusActive); expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1); @@ -685,7 +669,7 @@ describe("MembershipManager", () => { new Headers({ "Retry-After": "1" }), ), ); - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); // Should call _unstable_sendDelayedStateEvent but not sendStateEvent because of the // RateLimit error. manager.join([focus], focusActive); @@ -705,7 +689,7 @@ describe("MembershipManager", () => { it("abandons retry loop if leave() was called before sending state event", async () => { const handle = createAsyncHandle(client._unstable_sendDelayedStateEvent); - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([focus], focusActive); handle.reject?.( new MatrixError( @@ -740,7 +724,7 @@ describe("MembershipManager", () => { new Headers({ "Retry-After": "1" }), ), ); - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([focus], focusActive); // Hit rate limit @@ -773,7 +757,7 @@ describe("MembershipManager", () => { new Headers({ "Retry-After": "2" }), ), ); - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([focus], focusActive, delayEventSendError); for (let i = 0; i < 10; i++) { @@ -793,7 +777,7 @@ describe("MembershipManager", () => { new Headers({ "Retry-After": "1" }), ), ); - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([focus], focusActive, delayEventRestartError); for (let i = 0; i < 10; i++) { @@ -804,7 +788,7 @@ describe("MembershipManager", () => { it("falls back to using pure state events when some error occurs while sending delayed events", async () => { const unrecoverableError = jest.fn(); (client._unstable_sendDelayedStateEvent as Mock).mockRejectedValue(new HTTPError("unknown", 601)); - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([focus], focusActive, unrecoverableError); await waitForMockCall(client.sendStateEvent); expect(unrecoverableError).not.toHaveBeenCalledWith(); @@ -817,7 +801,6 @@ describe("MembershipManager", () => { { networkErrorRetryMs: 1000, maximumNetworkErrorRetryCount: 7 }, room, client, - () => undefined, callSession, ); manager.join([focus], focusActive, unrecoverableError); @@ -836,7 +819,7 @@ describe("MembershipManager", () => { (client._unstable_sendDelayedStateEvent as Mock).mockRejectedValue( new UnsupportedDelayedEventsEndpointError("not supported", "sendDelayedStateEvent"), ); - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([focus], focusActive, unrecoverableError); await jest.advanceTimersByTimeAsync(1); @@ -850,7 +833,7 @@ describe("MembershipManager", () => { { delayedLeaveEventDelayMs: 10000 }, room, client, - () => undefined, + callSession, ); const { promise: stuckPromise, reject: rejectStuckPromise } = Promise.withResolvers(); @@ -904,7 +887,7 @@ describe("MembershipManager", () => { describe("updateCallIntent()", () => { it("should fail if the user has not joined the call", async () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); // After joining we want our own focus to be the one we select. try { await manager.updateCallIntent("video"); @@ -913,7 +896,7 @@ describe("MembershipManager", () => { }); it("can adjust the intent", async () => { - const manager = new MembershipManager({}, room, client, () => undefined, callSession); + const manager = new MembershipManager({}, room, client, callSession); manager.join([]); expect(manager.isActivated()).toEqual(true); const membership = mockCallMembership({ ...membershipTemplate, user_id: client.getUserId()! }, room.roomId); @@ -926,7 +909,7 @@ describe("MembershipManager", () => { }); it("does nothing if the intent doesn't change", async () => { - const manager = new MembershipManager({ callIntent: "video" }, room, client, () => undefined, callSession); + const manager = new MembershipManager({ callIntent: "video" }, room, client, callSession); manager.join([]); expect(manager.isActivated()).toEqual(true); const membership = mockCallMembership( @@ -944,7 +927,7 @@ it("Should prefix log with MembershipManager used", () => { const client = makeMockClient("@alice:example.org", "AAAAAAA"); const room = makeMockRoom([membershipTemplate]); - const membershipManager = new MembershipManager(undefined, room, client, () => undefined, callSession, logger); + const membershipManager = new MembershipManager(undefined, room, client, callSession); const spy = jest.spyOn(console, "error"); // Double join diff --git a/src/@types/event.ts b/src/@types/event.ts index 6e4d0ddff..7ec827772 100644 --- a/src/@types/event.ts +++ b/src/@types/event.ts @@ -58,7 +58,7 @@ import { type ICallNotifyContent, } from "../matrixrtc/types.ts"; import { type M_POLL_END, type M_POLL_START, type PollEndEventContent, type PollStartEventContent } from "./polls.ts"; -import { type SessionMembershipData } from "../matrixrtc/CallMembership.ts"; +import { type RtcMembershipData, type SessionMembershipData } from "../matrixrtc/CallMembership.ts"; import { type LocalNotificationSettings } from "./local_notifications.ts"; import { type IPushRules } from "./PushRules.ts"; import { type SecretInfo, type SecretStorageKeyDescription } from "../secret-storage.ts"; @@ -368,7 +368,11 @@ export interface StateEvents { // MSC3401 [EventType.GroupCallPrefix]: IGroupCallRoomState; - [EventType.GroupCallMemberPrefix]: IGroupCallRoomMemberState | SessionMembershipData | EmptyObject; + [EventType.GroupCallMemberPrefix]: + | IGroupCallRoomMemberState + | SessionMembershipData + | RtcMembershipData + | EmptyObject; // MSC3089 [UNSTABLE_MSC3089_BRANCH.name]: MSC3089EventContent; diff --git a/src/matrixrtc/CallMembership.ts b/src/matrixrtc/CallMembership.ts index cb20c567e..5da96294f 100644 --- a/src/matrixrtc/CallMembership.ts +++ b/src/matrixrtc/CallMembership.ts @@ -14,11 +14,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -import { type MatrixEvent } from "../matrix.ts"; +import { MXID_PATTERN } from "../models/room-member.ts"; import { deepCompare } from "../utils.ts"; -import { type Focus } from "./focus.ts"; -import { type SessionDescription } from "./MatrixRTCSession.ts"; -import { type RTCCallIntent } from "./types.ts"; +import { isLivekitFocusSelection, type LivekitFocusSelection } from "./LivekitFocus.ts"; +import { slotDescriptionToId, slotIdToDescription, type SlotDescription } from "./MatrixRTCSession.ts"; +import { type RTCCallIntent, type Transport } from "./types.ts"; +import { type RelationType } from "src/types.ts"; +import { type MatrixEvent } from "../models/event.ts"; /** * The default duration in milliseconds that a membership is considered valid for. @@ -28,6 +30,91 @@ import { type RTCCallIntent } from "./types.ts"; export const DEFAULT_EXPIRE_DURATION = 1000 * 60 * 60 * 4; type CallScope = "m.room" | "m.user"; +type Member = { user_id: string; device_id: string; id: string }; + +export interface RtcMembershipData { + "slot_id": string; + "member": Member; + "m.relates_to"?: { + event_id: string; + rel_type: RelationType.Reference; + }; + "application": { + type: string; + // other application specific keys + [key: string]: any; + }; + "rtc_transports": Transport[]; + "versions": string[]; + "created_ts"?: number; + "sticky_key"?: string; + /** + * The intent of the call from the perspective of this user. This may be an audio call, video call or + * something else. + */ + "m.call.intent"?: RTCCallIntent; +} + +const checkRtcMembershipData = ( + data: Partial>, + errors: string[], +): data is RtcMembershipData => { + const prefix = "Malformed rtc membership event: "; + + // required fields + if (typeof data.slot_id !== "string") errors.push(prefix + "slot_id must be string"); + if (typeof data.member !== "object" || data.member === null) { + errors.push(prefix + "member must be an object"); + } else { + if (typeof data.member.user_id !== "string") errors.push(prefix + "member.user_id must be string"); + else if (!MXID_PATTERN.test(data.member.user_id)) errors.push(prefix + "member.user_id must be a valid mxid"); + if (typeof data.member.device_id !== "string") errors.push(prefix + "member.device_id must be string"); + if (typeof data.member.id !== "string") errors.push(prefix + "member.id must be string"); + } + if (typeof data.application !== "object" || data.application === null) { + errors.push(prefix + "application must be an object"); + } else { + if (typeof data.application.type !== "string") errors.push(prefix + "application.type must be a string"); + } + if (data.rtc_transports === undefined || !Array.isArray(data.rtc_transports)) { + errors.push(prefix + "rtc_transports must be an array"); + } else { + // validate that each transport has at least a string 'type' + for (const t of data.rtc_transports) { + if (typeof t !== "object" || typeof (t as any).type !== "string") { + errors.push(prefix + "rtc_transports entries must be objects with a string type"); + break; + } + } + } + if (data.versions === undefined || !Array.isArray(data.versions)) { + errors.push(prefix + "versions must be an array"); + } else if (!data.versions.every((v) => typeof v === "string")) { + errors.push(prefix + "versions must be an array of strings"); + } + + // optional fields + if (data.created_ts !== undefined && typeof data.created_ts !== "number") { + errors.push(prefix + "created_ts must be number"); + } + if (data.sticky_key !== undefined && typeof data.sticky_key !== "string") { + errors.push(prefix + "sticky_key must be a string"); + } + if (data["m.call.intent"] !== undefined && typeof data["m.call.intent"] !== "string") { + errors.push(prefix + "m.call.intent must be a string"); + } + if (data["m.relates_to"] !== undefined) { + const rel = data["m.relates_to"] as RtcMembershipData["m.relates_to"]; + if (typeof rel !== "object" || rel === null) { + errors.push(prefix + "m.relates_to must be an object if provided"); + } else { + if (typeof rel.event_id !== "string") errors.push(prefix + "m.relates_to.event_id must be a string"); + if (rel.rel_type !== "m.reference") errors.push(prefix + "m.relates_to.rel_type must be m.reference"); + } + } + + return errors.length === 0; +}; /** * MSC4143 (MatrixRTC) session membership data. @@ -55,13 +142,13 @@ export type SessionMembershipData = { /** * The focus selection system this user/membership is using. */ - "focus_active": Focus; + "focus_active": LivekitFocusSelection; /** - * A list of possible foci this uses knows about. One of them might be used based on the focus_active + * A list of possible foci this user knows about. One of them might be used based on the focus_active * selection system. */ - "foci_preferred"?: Focus[]; + "foci_preferred": Transport[]; /** * Optional field that contains the creation of the session. If it is undefined the creation @@ -76,7 +163,7 @@ export type SessionMembershipData = { /** * If the `application` = `"m.call"` this defines if it is a room or user owned call. - * There can always be one room scroped call but multiple user owned calls (breakout sessions) + * There can always be one room scoped call but multiple user owned calls (breakout sessions) */ "scope"?: CallScope; @@ -103,8 +190,12 @@ const checkSessionsMembershipData = ( if (typeof data.call_id !== "string") errors.push(prefix + "call_id must be string"); if (typeof data.application !== "string") errors.push(prefix + "application must be a string"); if (typeof data.focus_active?.type !== "string") errors.push(prefix + "focus_active.type must be a string"); - if (data.foci_preferred !== undefined && !Array.isArray(data.foci_preferred)) - {errors.push(prefix + "foci_preferred must be an array");} + if (data.focus_active !== undefined && !isLivekitFocusSelection(data.focus_active)) { + errors.push(prefix + "focus_active has an invalid type"); + } + if (data.foci_preferred !== undefined && !Array.isArray(data.foci_preferred)) { + errors.push(prefix + "foci_preferred must be an array"); + } // optional parameters if (data.created_ts !== undefined && typeof data.created_ts !== "number") { errors.push(prefix + "created_ts must be number"); @@ -120,28 +211,43 @@ const checkSessionsMembershipData = ( return errors.length === 0; }; +type MembershipData = { kind: "rtc"; data: RtcMembershipData } | { kind: "session"; data: SessionMembershipData }; export class CallMembership { public static equal(a: CallMembership, b: CallMembership): boolean { + if (a === undefined || b === undefined) return a === b; return deepCompare(a.membershipData, b.membershipData); } - private membershipData: SessionMembershipData; + + private membershipData: MembershipData; public constructor( private parentEvent: MatrixEvent, data: any, ) { const sessionErrors: string[] = []; - if (!checkSessionsMembershipData(data, sessionErrors)) { - throw Error( - `unknown CallMembership data. Does not match MSC4143 call.member (${sessionErrors.join(" & ")}) events this could be a legacy membership event: (${data})`, - ); + const rtcErrors: string[] = []; + if (checkSessionsMembershipData(data, sessionErrors)) { + this.membershipData = { kind: "session", data }; + } else if (checkRtcMembershipData(data, rtcErrors)) { + this.membershipData = { kind: "rtc", data }; } else { - this.membershipData = data; + throw Error( + `unknown CallMembership data.` + + `Does not match MSC4143 call.member (${sessionErrors.join(" & ")})\n` + + `Does not match MSC4143 rtc.member (${rtcErrors.join(" & ")})\n` + + `events this could be a legacy membership event: (${data})`, + ); } } public get sender(): string | undefined { - return this.parentEvent.getSender(); + const { kind, data } = this.membershipData; + switch (kind) { + case "rtc": + return data.member.user_id; + case "session": + return this.parentEvent.getSender(); + } } public get eventId(): string | undefined { @@ -149,77 +255,156 @@ export class CallMembership { } /** - * @deprecated Use sessionDescription.id instead. + * The slot id to find all member building one session `slot_id` (format `{application}#{id}`). + * This is computed in case SessionMembershipData is used. */ - public get callId(): string { - return this.membershipData.call_id; + public get slotId(): string { + const { kind, data } = this.membershipData; + switch (kind) { + case "rtc": + return data.slot_id; + case "session": + return slotDescriptionToId({ application: this.application, id: data.call_id }); + } } public get deviceId(): string { - return this.membershipData.device_id; + const { kind, data } = this.membershipData; + switch (kind) { + case "rtc": + return data.member.device_id; + case "session": + return data.device_id; + } } public get callIntent(): RTCCallIntent | undefined { - return this.membershipData["m.call.intent"]; + return this.membershipData.data["m.call.intent"]; } - public get sessionDescription(): SessionDescription { - return { - application: this.membershipData.application, - id: this.membershipData.call_id, - }; + /** + * Parsed `slot_id` (format `{application}#{id}`) into its components (application and id). + */ + public get slotDescription(): SlotDescription { + return slotIdToDescription(this.slotId); } - public get application(): string | undefined { - return this.membershipData.application; + public get application(): string { + const { kind, data } = this.membershipData; + switch (kind) { + case "rtc": + return data.application.type; + case "session": + return data.application; + } } public get scope(): CallScope | undefined { - return this.membershipData.scope; + const { kind, data } = this.membershipData; + switch (kind) { + case "rtc": + return undefined; + case "session": + return data.scope; + } } public get membershipID(): string { // the createdTs behaves equivalent to the membershipID. - // we only need the field for the legacy member envents where we needed to update them + // we only need the field for the legacy member events where we needed to update them // synapse ignores sending state events if they have the same content. - return this.createdTs().toString(); + const { kind, data } = this.membershipData; + switch (kind) { + case "rtc": + return data.member.id; + case "session": + return (this.createdTs() ?? "").toString(); + } } public createdTs(): number { - return this.membershipData.created_ts ?? this.parentEvent.getTs(); + const { kind, data } = this.membershipData; + switch (kind) { + case "rtc": + // TODO we need to read the referenced (relation) event if available to get the real created_ts + return this.parentEvent.getTs(); + case "session": + return data.created_ts ?? this.parentEvent.getTs(); + } } /** * Gets the absolute expiry timestamp of the membership. * @returns The absolute expiry time of the membership as a unix timestamp in milliseconds or undefined if not applicable */ - public getAbsoluteExpiry(): number { - // TODO: calculate this from the MatrixRTCSession join configuration directly - return this.createdTs() + (this.membershipData.expires ?? DEFAULT_EXPIRE_DURATION); + public getAbsoluteExpiry(): number | undefined { + const { kind, data } = this.membershipData; + switch (kind) { + case "rtc": + return undefined; + case "session": + // TODO: calculate this from the MatrixRTCSession join configuration directly + return this.createdTs() + (data.expires ?? DEFAULT_EXPIRE_DURATION); + } } /** * @returns The number of milliseconds until the membership expires or undefined if applicable */ - public getMsUntilExpiry(): number { - // Assume that local clock is sufficiently in sync with other clocks in the distributed system. - // We used to try and adjust for the local clock being skewed, but there are cases where this is not accurate. - // The current implementation allows for the local clock to be -infinity to +MatrixRTCSession.MEMBERSHIP_EXPIRY_TIME/2 - return this.getAbsoluteExpiry() - Date.now(); + public getMsUntilExpiry(): number | undefined { + const { kind } = this.membershipData; + switch (kind) { + case "rtc": + return undefined; + case "session": + // Assume that local clock is sufficiently in sync with other clocks in the distributed system. + // We used to try and adjust for the local clock being skewed, but there are cases where this is not accurate. + // The current implementation allows for the local clock to be -infinity to +MatrixRTCSession.MEMBERSHIP_EXPIRY_TIME/2 + return this.getAbsoluteExpiry()! - Date.now(); + } } /** * @returns true if the membership has expired, otherwise false */ public isExpired(): boolean { - return this.getMsUntilExpiry() <= 0; + const { kind } = this.membershipData; + switch (kind) { + case "rtc": + return false; + case "session": + return this.getMsUntilExpiry()! <= 0; + } } - public getPreferredFoci(): Focus[] { - return this.membershipData.foci_preferred ?? []; + /** + * + * @param oldestMembership For backwards compatibility with session membership (legacy). + * @returns + */ + public getTransport(oldestMembership: CallMembership): Transport | undefined { + const { kind, data } = this.membershipData; + switch (kind) { + case "rtc": + return data.rtc_transports[0]; + case "session": + switch (data.focus_active.focus_selection) { + case "oldest_membership": + if (CallMembership.equal(this, oldestMembership)) return data.foci_preferred[0]; + if (oldestMembership !== undefined) return oldestMembership.getTransport(oldestMembership); + break; + case "multi_sfu": + return data.foci_preferred[0]; + } + } } - - public getFocusActive(): Focus { - return this.membershipData.focus_active; + public get transports(): Transport[] { + const { kind, data } = this.membershipData; + switch (kind) { + case "rtc": + return data.rtc_transports; + case "session": + return data.foci_preferred; + } } } diff --git a/src/matrixrtc/IMembershipManager.ts b/src/matrixrtc/IMembershipManager.ts index fb0b3d3b5..cf6963fdc 100644 --- a/src/matrixrtc/IMembershipManager.ts +++ b/src/matrixrtc/IMembershipManager.ts @@ -15,8 +15,7 @@ limitations under the License. */ import type { CallMembership } from "./CallMembership.ts"; -import type { Focus } from "./focus.ts"; -import type { RTCCallIntent, Status } from "./types.ts"; +import type { RTCCallIntent, Status, Transport } from "./types.ts"; import { type TypedEventEmitter } from "../models/typed-event-emitter.ts"; export enum MembershipManagerEvent { @@ -80,10 +79,11 @@ export interface IMembershipManager /** * Start sending all necessary events to make this user participate in the RTC session. * @param fociPreferred the list of preferred foci to use in the joined RTC membership event. - * @param fociActive the active focus to use in the joined RTC membership event. + * @param multiSfuFocus the active focus to use in the joined RTC membership event. Setting this implies the + * membership manager will use multi sfu. Use `undefined` to not use `oldest_membership` selection based sfu. * @throws can throw if it exceeds a configured maximum retry. */ - join(fociPreferred: Focus[], fociActive?: Focus, onError?: (error: unknown) => void): void; + join(fociPreferred: Transport[], multiSfuFocus?: Transport, onError?: (error: unknown) => void): void; /** * Send all necessary events to make this user leave the RTC session. * @param timeout the maximum duration in ms until the promise is forced to resolve. @@ -95,10 +95,6 @@ export interface IMembershipManager * Call this if the MatrixRTC session members have changed. */ onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise; - /** - * Determines the active focus used by the given session member, or undefined if not joined. - */ - resolveActiveFocus(member: CallMembership): Focus | undefined; /** * Update the intent of a membership on the call (e.g. user is now providing a video feed) diff --git a/src/matrixrtc/LivekitFocus.ts b/src/matrixrtc/LivekitFocus.ts index a799a0b7b..6c17ffc6a 100644 --- a/src/matrixrtc/LivekitFocus.ts +++ b/src/matrixrtc/LivekitFocus.ts @@ -14,26 +14,33 @@ See the License for the specific language governing permissions and limitations under the License. */ -import { type Focus } from "./focus.ts"; +import { type Transport } from "./types.ts"; -export interface LivekitFocusConfig extends Focus { +export interface LivekitTransportConfig extends Transport { type: "livekit"; livekit_service_url: string; } -export const isLivekitFocusConfig = (object: any): object is LivekitFocusConfig => +export const isLivekitTransportConfig = (object: any): object is LivekitTransportConfig => object.type === "livekit" && "livekit_service_url" in object; -export interface LivekitFocus extends LivekitFocusConfig { +export interface LivekitTransport extends LivekitTransportConfig { livekit_alias: string; } -export const isLivekitFocus = (object: any): object is LivekitFocus => - isLivekitFocusConfig(object) && "livekit_alias" in object; +export const isLivekitTransport = (object: any): object is LivekitTransport => + isLivekitTransportConfig(object) && "livekit_alias" in object; -export interface LivekitFocusSelection extends Focus { +/** + * Deprecated, this is just needed for the old focus active / focus fields of a call membership. + * Not needed for new implementations. + */ +export interface LivekitFocusSelection extends Transport { type: "livekit"; - focus_selection: "oldest_membership"; + focus_selection: "oldest_membership" | "multi_sfu"; } +/** + * deprecated see LivekitFocusSelection + */ export const isLivekitFocusSelection = (object: any): object is LivekitFocusSelection => object.type === "livekit" && "focus_selection" in object; diff --git a/src/matrixrtc/MatrixRTCSession.ts b/src/matrixrtc/MatrixRTCSession.ts index 5ff40819d..4fcc449f5 100644 --- a/src/matrixrtc/MatrixRTCSession.ts +++ b/src/matrixrtc/MatrixRTCSession.ts @@ -24,17 +24,17 @@ import { KnownMembership } from "../@types/membership.ts"; import { type ISendEventResponse } from "../@types/requests.ts"; import { CallMembership } from "./CallMembership.ts"; import { RoomStateEvent } from "../models/room-state.ts"; -import { type Focus } from "./focus.ts"; import { MembershipManager } from "./MembershipManager.ts"; import { EncryptionManager, type IEncryptionManager } from "./EncryptionManager.ts"; import { deepCompare, logDurationSync } from "../utils.ts"; -import { - type Statistics, - type RTCNotificationType, - type Status, - type IRTCNotificationContent, - type ICallNotifyContent, - type RTCCallIntent, +import type { + Statistics, + RTCNotificationType, + Status, + IRTCNotificationContent, + ICallNotifyContent, + RTCCallIntent, + Transport, } from "./types.ts"; import { RoomKeyTransport } from "./RoomKeyTransport.ts"; import { @@ -103,10 +103,17 @@ export interface SessionConfig { /** * The session description is used to identify a session. Used in the state event. */ -export interface SessionDescription { +export interface SlotDescription { id: string; application: string; } +export function slotIdToDescription(slotId: string): SlotDescription { + const [application, id] = slotId.split("#"); + return { application, id }; +} +export function slotDescriptionToId(slotDescription: SlotDescription): string { + return `${slotDescription.application}#${slotDescription.id}`; +} // The names follow these principles: // - we use the technical term delay if the option is related to delayed events. @@ -185,6 +192,7 @@ export interface MembershipConfig { * but only applies to calls to the `_unstable_updateDelayedEvent` endpoint with a body of `{action:"restart"}`.) */ delayedLeaveEventRestartLocalTimeoutMs?: number; + useRtcMemberFormat?: boolean; } export interface EncryptionConfig { @@ -241,7 +249,7 @@ export class MatrixRTCSession extends TypedEventEmitter< private membershipManager?: IMembershipManager; private encryptionManager?: IEncryptionManager; // The session Id of the call, this is the call_id of the call Member event. - private _callId: string | undefined; + private _slotId: string | undefined; private joinConfig?: SessionConfig; private logger: Logger; @@ -279,33 +287,53 @@ export class MatrixRTCSession extends TypedEventEmitter< * * It can be undefined since the callId is only known once the first membership joins. * The callId is the property that, per definition, groups memberships into one call. + * @deprecated use `slotId` instead. */ public get callId(): string | undefined { - return this._callId; + return this.slotDescription?.id; + } + /** + * The slotId of the call. + * `{application}#{appSpecificId}` + * It can be undefined since the slotId is only known once the first membership joins. + * The slotId is the property that, per definition, groups memberships into one call. + */ + public get slotId(): string | undefined { + return this._slotId; } /** * Returns all the call memberships for a room that match the provided `sessionDescription`, * oldest first. * - * @deprecated Use `MatrixRTCSession.sessionMembershipsForRoom` instead. + * @deprecated Use `MatrixRTCSession.sessionMembershipsForSlot` instead. */ public static callMembershipsForRoom( room: Pick, ): CallMembership[] { - return MatrixRTCSession.sessionMembershipsForRoom(room, { + return MatrixRTCSession.sessionMembershipsForSlot(room, { id: "", application: "m.call", }); } /** - * Returns all the call memberships for a room that match the provided `sessionDescription`, - * oldest first. + * @deprecated use `MatrixRTCSession.slotMembershipsForRoom` instead. */ public static sessionMembershipsForRoom( room: Pick, - sessionDescription: SessionDescription, + sessionDescription: SlotDescription, + ): CallMembership[] { + return this.sessionMembershipsForSlot(room, sessionDescription); + } + + /** + * Returns all the call memberships for a room that match the provided `sessionDescription`, + * oldest first. + */ + public static sessionMembershipsForSlot( + room: Pick, + slotDescription: SlotDescription, ): CallMembership[] { const logger = rootLogger.getChild(`[MatrixRTCSession ${room.roomId}]`); const roomState = room.getLiveTimeline().getState(EventTimeline.FORWARDS); @@ -338,9 +366,9 @@ export class MatrixRTCSession extends TypedEventEmitter< try { const membership = new CallMembership(memberEvent, membershipData); - if (!deepCompare(membership.sessionDescription, sessionDescription)) { + if (!deepCompare(membership.slotDescription, slotDescription)) { logger.info( - `Ignoring membership of user ${membership.sender} for a different session: ${JSON.stringify(membership.sessionDescription)}`, + `Ignoring membership of user ${membership.sender} for a different session: ${JSON.stringify(membership.slotDescription)}`, ); continue; } @@ -379,26 +407,29 @@ export class MatrixRTCSession extends TypedEventEmitter< * This method is an alias for `MatrixRTCSession.sessionForRoom` with * sessionDescription `{ id: "", application: "m.call" }`. * - * @deprecated Use `MatrixRTCSession.sessionForRoom` with sessionDescription `{ id: "", application: "m.call" }` instead. + * @deprecated Use `MatrixRTCSession.sessionForSlot` with sessionDescription `{ id: "", application: "m.call" }` instead. */ public static roomSessionForRoom(client: MatrixClient, room: Room): MatrixRTCSession { - const callMemberships = MatrixRTCSession.sessionMembershipsForRoom(room, { id: "", application: "m.call" }); + const callMemberships = MatrixRTCSession.sessionMembershipsForSlot(room, { id: "", application: "m.call" }); return new MatrixRTCSession(client, room, callMemberships, { id: "", application: "m.call" }); } + /** + * @deprecated Use `MatrixRTCSession.sessionForSlot` instead. + */ + public static sessionForRoom(client: MatrixClient, room: Room, slotDescription: SlotDescription): MatrixRTCSession { + return this.sessionForSlot(client, room, slotDescription); + } + /** * Return the MatrixRTC session for the room. * This returned session can be used to find out if there are active sessions - * for the requested room and `sessionDescription`. + * for the requested room and `slotDescription`. */ - public static sessionForRoom( - client: MatrixClient, - room: Room, - sessionDescription: SessionDescription, - ): MatrixRTCSession { - const callMemberships = MatrixRTCSession.sessionMembershipsForRoom(room, sessionDescription); + public static sessionForSlot(client: MatrixClient, room: Room, slotDescription: SlotDescription): MatrixRTCSession { + const callMemberships = MatrixRTCSession.sessionMembershipsForSlot(room, slotDescription); - return new MatrixRTCSession(client, room, callMemberships, sessionDescription); + return new MatrixRTCSession(client, room, callMemberships, slotDescription); } /** @@ -445,13 +476,13 @@ export class MatrixRTCSession extends TypedEventEmitter< public memberships: CallMembership[], /** * The session description is used to define the exact session this object is tracking. - * A session is distinct from another session if one of those properties differ: `roomSubset.roomId`, `sessionDescription.application`, `sessionDescription.id`. + * A session is distinct from another session if one of those properties differ: `roomSubset.roomId`, `slotDescription.application`, `slotDescription.id`. */ - public readonly sessionDescription: SessionDescription, + public readonly slotDescription: SlotDescription, ) { super(); this.logger = rootLogger.getChild(`[MatrixRTCSession ${roomSubset.roomId}]`); - this._callId = memberships[0]?.sessionDescription.id; + this._slotId = memberships[0]?.slotId; const roomState = this.roomSubset.getLiveTimeline().getState(EventTimeline.FORWARDS); // TODO: double check if this is actually needed. Should be covered by refreshRoom in MatrixRTCSessionManager roomState?.on(RoomStateEvent.Members, this.onRoomMemberUpdate); @@ -497,7 +528,7 @@ export class MatrixRTCSession extends TypedEventEmitter< * or optionally other room members homeserver well known. * @param joinConfig - Additional configuration for the joined session. */ - public joinRoomSession(fociPreferred: Focus[], fociActive?: Focus, joinConfig?: JoinSessionConfig): void { + public joinRoomSession(fociPreferred: Transport[], fociActive?: Transport, joinConfig?: JoinSessionConfig): void { if (this.isJoined()) { this.logger.info(`Already joined to session in room ${this.roomSubset.roomId}: ignoring join call`); return; @@ -508,8 +539,7 @@ export class MatrixRTCSession extends TypedEventEmitter< joinConfig, this.roomSubset, this.client, - () => this.getOldestMembership(), - this.sessionDescription, + this.slotDescription, this.logger, ); @@ -608,12 +638,18 @@ export class MatrixRTCSession extends TypedEventEmitter< } /** - * Get the active focus from the current CallMemberState event + * Get the focus in use from a specific specified member. + * @param member The member for which to get the active focus. If undefined, the own membership is used. * @returns The focus that is currently in use to connect to this session. This is undefined * if the client is not connected to this session. + * @deprecated use `member.getTransport(session.getOldestMembership())` instead if you want to get the active transport for a specific member. */ - public resolveActiveFocus(member: CallMembership): Focus | undefined { - return this.membershipManager?.resolveActiveFocus(member); + public resolveActiveFocus(member?: CallMembership): Transport | undefined { + const oldestMembership = this.getOldestMembership(); + if (!oldestMembership) return undefined; + const m = member === undefined ? this.membershipManager?.ownMembership : member; + if (!m) return undefined; + return m.getTransport(oldestMembership); } public getOldestMembership(): CallMembership | undefined { @@ -763,9 +799,9 @@ export class MatrixRTCSession extends TypedEventEmitter< */ private recalculateSessionMembers = (): void => { const oldMemberships = this.memberships; - this.memberships = MatrixRTCSession.sessionMembershipsForRoom(this.room, this.sessionDescription); + this.memberships = MatrixRTCSession.sessionMembershipsForSlot(this.room, this.slotDescription); - this._callId = this._callId ?? this.memberships[0]?.sessionDescription.id; + this._slotId = this._slotId ?? this.memberships[0]?.slotId; const changed = oldMemberships.length != this.memberships.length || diff --git a/src/matrixrtc/MatrixRTCSessionManager.ts b/src/matrixrtc/MatrixRTCSessionManager.ts index cc25105d9..f2f49cc91 100644 --- a/src/matrixrtc/MatrixRTCSessionManager.ts +++ b/src/matrixrtc/MatrixRTCSessionManager.ts @@ -20,7 +20,7 @@ import { TypedEventEmitter } from "../models/typed-event-emitter.ts"; import { type Room } from "../models/room.ts"; import { type RoomState, RoomStateEvent } from "../models/room-state.ts"; import { type MatrixEvent } from "../models/event.ts"; -import { MatrixRTCSession, type SessionDescription } from "./MatrixRTCSession.ts"; +import { MatrixRTCSession, type SlotDescription } from "./MatrixRTCSession.ts"; import { EventType } from "../@types/event.ts"; export enum MatrixRTCSessionManagerEvents { @@ -56,7 +56,7 @@ export class MatrixRTCSessionManager extends TypedEventEmitter void): void { + public join(fociPreferred: Transport[], multiSfuFocus?: Transport, onError?: (error: unknown) => void): void { if (this.scheduler.running) { this.logger.error("MembershipManager is already running. Ignoring join request."); return; } this.fociPreferred = fociPreferred; - this.focusActive = focusActive; + this.rtcTransport = multiSfuFocus; this.leavePromiseResolvers = undefined; this.activated = true; this.oldStatus = this.status; @@ -266,18 +274,6 @@ export class MembershipManager return Promise.resolve(); } - public resolveActiveFocus(member: CallMembership): Focus | undefined { - const data = member.getFocusActive(); - if (isLivekitFocusSelection(data) && data.focus_selection === "oldest_membership") { - const oldestMembership = this.getOldestMembership(); - if (member === oldestMembership) return member.getPreferredFoci()[0]; - if (oldestMembership !== undefined) return this.resolveActiveFocus(oldestMembership); - } else { - // This is a fully resolved focus config - return data; - } - } - public async updateCallIntent(callIntent: RTCCallIntent): Promise { if (!this.activated || !this.ownMembership) { throw Error("You cannot update your intent before joining the call"); @@ -295,7 +291,6 @@ export class MembershipManager * @param joinConfig * @param room * @param client - * @param getOldestMembership */ public constructor( private joinConfig: (SessionConfig & MembershipConfig) | undefined, @@ -308,8 +303,7 @@ export class MembershipManager | "_unstable_sendDelayedStateEvent" | "_unstable_updateDelayedEvent" >, - private getOldestMembership: () => CallMembership | undefined, - public readonly sessionDescription: SessionDescription, + public readonly slotDescription: SlotDescription, parentLogger?: Logger, ) { super(); @@ -318,7 +312,7 @@ export class MembershipManager if (userId === null) throw Error("Missing userId in client"); if (deviceId === null) throw Error("Missing deviceId in client"); this.deviceId = deviceId; - this.stateKey = this.makeMembershipStateKey(userId, deviceId); + this.memberId = this.makeMembershipStateKey(userId, deviceId); this.state = MembershipManager.defaultState; this.callIntent = joinConfig?.callIntent; this.scheduler = new ActionScheduler((type): Promise => { @@ -364,9 +358,10 @@ export class MembershipManager } // Membership Event static parameters: private deviceId: string; - private stateKey: string; - private fociPreferred?: Focus[]; - private focusActive?: Focus; + private memberId: string; + /** @deprecated This will be removed in favor or rtcTransport becoming a list of actively used transports */ + private fociPreferred?: Transport[]; + private rtcTransport?: Transport; // Config: private delayedLeaveEventDelayMsOverride?: number; @@ -399,6 +394,9 @@ export class MembershipManager private get delayedLeaveEventRestartLocalTimeoutMs(): number { return this.joinConfig?.delayedLeaveEventRestartLocalTimeoutMs ?? 2000; } + private get useRtcMemberFormat(): boolean { + return this.joinConfig?.useRtcMemberFormat ?? false; + } // LOOP HANDLER: private async membershipLoopHandler(type: MembershipActionType): Promise { switch (type) { @@ -467,7 +465,7 @@ export class MembershipManager }, EventType.GroupCallMemberPrefix, {}, // leave event - this.stateKey, + this.memberId, ) .then((response) => { this.state.expectedServerDelayLeaveTs = Date.now() + this.delayedLeaveEventDelayMs; @@ -654,7 +652,7 @@ export class MembershipManager this.room.roomId, EventType.GroupCallMemberPrefix, this.makeMyMembership(this.membershipEventExpiryMs), - this.stateKey, + this.memberId, ) .then(() => { this.setAndEmitProbablyLeft(false); @@ -700,7 +698,7 @@ export class MembershipManager this.room.roomId, EventType.GroupCallMemberPrefix, this.makeMyMembership(this.membershipEventExpiryMs * nextExpireUpdateIteration), - this.stateKey, + this.memberId, ) .then(() => { // Success, we reset retries and schedule update. @@ -724,7 +722,7 @@ export class MembershipManager } private async sendFallbackLeaveEvent(): Promise { return await this.client - .sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, {}, this.stateKey) + .sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, {}, this.memberId) .then(() => { this.resetRateLimitCounter(MembershipActionType.SendLeaveEvent); this.state.hasMemberStateEvent = false; @@ -739,7 +737,7 @@ export class MembershipManager // HELPERS private makeMembershipStateKey(localUserId: string, localDeviceId: string): string { - const stateKey = `${localUserId}_${localDeviceId}_${this.sessionDescription.application}${this.sessionDescription.id}`; + const stateKey = `${localUserId}_${localDeviceId}_${this.slotDescription.application}${this.slotDescription.id}`; if (/^org\.matrix\.msc(3757|3779)\b/.exec(this.room.getVersion())) { return stateKey; } else { @@ -750,24 +748,42 @@ export class MembershipManager /** * Constructs our own membership */ - private makeMyMembership(expires: number): SessionMembershipData { - const hasPreviousEvent = !!this.ownMembership; - return { - // TODO: use the new format for m.rtc.member events where call_id becomes session.id - "application": this.sessionDescription.application, - "call_id": this.sessionDescription.id, - "scope": "m.room", - "device_id": this.deviceId, - expires, - "m.call.intent": this.callIntent, - ...(this.focusActive === undefined - ? { - focus_active: { type: "livekit", focus_selection: "oldest_membership" } as const, - foci_preferred: this.fociPreferred ?? [], - } - : { focus_active: this.focusActive }), - ...(hasPreviousEvent ? { created_ts: this.ownMembership?.createdTs() } : undefined), - }; + private makeMyMembership(expires: number): SessionMembershipData | RtcMembershipData { + const ownMembership = this.ownMembership; + if (this.useRtcMemberFormat) { + const relationObject = ownMembership?.eventId + ? { "m.relation": { rel_type: RelationType.Reference, event_id: ownMembership?.eventId } } + : {}; + return { + application: { type: this.slotDescription.application, id: this.slotDescription.id }, + slot_id: slotDescriptionToId(this.slotDescription), + rtc_transports: this.rtcTransport ? [this.rtcTransport] : [], + member: { device_id: this.deviceId, user_id: this.client.getUserId()!, id: this.memberId }, + versions: [], + ...relationObject, + }; + } else { + const focusObjects = + this.rtcTransport === undefined + ? { + focus_active: { type: "livekit", focus_selection: "oldest_membership" } as const, + foci_preferred: this.fociPreferred ?? [], + } + : { + focus_active: { type: "livekit", focus_selection: "multi_sfu" } as const, + foci_preferred: [this.rtcTransport, ...(this.fociPreferred ?? [])], + }; + return { + "application": this.slotDescription.application, + "call_id": this.slotDescription.id, + "scope": "m.room", + "device_id": this.deviceId, + expires, + "m.call.intent": this.callIntent, + ...focusObjects, + ...(ownMembership !== undefined ? { created_ts: ownMembership.createdTs() } : undefined), + }; + } } // Error checks and handlers diff --git a/src/matrixrtc/focus.ts b/src/matrixrtc/focus.ts deleted file mode 100644 index cf9836dd4..000000000 --- a/src/matrixrtc/focus.ts +++ /dev/null @@ -1,25 +0,0 @@ -/* -Copyright 2023 The Matrix.org Foundation C.I.C. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/** - * Information about a MatrixRTC conference focus. The only attribute that - * the js-sdk (currently) knows about is the type: applications can extend - * this class for different types of focus. - */ -export interface Focus { - type: string; - [key: string]: unknown; -} diff --git a/src/matrixrtc/index.ts b/src/matrixrtc/index.ts index 40ab6919f..e383b3f10 100644 --- a/src/matrixrtc/index.ts +++ b/src/matrixrtc/index.ts @@ -15,7 +15,6 @@ limitations under the License. */ export * from "./CallMembership.ts"; -export type * from "./focus.ts"; export * from "./LivekitFocus.ts"; export * from "./MatrixRTCSession.ts"; export * from "./MatrixRTCSessionManager.ts"; diff --git a/src/matrixrtc/types.ts b/src/matrixrtc/types.ts index b344a22d8..08c32a206 100644 --- a/src/matrixrtc/types.ts +++ b/src/matrixrtc/types.ts @@ -156,3 +156,11 @@ export type Statistics = { export const isMyMembership = (m: CallMembership, userId: string, deviceId: string): boolean => m.sender === userId && m.deviceId === deviceId; + +/** + * A RTC transport is a JSON object that describes how to connect to a RTC member. + */ +export interface Transport { + type: string; + [key: string]: unknown; +} diff --git a/src/models/room-member.ts b/src/models/room-member.ts index 6cf702e8a..a2711d38e 100644 --- a/src/models/room-member.ts +++ b/src/models/room-member.ts @@ -388,7 +388,7 @@ export class RoomMember extends TypedEventEmitter