1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-08-07 23:02:56 +03:00

Support MSC4140: Delayed events (#4294)

and use them for more reliable MatrixRTC session membership events.

Also implement "parent" delayed events, which were in a previous version
of the MSC and may be reintroduced or be part of a new MSC later.

NOTE: Still missing is support for sending encrypted delayed events.
This commit is contained in:
Andrew Ferrazzutti
2024-07-30 08:43:25 -04:00
committed by GitHub
parent 0300d6343f
commit 687d08dc9d
6 changed files with 715 additions and 58 deletions

View File

@@ -57,6 +57,7 @@ import {
Room,
RuleId,
TweakName,
UpdateDelayedEventAction,
} from "../../src";
import { supportsMatrixCall } from "../../src/webrtc/call";
import { makeBeaconEvent } from "../test-utils/beacon";
@@ -97,7 +98,7 @@ type HttpLookup = {
method: string;
path: string;
prefix?: string;
data?: Record<string, any>;
data?: Record<string, any> | Record<string, any>[];
error?: object;
expectBody?: Record<string, any>;
expectQueryParams?: QueryDict;
@@ -704,6 +705,328 @@ describe("MatrixClient", function () {
});
});
describe("_unstable_sendDelayedEvent", () => {
const unstableMSC4140Prefix = `${ClientPrefix.Unstable}/org.matrix.msc4140`;
const roomId = "!room:example.org";
const body = "This is the body";
const content = { body, msgtype: MsgType.Text } satisfies RoomMessageEventContent;
const timeoutDelayOpts = { delay: 2000 };
const realTimeoutDelayOpts = { "org.matrix.msc4140.delay": 2000 };
beforeEach(() => {
unstableFeatures["org.matrix.msc4140"] = true;
});
it("throws when unsupported by server", async () => {
unstableFeatures["org.matrix.msc4140"] = false;
const errorMessage = "Server does not support";
await expect(
client._unstable_sendDelayedEvent(
roomId,
timeoutDelayOpts,
null,
EventType.RoomMessage,
{ ...content },
client.makeTxnId(),
),
).rejects.toThrow(errorMessage);
await expect(
client._unstable_sendDelayedStateEvent(roomId, timeoutDelayOpts, EventType.RoomTopic, {
topic: "topic",
}),
).rejects.toThrow(errorMessage);
await expect(client._unstable_getDelayedEvents()).rejects.toThrow(errorMessage);
await expect(
client._unstable_updateDelayedEvent("anyDelayId", UpdateDelayedEventAction.Send),
).rejects.toThrow(errorMessage);
});
it("works with null threadId", async () => {
httpLookups = [];
const timeoutDelayTxnId = client.makeTxnId();
httpLookups.push({
method: "PUT",
path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${timeoutDelayTxnId}`,
expectQueryParams: realTimeoutDelayOpts,
data: { delay_id: "id1" },
expectBody: content,
});
const { delay_id: timeoutDelayId } = await client._unstable_sendDelayedEvent(
roomId,
timeoutDelayOpts,
null,
EventType.RoomMessage,
{ ...content },
timeoutDelayTxnId,
);
const actionDelayTxnId = client.makeTxnId();
httpLookups.push({
method: "PUT",
path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${actionDelayTxnId}`,
expectQueryParams: { "org.matrix.msc4140.parent_delay_id": timeoutDelayId },
data: { delay_id: "id2" },
expectBody: content,
});
await client._unstable_sendDelayedEvent(
roomId,
{ parent_delay_id: timeoutDelayId },
null,
EventType.RoomMessage,
{ ...content },
actionDelayTxnId,
);
});
it("works with non-null threadId", async () => {
httpLookups = [];
const threadId = "$threadId:server";
const expectBody = {
...content,
"m.relates_to": {
event_id: threadId,
is_falling_back: true,
rel_type: "m.thread",
},
};
const timeoutDelayTxnId = client.makeTxnId();
httpLookups.push({
method: "PUT",
path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${timeoutDelayTxnId}`,
expectQueryParams: realTimeoutDelayOpts,
data: { delay_id: "id1" },
expectBody,
});
const { delay_id: timeoutDelayId } = await client._unstable_sendDelayedEvent(
roomId,
timeoutDelayOpts,
threadId,
EventType.RoomMessage,
{ ...content },
timeoutDelayTxnId,
);
const actionDelayTxnId = client.makeTxnId();
httpLookups.push({
method: "PUT",
path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${actionDelayTxnId}`,
expectQueryParams: { "org.matrix.msc4140.parent_delay_id": timeoutDelayId },
data: { delay_id: "id2" },
expectBody,
});
await client._unstable_sendDelayedEvent(
roomId,
{ parent_delay_id: timeoutDelayId },
threadId,
EventType.RoomMessage,
{ ...content },
actionDelayTxnId,
);
});
it("should add thread relation if threadId is passed and the relation is missing", async () => {
httpLookups = [];
const threadId = "$threadId:server";
const expectBody = {
...content,
"m.relates_to": {
"m.in_reply_to": {
event_id: threadId,
},
"event_id": threadId,
"is_falling_back": true,
"rel_type": "m.thread",
},
};
const room = new Room(roomId, client, userId);
mocked(store.getRoom).mockReturnValue(room);
const rootEvent = new MatrixEvent({ event_id: threadId });
room.createThread(threadId, rootEvent, [rootEvent], false);
const timeoutDelayTxnId = client.makeTxnId();
httpLookups.push({
method: "PUT",
path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${timeoutDelayTxnId}`,
expectQueryParams: realTimeoutDelayOpts,
data: { delay_id: "id1" },
expectBody,
});
const { delay_id: timeoutDelayId } = await client._unstable_sendDelayedEvent(
roomId,
timeoutDelayOpts,
threadId,
EventType.RoomMessage,
{ ...content },
timeoutDelayTxnId,
);
const actionDelayTxnId = client.makeTxnId();
httpLookups.push({
method: "PUT",
path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${actionDelayTxnId}`,
expectQueryParams: { "org.matrix.msc4140.parent_delay_id": timeoutDelayId },
data: { delay_id: "id2" },
expectBody,
});
await client._unstable_sendDelayedEvent(
roomId,
{ parent_delay_id: timeoutDelayId },
threadId,
EventType.RoomMessage,
{ ...content },
actionDelayTxnId,
);
});
it("should add thread relation if threadId is passed and the relation is missing with reply", async () => {
httpLookups = [];
const threadId = "$threadId:server";
const content = {
body,
"msgtype": MsgType.Text,
"m.relates_to": {
"m.in_reply_to": {
event_id: "$other:event",
},
},
} satisfies RoomMessageEventContent;
const expectBody = {
...content,
"m.relates_to": {
"m.in_reply_to": {
event_id: "$other:event",
},
"event_id": threadId,
"is_falling_back": false,
"rel_type": "m.thread",
},
};
const room = new Room(roomId, client, userId);
mocked(store.getRoom).mockReturnValue(room);
const rootEvent = new MatrixEvent({ event_id: threadId });
room.createThread(threadId, rootEvent, [rootEvent], false);
const timeoutDelayTxnId = client.makeTxnId();
httpLookups.push({
method: "PUT",
path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${timeoutDelayTxnId}`,
expectQueryParams: realTimeoutDelayOpts,
data: { delay_id: "id1" },
expectBody,
});
const { delay_id: timeoutDelayId } = await client._unstable_sendDelayedEvent(
roomId,
timeoutDelayOpts,
threadId,
EventType.RoomMessage,
{ ...content },
timeoutDelayTxnId,
);
const actionDelayTxnId = client.makeTxnId();
httpLookups.push({
method: "PUT",
path: `/rooms/${encodeURIComponent(roomId)}/send/m.room.message/${actionDelayTxnId}`,
expectQueryParams: { "org.matrix.msc4140.parent_delay_id": timeoutDelayId },
data: { delay_id: "id2" },
expectBody,
});
await client._unstable_sendDelayedEvent(
roomId,
{ parent_delay_id: timeoutDelayId },
threadId,
EventType.RoomMessage,
{ ...content },
actionDelayTxnId,
);
});
it("can send a delayed state event", async () => {
httpLookups = [];
const content = { topic: "The year 2000" };
httpLookups.push({
method: "PUT",
path: `/rooms/${encodeURIComponent(roomId)}/state/m.room.topic/`,
expectQueryParams: realTimeoutDelayOpts,
data: { delay_id: "id1" },
expectBody: content,
});
const { delay_id: timeoutDelayId } = await client._unstable_sendDelayedStateEvent(
roomId,
timeoutDelayOpts,
EventType.RoomTopic,
{ ...content },
);
httpLookups.push({
method: "PUT",
path: `/rooms/${encodeURIComponent(roomId)}/state/m.room.topic/`,
expectQueryParams: { "org.matrix.msc4140.parent_delay_id": timeoutDelayId },
data: { delay_id: "id2" },
expectBody: content,
});
await client._unstable_sendDelayedStateEvent(
roomId,
{ parent_delay_id: timeoutDelayId },
EventType.RoomTopic,
{ ...content },
);
});
it("can look up delayed events", async () => {
httpLookups = [
{
method: "GET",
prefix: unstableMSC4140Prefix,
path: "/delayed_events",
data: [],
},
];
await client._unstable_getDelayedEvents();
});
it("can update delayed events", async () => {
const delayId = "id";
const action = UpdateDelayedEventAction.Restart;
httpLookups = [
{
method: "POST",
prefix: unstableMSC4140Prefix,
path: `/delayed_events/${encodeURIComponent(delayId)}`,
data: {
action,
},
},
];
await client._unstable_updateDelayedEvent(delayId, action);
});
});
it("should create (unstable) file trees", async () => {
const userId = "@test:example.org";
const roomId = "!room:example.org";
@@ -963,7 +1286,7 @@ describe("MatrixClient", function () {
const filter = new Filter(client.credentials.userId);
const filterId = await client.getOrCreateFilter(filterName, filter);
expect(filterId).toEqual(FILTER_RESPONSE.data?.filter_id);
expect(filterId).toEqual(!Array.isArray(FILTER_RESPONSE.data) && FILTER_RESPONSE.data?.filter_id);
});
});

View File

@@ -46,6 +46,9 @@ describe("MatrixRTCSession", () => {
client = new MatrixClient({ baseUrl: "base_url" });
client.getUserId = jest.fn().mockReturnValue("@alice:example.org");
client.getDeviceId = jest.fn().mockReturnValue("AAAAAAA");
client.doesServerSupportUnstableFeature = jest.fn((feature) =>
Promise.resolve(feature === "org.matrix.msc4140"),
);
});
afterEach(() => {
@@ -241,35 +244,61 @@ describe("MatrixRTCSession", () => {
foci_preferred: [mockFocus],
};
function testSession(
let sendStateEventMock: jest.Mock;
let sendDelayedStateMock: jest.Mock;
let sentStateEvent: Promise<void>;
let sentDelayedState: Promise<void>;
beforeEach(() => {
sentStateEvent = new Promise((resolve) => {
sendStateEventMock = jest.fn(resolve);
});
sentDelayedState = new Promise((resolve) => {
sendDelayedStateMock = jest.fn(() => {
resolve();
return {
delay_id: "id",
};
});
});
client.sendStateEvent = sendStateEventMock;
client._unstable_sendDelayedStateEvent = sendDelayedStateMock;
});
async function testSession(
membershipData: CallMembershipData[] | SessionMembershipData,
shouldUseLegacy: boolean,
): void {
): Promise<void> {
sess = MatrixRTCSession.roomSessionForRoom(client, makeMockRoom(membershipData));
const makeNewLegacyMembershipsMock = jest.spyOn(sess as any, "makeNewLegacyMemberships");
const makeNewMembershipMock = jest.spyOn(sess as any, "makeNewMembership");
sess.joinRoomSession([mockFocus], mockFocus, joinSessionConfig);
await Promise.race([sentStateEvent, new Promise((resolve) => setTimeout(resolve, 500))]);
expect(makeNewLegacyMembershipsMock).toHaveBeenCalledTimes(shouldUseLegacy ? 1 : 0);
expect(makeNewMembershipMock).toHaveBeenCalledTimes(shouldUseLegacy ? 0 : 1);
await Promise.race([sentDelayedState, new Promise((resolve) => setTimeout(resolve, 500))]);
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(shouldUseLegacy ? 0 : 1);
}
it("uses legacy events if there are any active legacy calls", () => {
testSession([expiredLegacyMembershipData, legacyMembershipData, sessionMembershipData], true);
it("uses legacy events if there are any active legacy calls", async () => {
await testSession([expiredLegacyMembershipData, legacyMembershipData, sessionMembershipData], true);
});
it('uses legacy events if a non-legacy call is in a "memberships" array', () => {
testSession([sessionMembershipData], true);
it('uses legacy events if a non-legacy call is in a "memberships" array', async () => {
await testSession([sessionMembershipData], true);
});
it("uses non-legacy events if all legacy calls are expired", () => {
testSession([expiredLegacyMembershipData], false);
it("uses non-legacy events if all legacy calls are expired", async () => {
await testSession([expiredLegacyMembershipData], false);
});
it("uses non-legacy events if there are only non-legacy calls", () => {
testSession(sessionMembershipData, false);
it("uses non-legacy events if there are only non-legacy calls", async () => {
await testSession(sessionMembershipData, false);
});
});
@@ -347,12 +376,27 @@ describe("MatrixRTCSession", () => {
describe("joining", () => {
let mockRoom: Room;
let sendStateEventMock: jest.Mock;
let sendDelayedStateMock: jest.Mock;
let sendEventMock: jest.Mock;
let sentStateEvent: Promise<void>;
let sentDelayedState: Promise<void>;
beforeEach(() => {
sendStateEventMock = jest.fn();
sentStateEvent = new Promise((resolve) => {
sendStateEventMock = jest.fn(resolve);
});
sentDelayedState = new Promise((resolve) => {
sendDelayedStateMock = jest.fn(() => {
resolve();
return {
delay_id: "id",
};
});
});
sendEventMock = jest.fn();
client.sendStateEvent = sendStateEventMock;
client._unstable_sendDelayedStateEvent = sendDelayedStateMock;
client.sendEvent = sendEventMock;
mockRoom = makeMockRoom([]);
@@ -373,9 +417,11 @@ describe("MatrixRTCSession", () => {
expect(sess!.isJoined()).toEqual(true);
});
it("sends a membership event when joining a call", () => {
it("sends a membership event when joining a call", async () => {
const realSetTimeout = setTimeout;
jest.useFakeTimers();
sess!.joinRoomSession([mockFocus], mockFocus);
await Promise.race([sentStateEvent, new Promise((resolve) => realSetTimeout(resolve, 500))]);
expect(client.sendStateEvent).toHaveBeenCalledWith(
mockRoom!.roomId,
EventType.GroupCallMemberPrefix,
@@ -396,6 +442,8 @@ describe("MatrixRTCSession", () => {
},
"@alice:example.org",
);
await Promise.race([sentDelayedState, new Promise((resolve) => realSetTimeout(resolve, 500))]);
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(0);
jest.useRealTimers();
});
@@ -403,13 +451,15 @@ describe("MatrixRTCSession", () => {
const activeFocusConfig = { type: "livekit", livekit_service_url: "https://active.url" };
const activeFocus = { type: "livekit", focus_selection: "oldest_membership" };
function testJoin(useOwnedStateEvents: boolean): void {
async function testJoin(useOwnedStateEvents: boolean): Promise<void> {
const realSetTimeout = setTimeout;
if (useOwnedStateEvents) {
mockRoom.getVersion = jest.fn().mockReturnValue("org.matrix.msc3779.default");
}
jest.useFakeTimers();
sess!.joinRoomSession([activeFocusConfig], activeFocus, { useLegacyMemberEvents: false });
await Promise.race([sentStateEvent, new Promise((resolve) => realSetTimeout(resolve, 500))]);
expect(client.sendStateEvent).toHaveBeenCalledWith(
mockRoom!.roomId,
EventType.GroupCallMemberPrefix,
@@ -423,15 +473,17 @@ describe("MatrixRTCSession", () => {
} satisfies SessionMembershipData,
`${!useOwnedStateEvents ? "_" : ""}@alice:example.org_AAAAAAA`,
);
await Promise.race([sentDelayedState, new Promise((resolve) => realSetTimeout(resolve, 500))]);
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
jest.useRealTimers();
}
it("sends a membership event with session payload when joining a non-legacy call", () => {
testJoin(false);
it("sends a membership event with session payload when joining a non-legacy call", async () => {
await testJoin(false);
});
it("does not prefix the state key with _ for rooms that support user-owned state events", () => {
testJoin(true);
it("does not prefix the state key with _ for rooms that support user-owned state events", async () => {
await testJoin(true);
});
});

View File

@@ -76,6 +76,56 @@ export interface ISendEventResponse {
event_id: string;
}
export type TimeoutDelay = {
delay: number;
};
export type ParentDelayId = {
parent_delay_id: string;
};
export type SendTimeoutDelayedEventRequestOpts = TimeoutDelay & Partial<ParentDelayId>;
export type SendActionDelayedEventRequestOpts = ParentDelayId;
export type SendDelayedEventRequestOpts = SendTimeoutDelayedEventRequestOpts | SendActionDelayedEventRequestOpts;
export type SendDelayedEventResponse = {
delay_id: string;
};
export enum UpdateDelayedEventAction {
Cancel = "cancel",
Restart = "restart",
Send = "send",
}
export type UpdateDelayedEventRequestOpts = SendDelayedEventResponse & {
action: UpdateDelayedEventAction;
};
type DelayedPartialTimelineEvent = {
room_id: string;
type: string;
content: IContent;
};
type DelayedPartialStateEvent = DelayedPartialTimelineEvent & {
state_key: string;
transaction_id: string;
};
type DelayedPartialEvent = DelayedPartialTimelineEvent | DelayedPartialStateEvent;
export type DelayedEventInfo = {
delayed_events: DelayedPartialEvent &
SendDelayedEventResponse &
SendDelayedEventRequestOpts &
{
running_since: number;
}[];
next_batch?: string;
};
export interface IPresenceOpts {
// One of "online", "offline" or "unavailable"
presence: "online" | "offline" | "unavailable";

View File

@@ -114,6 +114,7 @@ import { NotificationCountType, Room, RoomEvent, RoomEventHandlerMap, RoomNameSt
import { RoomMemberEvent, RoomMemberEventHandlerMap } from "./models/room-member";
import { IPowerLevelsContent, RoomStateEvent, RoomStateEventHandlerMap } from "./models/room-state";
import {
DelayedEventInfo,
IAddThreePidOnlyBody,
IBindThreePidBody,
IContextResponse,
@@ -134,6 +135,9 @@ import {
IStatusResponse,
ITagsResponse,
KnockRoomOpts,
SendDelayedEventRequestOpts,
SendDelayedEventResponse,
UpdateDelayedEventAction,
} from "./@types/requests";
import {
EventType,
@@ -530,6 +534,8 @@ export const UNSTABLE_MSC2666_SHARED_ROOMS = "uk.half-shot.msc2666";
export const UNSTABLE_MSC2666_MUTUAL_ROOMS = "uk.half-shot.msc2666.mutual_rooms";
export const UNSTABLE_MSC2666_QUERY_MUTUAL_ROOMS = "uk.half-shot.msc2666.query_mutual_rooms";
const UNSTABLE_MSC4140_DELAYED_EVENTS = "org.matrix.msc4140";
enum CrossSigningKeyType {
MasterKey = "master_key",
SelfSigningKey = "self_signing_key",
@@ -4573,12 +4579,19 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
threadId = threadIdOrEventType;
}
// If we expect that an event is part of a thread but is missing the relation
// we need to add it manually, as well as the reply fallback
if (threadId && !content!["m.relates_to"]?.rel_type) {
const isReply = !!content!["m.relates_to"]?.["m.in_reply_to"];
content!["m.relates_to"] = {
...content!["m.relates_to"],
this.addThreadRelationIfNeeded(content, threadId, roomId);
return this.sendCompleteEvent(roomId, threadId, { type: eventType, content }, txnId);
}
/**
* If we expect that an event is part of a thread but is missing the relation
* we need to add it manually, as well as the reply fallback
*/
private addThreadRelationIfNeeded(content: IContent, threadId: string | null, roomId: string): void {
if (threadId && !content["m.relates_to"]?.rel_type) {
const isReply = !!content["m.relates_to"]?.["m.in_reply_to"];
content["m.relates_to"] = {
...content["m.relates_to"],
rel_type: THREAD_RELATION_TYPE.name,
event_id: threadId,
// Set is_falling_back to true unless this is actually intended to be a reply
@@ -4586,7 +4599,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
};
const thread = this.getRoom(roomId)?.getThread(threadId);
if (thread && !isReply) {
content!["m.relates_to"]["m.in_reply_to"] = {
content["m.relates_to"]["m.in_reply_to"] = {
event_id:
thread
.lastReply((ev: MatrixEvent) => {
@@ -4596,8 +4609,6 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
};
}
}
return this.sendCompleteEvent(roomId, threadId, { type: eventType, content }, txnId);
}
/**
@@ -4611,7 +4622,38 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
threadId: string | null,
eventObject: Partial<IEvent>,
txnId?: string,
): Promise<ISendEventResponse> {
): Promise<ISendEventResponse>;
/**
* Sends a delayed event (MSC4140).
* @param eventObject - An object with the partial structure of an event, to which event_id, user_id, room_id and origin_server_ts will be added.
* @param delayOpts - Properties of the delay for this event.
* @param txnId - Optional.
* @returns Promise which resolves: to an empty object `{}`
* @returns Rejects: with an error response.
*/
private sendCompleteEvent(
roomId: string,
threadId: string | null,
eventObject: Partial<IEvent>,
delayOpts: SendDelayedEventRequestOpts,
txnId?: string,
): Promise<SendDelayedEventResponse>;
private sendCompleteEvent(
roomId: string,
threadId: string | null,
eventObject: Partial<IEvent>,
delayOptsOrTxnId?: SendDelayedEventRequestOpts | string,
txnIdOrVoid?: string,
): Promise<ISendEventResponse | SendDelayedEventResponse> {
let delayOpts: SendDelayedEventRequestOpts | undefined;
let txnId: string | undefined;
if (typeof delayOptsOrTxnId === "string") {
txnId = delayOptsOrTxnId;
} else {
delayOpts = delayOptsOrTxnId;
txnId = txnIdOrVoid;
}
if (!txnId) {
txnId = this.makeTxnId();
}
@@ -4634,9 +4676,11 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
localEvent.setThread(thread);
}
// set up re-emitter for this new event - this is normally the job of EventMapper but we don't use it here
this.reEmitter.reEmit(localEvent, [MatrixEventEvent.Replaced, MatrixEventEvent.VisibilityChange]);
room?.reEmitter.reEmit(localEvent, [MatrixEventEvent.BeforeRedaction]);
if (!delayOpts) {
// set up re-emitter for this new event - this is normally the job of EventMapper but we don't use it here
this.reEmitter.reEmit(localEvent, [MatrixEventEvent.Replaced, MatrixEventEvent.VisibilityChange]);
room?.reEmitter.reEmit(localEvent, [MatrixEventEvent.BeforeRedaction]);
}
// if this is a relation or redaction of an event
// that hasn't been sent yet (e.g. with a local id starting with a ~)
@@ -4651,29 +4695,56 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
}
const type = localEvent.getType();
this.logger.debug(`sendEvent of type ${type} in ${roomId} with txnId ${txnId}`);
this.logger.debug(
`sendEvent of type ${type} in ${roomId} with txnId ${txnId}${delayOpts ? " (delayed event)" : ""}`,
);
localEvent.setTxnId(txnId);
localEvent.setStatus(EventStatus.SENDING);
// add this event immediately to the local store as 'sending'.
room?.addPendingEvent(localEvent, txnId);
// TODO: separate store for delayed events?
if (!delayOpts) {
// add this event immediately to the local store as 'sending'.
room?.addPendingEvent(localEvent, txnId);
// addPendingEvent can change the state to NOT_SENT if it believes
// that there's other events that have failed. We won't bother to
// try sending the event if the state has changed as such.
if (localEvent.status === EventStatus.NOT_SENT) {
return Promise.reject(new Error("Event blocked by other events not yet sent"));
// addPendingEvent can change the state to NOT_SENT if it believes
// that there's other events that have failed. We won't bother to
// try sending the event if the state has changed as such.
if (localEvent.status === EventStatus.NOT_SENT) {
return Promise.reject(new Error("Event blocked by other events not yet sent"));
}
return this.encryptAndSendEvent(room, localEvent);
} else {
return this.encryptAndSendEvent(room, localEvent, delayOpts);
}
return this.encryptAndSendEvent(room, localEvent);
}
/**
* encrypts the event if necessary; adds the event to the queue, or sends it; marks the event as sent/unsent
* @returns returns a promise which resolves with the result of the send request
*/
protected async encryptAndSendEvent(room: Room | null, event: MatrixEvent): Promise<ISendEventResponse> {
protected async encryptAndSendEvent(room: Room | null, event: MatrixEvent): Promise<ISendEventResponse>;
/**
* Simply sends a delayed event without encrypting it.
* TODO: Allow encrypted delayed events, and encrypt them properly
* @param delayOpts - Properties of the delay for this event.
* @returns returns a promise which resolves with the result of the delayed send request
*/
protected async encryptAndSendEvent(
room: Room | null,
event: MatrixEvent,
delayOpts: SendDelayedEventRequestOpts,
): Promise<SendDelayedEventResponse>;
protected async encryptAndSendEvent(
room: Room | null,
event: MatrixEvent,
delayOpts?: SendDelayedEventRequestOpts,
): Promise<ISendEventResponse | SendDelayedEventResponse> {
if (delayOpts) {
return this.sendEventHttpRequest(event, delayOpts);
}
try {
let cancelled: boolean;
this.eventsBeingEncrypted.add(event.getId()!);
@@ -4824,7 +4895,15 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
}
}
private sendEventHttpRequest(event: MatrixEvent): Promise<ISendEventResponse> {
private sendEventHttpRequest(event: MatrixEvent): Promise<ISendEventResponse>;
private sendEventHttpRequest(
event: MatrixEvent,
delayOpts: SendDelayedEventRequestOpts,
): Promise<SendDelayedEventResponse>;
private sendEventHttpRequest(
event: MatrixEvent,
delayOpts?: SendDelayedEventRequestOpts,
): Promise<ISendEventResponse | SendDelayedEventResponse> {
let txnId = event.getTxnId();
if (!txnId) {
txnId = this.makeTxnId();
@@ -4856,12 +4935,20 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
path = utils.encodeUri("/rooms/$roomId/send/$eventType/$txnId", pathParams);
}
return this.http
.authedRequest<ISendEventResponse>(Method.Put, path, undefined, event.getWireContent())
.then((res) => {
const content = event.getWireContent();
if (!delayOpts) {
return this.http.authedRequest<ISendEventResponse>(Method.Put, path, undefined, content).then((res) => {
this.logger.debug(`Event sent to ${event.getRoomId()} with event id ${res.event_id}`);
return res;
});
} else {
return this.http.authedRequest<SendDelayedEventResponse>(
Method.Put,
path,
getUnstableDelayQueryOpts(delayOpts),
content,
);
}
}
/**
@@ -5191,6 +5278,101 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
return this.sendMessage(roomId, threadId, content);
}
/**
* Send a delayed timeline event.
*
* Note: This endpoint is unstable, and can throw an `Error`.
* Check progress on [MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140) for more details.
*/
// eslint-disable-next-line
public async _unstable_sendDelayedEvent<K extends keyof TimelineEvents>(
roomId: string,
delayOpts: SendDelayedEventRequestOpts,
threadId: string | null,
eventType: K,
content: TimelineEvents[K],
txnId?: string,
): Promise<SendDelayedEventResponse> {
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
throw Error("Server does not support the delayed events API");
}
this.addThreadRelationIfNeeded(content, threadId, roomId);
return this.sendCompleteEvent(roomId, threadId, { type: eventType, content }, delayOpts, txnId);
}
/**
* Send a delayed state event.
*
* Note: This endpoint is unstable, and can throw an `Error`.
* Check progress on [MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140) for more details.
*/
// eslint-disable-next-line
public async _unstable_sendDelayedStateEvent<K extends keyof StateEvents>(
roomId: string,
delayOpts: SendDelayedEventRequestOpts,
eventType: K,
content: StateEvents[K],
stateKey = "",
opts: IRequestOpts = {},
): Promise<SendDelayedEventResponse> {
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
throw Error("Server does not support the delayed events API");
}
const pathParams = {
$roomId: roomId,
$eventType: eventType,
$stateKey: stateKey,
};
let path = utils.encodeUri("/rooms/$roomId/state/$eventType", pathParams);
if (stateKey !== undefined) {
path = utils.encodeUri(path + "/$stateKey", pathParams);
}
return this.http.authedRequest(Method.Put, path, getUnstableDelayQueryOpts(delayOpts), content as Body, opts);
}
/**
* Get all pending delayed events for the calling user.
*
* Note: This endpoint is unstable, and can throw an `Error`.
* Check progress on [MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140) for more details.
*/
// eslint-disable-next-line
public async _unstable_getDelayedEvents(fromToken?: string): Promise<DelayedEventInfo> {
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
throw Error("Server does not support the delayed events API");
}
const queryDict = fromToken ? { from: fromToken } : undefined;
return await this.http.authedRequest(Method.Get, "/delayed_events", queryDict, undefined, {
prefix: `${ClientPrefix.Unstable}/${UNSTABLE_MSC4140_DELAYED_EVENTS}`,
});
}
/**
* Manage a delayed event associated with the given delay_id.
*
* Note: This endpoint is unstable, and can throw an `Error`.
* Check progress on [MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140) for more details.
*/
// eslint-disable-next-line
public async _unstable_updateDelayedEvent(delayId: string, action: UpdateDelayedEventAction): Promise<{}> {
if (!(await this.doesServerSupportUnstableFeature(UNSTABLE_MSC4140_DELAYED_EVENTS))) {
throw Error("Server does not support the delayed events API");
}
const path = utils.encodeUri("/delayed_events/$delayId", {
$delayId: delayId,
});
const data = {
action,
};
return await this.http.authedRequest(Method.Post, path, undefined, data, {
prefix: `${ClientPrefix.Unstable}/${UNSTABLE_MSC4140_DELAYED_EVENTS}`,
});
}
/**
* Send a receipt.
* @param event - The event being acknowledged
@@ -9892,6 +10074,12 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
}
}
function getUnstableDelayQueryOpts(delayOpts: SendDelayedEventRequestOpts): QueryDict {
return Object.fromEntries(
Object.entries(delayOpts).map(([k, v]) => [`${UNSTABLE_MSC4140_DELAYED_EVENTS}.${k}`, v]),
);
}
/**
* recalculates an accurate notifications count on event decryption.
* Servers do not have enough knowledge about encrypted events to calculate an

View File

@@ -26,7 +26,7 @@ import {
} from "matrix-widget-api";
import { MatrixEvent, IEvent, IContent, EventStatus } from "./models/event";
import { ISendEventResponse } from "./@types/requests";
import { ISendEventResponse, SendDelayedEventRequestOpts, SendDelayedEventResponse } from "./@types/requests";
import { EventType } from "./@types/event";
import { logger } from "./logger";
import {
@@ -248,7 +248,20 @@ export class RoomWidgetClient extends MatrixClient {
throw new Error(`Unknown room: ${roomIdOrAlias}`);
}
protected async encryptAndSendEvent(room: Room, event: MatrixEvent): Promise<ISendEventResponse> {
protected async encryptAndSendEvent(room: Room, event: MatrixEvent): Promise<ISendEventResponse>;
protected async encryptAndSendEvent(
room: Room,
event: MatrixEvent,
delayOpts: SendDelayedEventRequestOpts,
): Promise<SendDelayedEventResponse>;
protected async encryptAndSendEvent(
room: Room,
event: MatrixEvent,
delayOpts?: SendDelayedEventRequestOpts,
): Promise<ISendEventResponse | SendDelayedEventResponse> {
if (delayOpts) {
throw new Error("Delayed event sending via widgets is not implemented");
}
let response: ISendEventFromWidgetResponseData;
try {
response = await this.widgetApi.sendRoomEvent(event.getType(), event.getContent(), room.roomId);

View File

@@ -20,6 +20,7 @@ import { EventTimeline } from "../models/event-timeline";
import { Room } from "../models/room";
import { MatrixClient } from "../client";
import { EventType } from "../@types/event";
import { UpdateDelayedEventAction } from "../@types/requests";
import {
CallMembership,
CallMembershipData,
@@ -865,27 +866,57 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
newContent = this.makeNewMembership(localDeviceId);
}
const stateKey = legacy ? localUserId : this.makeMembershipStateKey(localUserId, localDeviceId);
try {
await this.client.sendStateEvent(
this.room.roomId,
EventType.GroupCallMemberPrefix,
newContent,
legacy ? localUserId : this.makeMembershipStateKey(localUserId, localDeviceId),
);
await this.client.sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, newContent, stateKey);
logger.info(`Sent updated call member event.`);
// check periodically to see if we need to refresh our member event
if (this.isJoined() && legacy) {
this.memberEventTimeout = setTimeout(this.triggerCallMembershipEventUpdate, MEMBER_EVENT_CHECK_PERIOD);
if (this.isJoined()) {
if (legacy) {
this.memberEventTimeout = setTimeout(
this.triggerCallMembershipEventUpdate,
MEMBER_EVENT_CHECK_PERIOD,
);
} else {
try {
// TODO: If delayed event times out, re-join!
const res = await this.client._unstable_sendDelayedStateEvent(
this.room.roomId,
{
delay: 8000,
},
EventType.GroupCallMemberPrefix,
{}, // leave event
stateKey,
);
this.scheduleDelayDisconnection(res.delay_id);
} catch (e) {
logger.error("Failed to send delayed event:", e);
}
}
}
} catch (e) {
const resendDelay = CALL_MEMBER_EVENT_RETRY_DELAY_MIN + Math.random() * 2000;
logger.warn(`Failed to send call member event: retrying in ${resendDelay}`);
logger.warn(`Failed to send call member event (retrying in ${resendDelay}): ${e}`);
await new Promise((resolve) => setTimeout(resolve, resendDelay));
await this.triggerCallMembershipEventUpdate();
}
}
private scheduleDelayDisconnection(delayId: string): void {
this.memberEventTimeout = setTimeout(() => this.delayDisconnection(delayId), 5000);
}
private async delayDisconnection(delayId: string): Promise<void> {
try {
await this.client._unstable_updateDelayedEvent(delayId, UpdateDelayedEventAction.Restart);
this.scheduleDelayDisconnection(delayId);
} catch (e) {
logger.error("Failed to delay our disconnection event", e);
}
}
private stateEventsContainOngoingLegacySession(callMemberEvents: Map<string, MatrixEvent>): boolean {
for (const callMemberEvent of callMemberEvents.values()) {
const content = callMemberEvent.getContent();