You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-11-23 17:02:25 +03:00
Add probablyLeft event to the MatrixRTCSession (#4962)
* Add probablyLeft emission to the MatrixRTCSession Signed-off-by: Timo K <toger5@hotmail.de> * add docstring Signed-off-by: Timo K <toger5@hotmail.de> * Review: add additional test + fix pending promises issue. Signed-off-by: Timo K <toger5@hotmail.de> * review: `Pick` only a subset of membership manager events Signed-off-by: Timo K <toger5@hotmail.de> * reveiw: update probablyLeft logic to be more straight forward Signed-off-by: Timo K <toger5@hotmail.de> * fix test Signed-off-by: Timo K <toger5@hotmail.de> * make test not wait for 5s Signed-off-by: Timo K <toger5@hotmail.de> * review Signed-off-by: Timo K <toger5@hotmail.de> * fix linter (rebase) Signed-off-by: Timo K <toger5@hotmail.de> * fix import Signed-off-by: Timo K <toger5@hotmail.de> --------- Signed-off-by: Timo K <toger5@hotmail.de>
This commit is contained in:
@@ -16,7 +16,14 @@ limitations under the License.
|
|||||||
|
|
||||||
import { type MockedFunction, type Mock } from "jest-mock";
|
import { type MockedFunction, type Mock } from "jest-mock";
|
||||||
|
|
||||||
import { EventType, HTTPError, MatrixError, UnsupportedDelayedEventsEndpointError, type Room } from "../../../src";
|
import {
|
||||||
|
type EmptyObject,
|
||||||
|
EventType,
|
||||||
|
HTTPError,
|
||||||
|
MatrixError,
|
||||||
|
UnsupportedDelayedEventsEndpointError,
|
||||||
|
type Room,
|
||||||
|
} from "../../../src";
|
||||||
import {
|
import {
|
||||||
MembershipManagerEvent,
|
MembershipManagerEvent,
|
||||||
Status,
|
Status,
|
||||||
@@ -611,6 +618,7 @@ describe("MembershipManager", () => {
|
|||||||
await testExpires(10_000, 1_000);
|
await testExpires(10_000, 1_000);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("status updates", () => {
|
describe("status updates", () => {
|
||||||
it("starts 'Disconnected'", () => {
|
it("starts 'Disconnected'", () => {
|
||||||
const manager = new MembershipManager({}, room, client, () => undefined, callSession);
|
const manager = new MembershipManager({}, room, client, () => undefined, callSession);
|
||||||
@@ -836,6 +844,63 @@ describe("MembershipManager", () => {
|
|||||||
expect(client.sendStateEvent).toHaveBeenCalled();
|
expect(client.sendStateEvent).toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
describe("probablyLeft", () => {
|
||||||
|
it("emits probablyLeft when the membership manager could not hear back from the server for the duration of the delayed event", async () => {
|
||||||
|
const manager = new MembershipManager(
|
||||||
|
{ delayedLeaveEventDelayMs: 10000 },
|
||||||
|
room,
|
||||||
|
client,
|
||||||
|
() => undefined,
|
||||||
|
callSession,
|
||||||
|
);
|
||||||
|
const { promise: stuckPromise, reject: rejectStuckPromise } = Promise.withResolvers<EmptyObject>();
|
||||||
|
const probablyLeftEmit = jest.fn();
|
||||||
|
manager.on(MembershipManagerEvent.ProbablyLeft, probablyLeftEmit);
|
||||||
|
manager.join([focus], focusActive);
|
||||||
|
try {
|
||||||
|
// Let the scheduler run one iteration so that we can send the join state event
|
||||||
|
await waitForMockCall(client._unstable_updateDelayedEvent);
|
||||||
|
|
||||||
|
// We never resolve the delayed event so that we can test the probablyLeft event.
|
||||||
|
// This simulates the case where the server does not respond to the delayed event.
|
||||||
|
client._unstable_updateDelayedEvent = jest.fn(() => stuckPromise);
|
||||||
|
expect(client.sendStateEvent).toHaveBeenCalledTimes(1);
|
||||||
|
expect(manager.status).toBe(Status.Connected);
|
||||||
|
expect(probablyLeftEmit).not.toHaveBeenCalledWith(true);
|
||||||
|
// We expect the probablyLeft event to be emitted after the `delayedLeaveEventDelayMs` = 10000.
|
||||||
|
// We also track the calls to updated the delayed event that all will never resolve to simulate the server not responding.
|
||||||
|
// The numbers are a bit arbitrary since we use the local timeout that does not perfectly match the 5s check interval in this test.
|
||||||
|
await jest.advanceTimersByTimeAsync(5000);
|
||||||
|
// No emission after 5s
|
||||||
|
expect(probablyLeftEmit).not.toHaveBeenCalledWith(true);
|
||||||
|
expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
await jest.advanceTimersByTimeAsync(4999);
|
||||||
|
expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(3);
|
||||||
|
expect(probablyLeftEmit).not.toHaveBeenCalledWith(true);
|
||||||
|
|
||||||
|
// Reset mocks before we setup the next delayed event restart by advancing the timers 1 more ms.
|
||||||
|
(client._unstable_updateDelayedEvent as Mock<any>).mockResolvedValue({});
|
||||||
|
|
||||||
|
// Emit after 10s
|
||||||
|
await jest.advanceTimersByTimeAsync(1);
|
||||||
|
expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(4);
|
||||||
|
expect(probablyLeftEmit).toHaveBeenCalledWith(true);
|
||||||
|
|
||||||
|
// Mock a sync which does not include our own membership
|
||||||
|
await manager.onRTCSessionMemberUpdate([]);
|
||||||
|
// Wait for the current ongoing delayed event sending to finish
|
||||||
|
await jest.advanceTimersByTimeAsync(1);
|
||||||
|
// We should send a new state event and an associated delayed leave event.
|
||||||
|
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(2);
|
||||||
|
expect(client.sendStateEvent).toHaveBeenCalledTimes(2);
|
||||||
|
// At the same time we expect the probablyLeft event to be emitted with false so we are back operational.
|
||||||
|
expect(probablyLeftEmit).toHaveBeenCalledWith(false);
|
||||||
|
} finally {
|
||||||
|
rejectStuckPromise();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Should prefix log with MembershipManager used", () => {
|
it("Should prefix log with MembershipManager used", () => {
|
||||||
|
|||||||
@@ -17,13 +17,21 @@ limitations under the License.
|
|||||||
import type { CallMembership } from "./CallMembership.ts";
|
import type { CallMembership } from "./CallMembership.ts";
|
||||||
import type { Focus } from "./focus.ts";
|
import type { Focus } from "./focus.ts";
|
||||||
import type { Status } from "./types.ts";
|
import type { Status } from "./types.ts";
|
||||||
|
import { type TypedEventEmitter } from "../models/typed-event-emitter.ts";
|
||||||
|
|
||||||
export enum MembershipManagerEvent {
|
export enum MembershipManagerEvent {
|
||||||
StatusChanged = "StatusChanged",
|
StatusChanged = "StatusChanged",
|
||||||
|
/**
|
||||||
|
* Emitted when the membership manager has not heard back from the server for the duration
|
||||||
|
* of the delayed event and hence failed to restart the delayed event.
|
||||||
|
* This means that the user is probably not joined anymore and the leave event was distributed to other session members.
|
||||||
|
*/
|
||||||
|
ProbablyLeft = "ProbablyLeft",
|
||||||
}
|
}
|
||||||
|
|
||||||
export type MembershipManagerEventHandlerMap = {
|
export type MembershipManagerEventHandlerMap = {
|
||||||
[MembershipManagerEvent.StatusChanged]: (prefStatus: Status, newStatus: Status) => void;
|
[MembershipManagerEvent.StatusChanged]: (prefStatus: Status, newStatus: Status) => void;
|
||||||
|
[MembershipManagerEvent.ProbablyLeft]: (probablyLeft: boolean) => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -33,7 +41,8 @@ export type MembershipManagerEventHandlerMap = {
|
|||||||
*
|
*
|
||||||
* @internal
|
* @internal
|
||||||
*/
|
*/
|
||||||
export interface IMembershipManager {
|
export interface IMembershipManager
|
||||||
|
extends TypedEventEmitter<MembershipManagerEvent, MembershipManagerEventHandlerMap> {
|
||||||
/**
|
/**
|
||||||
* If we are trying to join, or have successfully joined the session.
|
* If we are trying to join, or have successfully joined the session.
|
||||||
* It does not reflect if the room state is already configured to represent us being joined.
|
* It does not reflect if the room state is already configured to represent us being joined.
|
||||||
@@ -85,8 +94,4 @@ export interface IMembershipManager {
|
|||||||
* @returns the used active focus in the currently joined session or undefined if not joined.
|
* @returns the used active focus in the currently joined session or undefined if not joined.
|
||||||
*/
|
*/
|
||||||
getActiveFocus(): Focus | undefined;
|
getActiveFocus(): Focus | undefined;
|
||||||
|
|
||||||
// TypedEventEmitter methods:
|
|
||||||
on(event: MembershipManagerEvent.StatusChanged, listener: (oldStatus: Status, newStatus: Status) => void): this;
|
|
||||||
off(event: MembershipManagerEvent.StatusChanged, listener: (oldStatus: Status, newStatus: Status) => void): this;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,7 +29,11 @@ import { EncryptionManager, type IEncryptionManager } from "./EncryptionManager.
|
|||||||
import { deepCompare, logDurationSync } from "../utils.ts";
|
import { deepCompare, logDurationSync } from "../utils.ts";
|
||||||
import { type Statistics, type RTCNotificationType } from "./types.ts";
|
import { type Statistics, type RTCNotificationType } from "./types.ts";
|
||||||
import { RoomKeyTransport } from "./RoomKeyTransport.ts";
|
import { RoomKeyTransport } from "./RoomKeyTransport.ts";
|
||||||
import type { IMembershipManager } from "./IMembershipManager.ts";
|
import {
|
||||||
|
MembershipManagerEvent,
|
||||||
|
type MembershipManagerEventHandlerMap,
|
||||||
|
type IMembershipManager,
|
||||||
|
} from "./IMembershipManager.ts";
|
||||||
import { RTCEncryptionManager } from "./RTCEncryptionManager.ts";
|
import { RTCEncryptionManager } from "./RTCEncryptionManager.ts";
|
||||||
import {
|
import {
|
||||||
RoomAndToDeviceEvents,
|
RoomAndToDeviceEvents,
|
||||||
@@ -220,8 +224,10 @@ export type JoinSessionConfig = SessionConfig & MembershipConfig & EncryptionCon
|
|||||||
* This class doesn't deal with media at all, just membership & properties of a session.
|
* This class doesn't deal with media at all, just membership & properties of a session.
|
||||||
*/
|
*/
|
||||||
export class MatrixRTCSession extends TypedEventEmitter<
|
export class MatrixRTCSession extends TypedEventEmitter<
|
||||||
MatrixRTCSessionEvent | RoomAndToDeviceEvents,
|
MatrixRTCSessionEvent | RoomAndToDeviceEvents | MembershipManagerEvent.ProbablyLeft,
|
||||||
MatrixRTCSessionEventHandlerMap & RoomAndToDeviceEventsHandlerMap
|
MatrixRTCSessionEventHandlerMap &
|
||||||
|
RoomAndToDeviceEventsHandlerMap &
|
||||||
|
Pick<MembershipManagerEventHandlerMap, MembershipManagerEvent.ProbablyLeft>
|
||||||
> {
|
> {
|
||||||
private membershipManager?: IMembershipManager;
|
private membershipManager?: IMembershipManager;
|
||||||
private encryptionManager?: IEncryptionManager;
|
private encryptionManager?: IEncryptionManager;
|
||||||
@@ -456,8 +462,8 @@ export class MatrixRTCSession extends TypedEventEmitter<
|
|||||||
roomState?.off(RoomStateEvent.Members, this.onRoomMemberUpdate);
|
roomState?.off(RoomStateEvent.Members, this.onRoomMemberUpdate);
|
||||||
}
|
}
|
||||||
private reEmitter = new TypedReEmitter<
|
private reEmitter = new TypedReEmitter<
|
||||||
MatrixRTCSessionEvent | RoomAndToDeviceEvents,
|
MatrixRTCSessionEvent | RoomAndToDeviceEvents | MembershipManagerEvent,
|
||||||
MatrixRTCSessionEventHandlerMap & RoomAndToDeviceEventsHandlerMap
|
MatrixRTCSessionEventHandlerMap & RoomAndToDeviceEventsHandlerMap & MembershipManagerEventHandlerMap
|
||||||
>(this);
|
>(this);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -490,6 +496,7 @@ export class MatrixRTCSession extends TypedEventEmitter<
|
|||||||
this.logger,
|
this.logger,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
this.reEmitter.reEmit(this.membershipManager!, [MembershipManagerEvent.ProbablyLeft]);
|
||||||
// Create Encryption manager
|
// Create Encryption manager
|
||||||
let transport;
|
let transport;
|
||||||
if (joinConfig?.useExperimentalToDeviceTransport) {
|
if (joinConfig?.useExperimentalToDeviceTransport) {
|
||||||
|
|||||||
@@ -129,6 +129,12 @@ export interface MembershipManagerState {
|
|||||||
rateLimitRetries: Map<MembershipActionType, number>;
|
rateLimitRetries: Map<MembershipActionType, number>;
|
||||||
/** Retry counter for other errors */
|
/** Retry counter for other errors */
|
||||||
networkErrorRetries: Map<MembershipActionType, number>;
|
networkErrorRetries: Map<MembershipActionType, number>;
|
||||||
|
/** The time at which we expect the server to send the delayed leave event. */
|
||||||
|
expectedServerDelayLeaveTs?: number;
|
||||||
|
/** This is used to track if the client expects the scheduled delayed leave event to have
|
||||||
|
* been sent because restarting failed during the available time.
|
||||||
|
* Once we resend the delayed event or successfully restarted it will get unset. */
|
||||||
|
probablyLeft: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -343,6 +349,7 @@ export class MembershipManager
|
|||||||
rateLimitRetries: new Map(),
|
rateLimitRetries: new Map(),
|
||||||
networkErrorRetries: new Map(),
|
networkErrorRetries: new Map(),
|
||||||
expireUpdateIterations: 1,
|
expireUpdateIterations: 1,
|
||||||
|
probablyLeft: false,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
// Membership Event static parameters:
|
// Membership Event static parameters:
|
||||||
@@ -466,6 +473,8 @@ export class MembershipManager
|
|||||||
this.stateKey,
|
this.stateKey,
|
||||||
)
|
)
|
||||||
.then((response) => {
|
.then((response) => {
|
||||||
|
this.state.expectedServerDelayLeaveTs = Date.now() + this.delayedLeaveEventDelayMs;
|
||||||
|
this.setAndEmitProbablyLeft(false);
|
||||||
// On success we reset retries and set delayId.
|
// On success we reset retries and set delayId.
|
||||||
this.resetRateLimitCounter(MembershipActionType.SendDelayedEvent);
|
this.resetRateLimitCounter(MembershipActionType.SendDelayedEvent);
|
||||||
this.state.delayId = response.delay_id;
|
this.state.delayId = response.delay_id;
|
||||||
@@ -545,27 +554,58 @@ export class MembershipManager
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private setAndEmitProbablyLeft(probablyLeft: boolean): void {
|
||||||
|
if (this.state.probablyLeft === probablyLeft) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.state.probablyLeft = probablyLeft;
|
||||||
|
this.emit(MembershipManagerEvent.ProbablyLeft, this.state.probablyLeft);
|
||||||
|
}
|
||||||
|
|
||||||
private async restartDelayedEvent(delayId: string): Promise<ActionUpdate> {
|
private async restartDelayedEvent(delayId: string): Promise<ActionUpdate> {
|
||||||
|
// Compute the duration until we expect the server to send the delayed leave event.
|
||||||
|
const durationUntilServerDelayedLeave = this.state.expectedServerDelayLeaveTs
|
||||||
|
? this.state.expectedServerDelayLeaveTs - Date.now()
|
||||||
|
: undefined;
|
||||||
const abortPromise = new Promise((_, reject) => {
|
const abortPromise = new Promise((_, reject) => {
|
||||||
setTimeout(() => {
|
setTimeout(
|
||||||
|
() => {
|
||||||
reject(new AbortError("Restart delayed event timed out before the HS responded"));
|
reject(new AbortError("Restart delayed event timed out before the HS responded"));
|
||||||
}, this.delayedLeaveEventRestartLocalTimeoutMs);
|
},
|
||||||
|
// We abort immediately at the time where we expect the server to send the delayed leave event.
|
||||||
|
// At this point we want the catch block to run and set the `probablyLeft` state.
|
||||||
|
//
|
||||||
|
// While we are already in probablyLeft state, we use the unaltered delayedLeaveEventRestartLocalTimeoutMs.
|
||||||
|
durationUntilServerDelayedLeave !== undefined && !this.state.probablyLeft
|
||||||
|
? Math.min(this.delayedLeaveEventRestartLocalTimeoutMs, durationUntilServerDelayedLeave)
|
||||||
|
: this.delayedLeaveEventRestartLocalTimeoutMs,
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
// The obvious choice here would be to use the `IRequestOpts` to set the timeout. Since this call might be forwarded
|
// The obvious choice here would be to use the `IRequestOpts` to set the timeout. Since this call might be forwarded
|
||||||
// to the widget driver this information would ge lost. That is why we mimic the AbortError using the race.
|
// to the widget driver this information would get lost. That is why we mimic the AbortError using the race.
|
||||||
return await Promise.race([
|
return await Promise.race([
|
||||||
this.client._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Restart),
|
this.client._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Restart),
|
||||||
abortPromise,
|
abortPromise,
|
||||||
])
|
])
|
||||||
.then(() => {
|
.then(() => {
|
||||||
|
// Whenever we successfully restart the delayed event we update the `state.expectedServerDelayLeaveTs`
|
||||||
|
// which stores the predicted timestamp at which the server will send the delayed leave event if there wont be any further
|
||||||
|
// successful restart requests.
|
||||||
|
this.state.expectedServerDelayLeaveTs = Date.now() + this.delayedLeaveEventDelayMs;
|
||||||
this.resetRateLimitCounter(MembershipActionType.RestartDelayedEvent);
|
this.resetRateLimitCounter(MembershipActionType.RestartDelayedEvent);
|
||||||
|
this.setAndEmitProbablyLeft(false);
|
||||||
return createInsertActionUpdate(
|
return createInsertActionUpdate(
|
||||||
MembershipActionType.RestartDelayedEvent,
|
MembershipActionType.RestartDelayedEvent,
|
||||||
this.delayedLeaveEventRestartMs,
|
this.delayedLeaveEventRestartMs,
|
||||||
);
|
);
|
||||||
})
|
})
|
||||||
.catch((e) => {
|
.catch((e) => {
|
||||||
|
if (this.state.expectedServerDelayLeaveTs && this.state.expectedServerDelayLeaveTs <= Date.now()) {
|
||||||
|
// Once we reach this point it's likely that the server is sending the delayed leave event so we emit `probablyLeft = true`.
|
||||||
|
// It will emit `probablyLeft = false` once we notice about our leave through sync and successfully setup a new state event.
|
||||||
|
this.setAndEmitProbablyLeft(true);
|
||||||
|
}
|
||||||
const repeatActionType = MembershipActionType.RestartDelayedEvent;
|
const repeatActionType = MembershipActionType.RestartDelayedEvent;
|
||||||
if (this.isNotFoundError(e)) {
|
if (this.isNotFoundError(e)) {
|
||||||
this.state.delayId = undefined;
|
this.state.delayId = undefined;
|
||||||
@@ -620,6 +660,7 @@ export class MembershipManager
|
|||||||
this.stateKey,
|
this.stateKey,
|
||||||
)
|
)
|
||||||
.then(() => {
|
.then(() => {
|
||||||
|
this.setAndEmitProbablyLeft(false);
|
||||||
this.state.startTime = Date.now();
|
this.state.startTime = Date.now();
|
||||||
// The next update should already use twice the membershipEventExpiryTimeout
|
// The next update should already use twice the membershipEventExpiryTimeout
|
||||||
this.state.expireUpdateIterations = 1;
|
this.state.expireUpdateIterations = 1;
|
||||||
|
|||||||
Reference in New Issue
Block a user