You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-08-07 23:02:56 +03:00
Fix local echo in embedded mode (#4498)
* fix local echo * dont use custome event emitter anymore * move logic into updateTxId * temp testing * use generic eventEmtitter names * add tests --------- Co-authored-by: Robin <robin@robin.town> Co-authored-by: Hugh Nimmo-Smith <hughns@users.noreply.github.com>
This commit is contained in:
@@ -30,6 +30,7 @@ import {
|
|||||||
ITurnServer,
|
ITurnServer,
|
||||||
IRoomEvent,
|
IRoomEvent,
|
||||||
IOpenIDCredentials,
|
IOpenIDCredentials,
|
||||||
|
ISendEventFromWidgetResponseData,
|
||||||
WidgetApiResponseError,
|
WidgetApiResponseError,
|
||||||
} from "matrix-widget-api";
|
} from "matrix-widget-api";
|
||||||
|
|
||||||
@@ -40,6 +41,7 @@ import { ICapabilities, RoomWidgetClient } from "../../src/embedded";
|
|||||||
import { MatrixEvent } from "../../src/models/event";
|
import { MatrixEvent } from "../../src/models/event";
|
||||||
import { ToDeviceBatch } from "../../src/models/ToDeviceMessage";
|
import { ToDeviceBatch } from "../../src/models/ToDeviceMessage";
|
||||||
import { DeviceInfo } from "../../src/crypto/deviceinfo";
|
import { DeviceInfo } from "../../src/crypto/deviceinfo";
|
||||||
|
import { sleep } from "../../src/utils";
|
||||||
|
|
||||||
const testOIDCToken = {
|
const testOIDCToken = {
|
||||||
access_token: "12345678",
|
access_token: "12345678",
|
||||||
@@ -127,9 +129,16 @@ describe("RoomWidgetClient", () => {
|
|||||||
const makeClient = async (
|
const makeClient = async (
|
||||||
capabilities: ICapabilities,
|
capabilities: ICapabilities,
|
||||||
sendContentLoaded: boolean | undefined = undefined,
|
sendContentLoaded: boolean | undefined = undefined,
|
||||||
|
userId?: string,
|
||||||
): Promise<void> => {
|
): Promise<void> => {
|
||||||
const baseUrl = "https://example.org";
|
const baseUrl = "https://example.org";
|
||||||
client = createRoomWidgetClient(widgetApi, capabilities, "!1:example.org", { baseUrl }, sendContentLoaded);
|
client = createRoomWidgetClient(
|
||||||
|
widgetApi,
|
||||||
|
capabilities,
|
||||||
|
"!1:example.org",
|
||||||
|
{ baseUrl, userId },
|
||||||
|
sendContentLoaded,
|
||||||
|
);
|
||||||
expect(widgetApi.start).toHaveBeenCalled(); // needs to have been called early in order to not miss messages
|
expect(widgetApi.start).toHaveBeenCalled(); // needs to have been called early in order to not miss messages
|
||||||
widgetApi.emit("ready");
|
widgetApi.emit("ready");
|
||||||
await client.startClient();
|
await client.startClient();
|
||||||
@@ -192,6 +201,142 @@ describe("RoomWidgetClient", () => {
|
|||||||
.map((e) => e.getEffectiveEvent()),
|
.map((e) => e.getEffectiveEvent()),
|
||||||
).toEqual([event]);
|
).toEqual([event]);
|
||||||
});
|
});
|
||||||
|
describe("local echos", () => {
|
||||||
|
const setupRemoteEcho = () => {
|
||||||
|
makeClient(
|
||||||
|
{
|
||||||
|
receiveEvent: ["org.matrix.rageshake_request"],
|
||||||
|
sendEvent: ["org.matrix.rageshake_request"],
|
||||||
|
},
|
||||||
|
undefined,
|
||||||
|
"@me:example.org",
|
||||||
|
);
|
||||||
|
expect(widgetApi.requestCapabilityForRoomTimeline).toHaveBeenCalledWith("!1:example.org");
|
||||||
|
expect(widgetApi.requestCapabilityToReceiveEvent).toHaveBeenCalledWith("org.matrix.rageshake_request");
|
||||||
|
const injectSpy = jest.spyOn((client as any).syncApi, "injectRoomEvents");
|
||||||
|
const widgetSendEmitter = new EventEmitter();
|
||||||
|
const widgetSendPromise = new Promise<void>((resolve) =>
|
||||||
|
widgetSendEmitter.once("send", () => resolve()),
|
||||||
|
);
|
||||||
|
const resolveWidgetSend = () => widgetSendEmitter.emit("send");
|
||||||
|
widgetApi.sendRoomEvent.mockImplementation(
|
||||||
|
async (eType, content, roomId): Promise<ISendEventFromWidgetResponseData> => {
|
||||||
|
await widgetSendPromise;
|
||||||
|
return { room_id: "!1:example.org", event_id: "event_id" };
|
||||||
|
},
|
||||||
|
);
|
||||||
|
return { injectSpy, resolveWidgetSend };
|
||||||
|
};
|
||||||
|
const remoteEchoEvent = new CustomEvent(`action:${WidgetApiToWidgetAction.SendEvent}`, {
|
||||||
|
detail: {
|
||||||
|
data: {
|
||||||
|
type: "org.matrix.rageshake_request",
|
||||||
|
|
||||||
|
room_id: "!1:example.org",
|
||||||
|
event_id: "event_id",
|
||||||
|
sender: "@me:example.org",
|
||||||
|
state_key: "bar",
|
||||||
|
content: { hello: "world" },
|
||||||
|
unsigned: { transaction_id: "1234" },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
cancelable: true,
|
||||||
|
});
|
||||||
|
it("get response then local echo", async () => {
|
||||||
|
await sleep(600);
|
||||||
|
const { injectSpy, resolveWidgetSend } = await setupRemoteEcho();
|
||||||
|
|
||||||
|
// Begin by sending an event:
|
||||||
|
client.sendEvent("!1:example.org", "org.matrix.rageshake_request", { request_id: 12 }, "widgetTxId");
|
||||||
|
// we do not expect it to be send -- until we call `resolveWidgetSend`
|
||||||
|
expect(injectSpy).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
// We first get the response from the widget
|
||||||
|
resolveWidgetSend();
|
||||||
|
// We then get the remote echo from the widget
|
||||||
|
widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, remoteEchoEvent);
|
||||||
|
|
||||||
|
// gets emitted after the event got injected
|
||||||
|
await new Promise<void>((resolve) => client.once(ClientEvent.Event, () => resolve()));
|
||||||
|
expect(injectSpy).toHaveBeenCalled();
|
||||||
|
|
||||||
|
const call = injectSpy.mock.calls[0] as any;
|
||||||
|
const injectedEv = call[2][0];
|
||||||
|
expect(injectedEv.getType()).toBe("org.matrix.rageshake_request");
|
||||||
|
expect(injectedEv.getUnsigned().transaction_id).toBe("widgetTxId");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("get local echo then response", async () => {
|
||||||
|
await sleep(600);
|
||||||
|
const { injectSpy, resolveWidgetSend } = await setupRemoteEcho();
|
||||||
|
|
||||||
|
// Begin by sending an event:
|
||||||
|
client.sendEvent("!1:example.org", "org.matrix.rageshake_request", { request_id: 12 }, "widgetTxId");
|
||||||
|
// we do not expect it to be send -- until we call `resolveWidgetSend`
|
||||||
|
expect(injectSpy).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
// We first get the remote echo from the widget
|
||||||
|
widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, remoteEchoEvent);
|
||||||
|
expect(injectSpy).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
// We then get the response from the widget
|
||||||
|
resolveWidgetSend();
|
||||||
|
|
||||||
|
// Gets emitted after the event got injected
|
||||||
|
await new Promise<void>((resolve) => client.once(ClientEvent.Event, () => resolve()));
|
||||||
|
expect(injectSpy).toHaveBeenCalled();
|
||||||
|
|
||||||
|
const call = injectSpy.mock.calls[0] as any;
|
||||||
|
const injectedEv = call[2][0];
|
||||||
|
expect(injectedEv.getType()).toBe("org.matrix.rageshake_request");
|
||||||
|
expect(injectedEv.getUnsigned().transaction_id).toBe("widgetTxId");
|
||||||
|
});
|
||||||
|
it("__ local echo then response", async () => {
|
||||||
|
await sleep(600);
|
||||||
|
const { injectSpy, resolveWidgetSend } = await setupRemoteEcho();
|
||||||
|
|
||||||
|
// Begin by sending an event:
|
||||||
|
client.sendEvent("!1:example.org", "org.matrix.rageshake_request", { request_id: 12 }, "widgetTxId");
|
||||||
|
// we do not expect it to be send -- until we call `resolveWidgetSend`
|
||||||
|
expect(injectSpy).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
// We first get the remote echo from the widget
|
||||||
|
widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, remoteEchoEvent);
|
||||||
|
const otherRemoteEcho = new CustomEvent(`action:${WidgetApiToWidgetAction.SendEvent}`, {
|
||||||
|
detail: { data: { ...remoteEchoEvent.detail.data } },
|
||||||
|
});
|
||||||
|
otherRemoteEcho.detail.data.unsigned.transaction_id = "4567";
|
||||||
|
otherRemoteEcho.detail.data.event_id = "other_id";
|
||||||
|
widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, otherRemoteEcho);
|
||||||
|
|
||||||
|
// Simulate the wait time while the widget is waiting for a response
|
||||||
|
// after we already received the remote echo
|
||||||
|
await sleep(20);
|
||||||
|
// even after the wait we do not want any event to be injected.
|
||||||
|
// we do not know their event_id and cannot know if they are the remote echo
|
||||||
|
// where we need to update the txId because they are send by this client
|
||||||
|
expect(injectSpy).not.toHaveBeenCalled();
|
||||||
|
// We then get the response from the widget
|
||||||
|
resolveWidgetSend();
|
||||||
|
|
||||||
|
// Gets emitted after the event got injected
|
||||||
|
await new Promise<void>((resolve) => client.once(ClientEvent.Event, () => resolve()));
|
||||||
|
// Now we want both events to be injected since we know the txId - event_id match
|
||||||
|
expect(injectSpy).toHaveBeenCalled();
|
||||||
|
|
||||||
|
// it has been called with the event sent by ourselves
|
||||||
|
const call = injectSpy.mock.calls[0] as any;
|
||||||
|
const injectedEv = call[2][0];
|
||||||
|
expect(injectedEv.getType()).toBe("org.matrix.rageshake_request");
|
||||||
|
expect(injectedEv.getUnsigned().transaction_id).toBe("widgetTxId");
|
||||||
|
|
||||||
|
// It has been called by the event we blocked because of our send right afterwards
|
||||||
|
const call2 = injectSpy.mock.calls[1] as any;
|
||||||
|
const injectedEv2 = call2[2][0];
|
||||||
|
expect(injectedEv2.getType()).toBe("org.matrix.rageshake_request");
|
||||||
|
expect(injectedEv2.getUnsigned().transaction_id).toBe("4567");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it("handles widget errors with generic error data", async () => {
|
it("handles widget errors with generic error data", async () => {
|
||||||
const error = new Error("failed to send");
|
const error = new Error("failed to send");
|
||||||
|
@@ -57,6 +57,7 @@ import { ToDeviceBatch, ToDevicePayload } from "./models/ToDeviceMessage.ts";
|
|||||||
import { DeviceInfo } from "./crypto/deviceinfo.ts";
|
import { DeviceInfo } from "./crypto/deviceinfo.ts";
|
||||||
import { IOlmDevice } from "./crypto/algorithms/megolm.ts";
|
import { IOlmDevice } from "./crypto/algorithms/megolm.ts";
|
||||||
import { MapWithDefault, recursiveMapToObject } from "./utils.ts";
|
import { MapWithDefault, recursiveMapToObject } from "./utils.ts";
|
||||||
|
import { TypedEventEmitter } from "./matrix.ts";
|
||||||
|
|
||||||
interface IStateEventRequest {
|
interface IStateEventRequest {
|
||||||
eventType: string;
|
eventType: string;
|
||||||
@@ -123,6 +124,10 @@ export interface ICapabilities {
|
|||||||
updateDelayedEvents?: boolean;
|
updateDelayedEvents?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export enum RoomWidgetClientEvent {
|
||||||
|
PendingEventsChanged = "PendingEvent.pendingEventsChanged",
|
||||||
|
}
|
||||||
|
export type EventHandlerMap = { [RoomWidgetClientEvent.PendingEventsChanged]: () => void };
|
||||||
/**
|
/**
|
||||||
* A MatrixClient that routes its requests through the widget API instead of the
|
* A MatrixClient that routes its requests through the widget API instead of the
|
||||||
* real CS API.
|
* real CS API.
|
||||||
@@ -134,6 +139,9 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
private lifecycle?: AbortController;
|
private lifecycle?: AbortController;
|
||||||
private syncState: SyncState | null = null;
|
private syncState: SyncState | null = null;
|
||||||
|
|
||||||
|
private pendingSendingEventsTxId: { type: string; id: string | undefined; txId: string }[] = [];
|
||||||
|
private eventEmitter = new TypedEventEmitter<keyof EventHandlerMap, EventHandlerMap>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param widgetApi - The widget api to use for communication.
|
* @param widgetApi - The widget api to use for communication.
|
||||||
@@ -330,6 +338,8 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
const content = event.event.redacts
|
const content = event.event.redacts
|
||||||
? { ...event.getContent(), redacts: event.event.redacts }
|
? { ...event.getContent(), redacts: event.event.redacts }
|
||||||
: event.getContent();
|
: event.getContent();
|
||||||
|
|
||||||
|
// Delayed event special case.
|
||||||
if (delayOpts) {
|
if (delayOpts) {
|
||||||
// TODO: updatePendingEvent for delayed events?
|
// TODO: updatePendingEvent for delayed events?
|
||||||
const response = await this.widgetApi.sendRoomEvent(
|
const response = await this.widgetApi.sendRoomEvent(
|
||||||
@@ -342,6 +352,10 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
return this.validateSendDelayedEventResponse(response);
|
return this.validateSendDelayedEventResponse(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const txId = event.getTxnId();
|
||||||
|
// Add the txnId to the pending list (still with unknown evID)
|
||||||
|
if (txId) this.pendingSendingEventsTxId.push({ type: event.getType(), id: undefined, txId });
|
||||||
|
|
||||||
let response: ISendEventFromWidgetResponseData;
|
let response: ISendEventFromWidgetResponseData;
|
||||||
try {
|
try {
|
||||||
response = await this.widgetApi.sendRoomEvent(event.getType(), content, room.roomId);
|
response = await this.widgetApi.sendRoomEvent(event.getType(), content, room.roomId);
|
||||||
@@ -349,9 +363,15 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
this.updatePendingEventStatus(room, event, EventStatus.NOT_SENT);
|
this.updatePendingEventStatus(room, event, EventStatus.NOT_SENT);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
// This also checks for an event id on the response
|
// This also checks for an event id on the response
|
||||||
room.updatePendingEvent(event, EventStatus.SENT, response.event_id);
|
room.updatePendingEvent(event, EventStatus.SENT, response.event_id);
|
||||||
|
|
||||||
|
// Update the pending events list with the eventId
|
||||||
|
this.pendingSendingEventsTxId.forEach((p) => {
|
||||||
|
if (p.txId === txId) p.id = response.event_id;
|
||||||
|
});
|
||||||
|
this.eventEmitter.emit(RoomWidgetClientEvent.PendingEventsChanged);
|
||||||
|
|
||||||
return { event_id: response.event_id! };
|
return { event_id: response.event_id! };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -495,6 +515,48 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
await this.widgetApi.transport.reply<IWidgetApiAcknowledgeResponseData>(ev.detail, {});
|
await this.widgetApi.transport.reply<IWidgetApiAcknowledgeResponseData>(ev.detail, {});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private updateTxId = async (event: MatrixEvent): Promise<void> => {
|
||||||
|
// We update the txId for remote echos that originate from this client.
|
||||||
|
// This happens with the help of `pendingSendingEventsTxId` where we store all events that are currently sending
|
||||||
|
// with their widget txId and once ready the final evId.
|
||||||
|
if (
|
||||||
|
// This could theoretically be an event send by this device
|
||||||
|
// In that case we need to update the txId of the event because the embedded client/widget
|
||||||
|
// knows this event with a different transaction Id than what was used by the host client.
|
||||||
|
event.getSender() === this.getUserId() &&
|
||||||
|
// We optimize by not blocking events from types that we have not send
|
||||||
|
// with this client.
|
||||||
|
this.pendingSendingEventsTxId.some((p) => event.getType() === p.type)
|
||||||
|
) {
|
||||||
|
// Compare by event Id if we have a matching pending event where we know the txId.
|
||||||
|
let matchingTxId = this.pendingSendingEventsTxId.find((p) => p.id === event.getId())?.txId;
|
||||||
|
// Block any further processing of this event until we have received the sending response.
|
||||||
|
// -> until we know the event id.
|
||||||
|
// -> until we have not pending events anymore.
|
||||||
|
while (!matchingTxId && this.pendingSendingEventsTxId.length > 0) {
|
||||||
|
// Recheck whenever the PendingEventsChanged
|
||||||
|
await new Promise<void>((resolve) =>
|
||||||
|
this.eventEmitter.once(RoomWidgetClientEvent.PendingEventsChanged, () => resolve()),
|
||||||
|
);
|
||||||
|
matchingTxId = this.pendingSendingEventsTxId.find((p) => p.id === event.getId())?.txId;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We found the correct txId: we update the event and delete the entry of the pending events.
|
||||||
|
if (matchingTxId) {
|
||||||
|
event.setTxnId(matchingTxId);
|
||||||
|
event.setUnsigned({ ...event.getUnsigned(), transaction_id: matchingTxId });
|
||||||
|
}
|
||||||
|
this.pendingSendingEventsTxId = this.pendingSendingEventsTxId.filter((p) => p.id !== event.getId());
|
||||||
|
|
||||||
|
// Emit once there are no pending events anymore to release all other events that got
|
||||||
|
// awaited in the `while (!matchingTxId && this.pendingSendingEventsTxId.length > 0)` loop
|
||||||
|
// but are not send by this client.
|
||||||
|
if (this.pendingSendingEventsTxId.length === 0) {
|
||||||
|
this.eventEmitter.emit(RoomWidgetClientEvent.PendingEventsChanged);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
private onEvent = async (ev: CustomEvent<ISendEventToWidgetActionRequest>): Promise<void> => {
|
private onEvent = async (ev: CustomEvent<ISendEventToWidgetActionRequest>): Promise<void> => {
|
||||||
ev.preventDefault();
|
ev.preventDefault();
|
||||||
|
|
||||||
@@ -502,6 +564,9 @@ export class RoomWidgetClient extends MatrixClient {
|
|||||||
// send us events from other rooms if this widget is always on screen
|
// send us events from other rooms if this widget is always on screen
|
||||||
if (ev.detail.data.room_id === this.roomId) {
|
if (ev.detail.data.room_id === this.roomId) {
|
||||||
const event = new MatrixEvent(ev.detail.data as Partial<IEvent>);
|
const event = new MatrixEvent(ev.detail.data as Partial<IEvent>);
|
||||||
|
|
||||||
|
// Only inject once we have update the txId
|
||||||
|
await this.updateTxId(event);
|
||||||
await this.syncApi!.injectRoomEvents(this.room!, [], [event]);
|
await this.syncApi!.injectRoomEvents(this.room!, [], [event]);
|
||||||
this.emit(ClientEvent.Event, event);
|
this.emit(ClientEvent.Event, event);
|
||||||
this.setSyncState(SyncState.Syncing);
|
this.setSyncState(SyncState.Syncing);
|
||||||
|
@@ -56,7 +56,7 @@ export class MatrixRTCSessionManager extends TypedEventEmitter<MatrixRTCSessionM
|
|||||||
|
|
||||||
public start(): void {
|
public start(): void {
|
||||||
// We shouldn't need to null-check here, but matrix-client.spec.ts mocks getRooms
|
// We shouldn't need to null-check here, but matrix-client.spec.ts mocks getRooms
|
||||||
// returing nothing, and breaks tests if you change it to return an empty array :'(
|
// returning nothing, and breaks tests if you change it to return an empty array :'(
|
||||||
for (const room of this.client.getRooms() ?? []) {
|
for (const room of this.client.getRooms() ?? []) {
|
||||||
const session = MatrixRTCSession.roomSessionForRoom(this.client, room);
|
const session = MatrixRTCSession.roomSessionForRoom(this.client, room);
|
||||||
if (session.memberships.length > 0) {
|
if (session.memberships.length > 0) {
|
||||||
|
Reference in New Issue
Block a user