You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-08-07 23:02:56 +03:00
Retry to-device messages (#2549)
* Retry to-device messages This adds a queueToDevice API alongside sendToDevice which is a much higher-level API that adds the messages to a queue, stored in persistent storage, and retries them periodically. Also converts sending of megolm keys to use the new API. Other uses of sendToDevice are nopt converted in this PR, but could be later. Requires https://github.com/matrix-org/matrix-mock-request/pull/17 * Bump matrix-mock-request * Add more waits to make indexeddb tests pass * Switch some test expectations to queueToDevice * Stop straight away if the client has been stopped Hopefully will fix tests being flakey and logging after tests have finished. * Add return types & fix constant usage * Fix return type Co-authored-by: Germain <germains@element.io> * Fix return type Co-authored-by: Germain <germains@element.io> * Fix return type Co-authored-by: Germain <germains@element.io> * Stop the client in all test cases Co-authored-by: Germain <germains@element.io>
This commit is contained in:
@@ -102,7 +102,7 @@
|
|||||||
"jest-localstorage-mock": "^2.4.6",
|
"jest-localstorage-mock": "^2.4.6",
|
||||||
"jest-sonar-reporter": "^2.0.0",
|
"jest-sonar-reporter": "^2.0.0",
|
||||||
"jsdoc": "^3.6.6",
|
"jsdoc": "^3.6.6",
|
||||||
"matrix-mock-request": "^2.1.0",
|
"matrix-mock-request": "^2.1.1",
|
||||||
"rimraf": "^3.0.2",
|
"rimraf": "^3.0.2",
|
||||||
"terser": "^5.5.1",
|
"terser": "^5.5.1",
|
||||||
"tsify": "^5.0.2",
|
"tsify": "^5.0.2",
|
||||||
|
@@ -59,6 +59,7 @@ describe("MegolmDecryption", function() {
|
|||||||
mockBaseApis = {
|
mockBaseApis = {
|
||||||
claimOneTimeKeys: jest.fn(),
|
claimOneTimeKeys: jest.fn(),
|
||||||
sendToDevice: jest.fn(),
|
sendToDevice: jest.fn(),
|
||||||
|
queueToDevice: jest.fn(),
|
||||||
} as unknown as MockedObject<MatrixClient>;
|
} as unknown as MockedObject<MatrixClient>;
|
||||||
|
|
||||||
const cryptoStore = new MemoryCryptoStore();
|
const cryptoStore = new MemoryCryptoStore();
|
||||||
@@ -179,6 +180,7 @@ describe("MegolmDecryption", function() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
mockBaseApis.sendToDevice.mockReset();
|
mockBaseApis.sendToDevice.mockReset();
|
||||||
|
mockBaseApis.queueToDevice.mockReset();
|
||||||
|
|
||||||
// do the share
|
// do the share
|
||||||
megolmDecryption.shareKeysWithDevice(keyRequest);
|
megolmDecryption.shareKeysWithDevice(keyRequest);
|
||||||
@@ -324,6 +326,7 @@ describe("MegolmDecryption", function() {
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
mockBaseApis.sendToDevice.mockResolvedValue(undefined);
|
mockBaseApis.sendToDevice.mockResolvedValue(undefined);
|
||||||
|
mockBaseApis.queueToDevice.mockResolvedValue(undefined);
|
||||||
|
|
||||||
aliceDeviceInfo = {
|
aliceDeviceInfo = {
|
||||||
deviceId: 'aliceDevice',
|
deviceId: 'aliceDevice',
|
||||||
@@ -403,7 +406,7 @@ describe("MegolmDecryption", function() {
|
|||||||
expect(mockCrypto.downloadKeys).toHaveBeenCalledWith(
|
expect(mockCrypto.downloadKeys).toHaveBeenCalledWith(
|
||||||
['@alice:home.server'], false,
|
['@alice:home.server'], false,
|
||||||
);
|
);
|
||||||
expect(mockBaseApis.sendToDevice).toHaveBeenCalled();
|
expect(mockBaseApis.queueToDevice).toHaveBeenCalled();
|
||||||
expect(mockBaseApis.claimOneTimeKeys).toHaveBeenCalledWith(
|
expect(mockBaseApis.claimOneTimeKeys).toHaveBeenCalledWith(
|
||||||
[['@alice:home.server', 'aliceDevice']], 'signed_curve25519', 2000,
|
[['@alice:home.server', 'aliceDevice']], 'signed_curve25519', 2000,
|
||||||
);
|
);
|
||||||
@@ -446,7 +449,7 @@ describe("MegolmDecryption", function() {
|
|||||||
'YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWI',
|
'YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWI',
|
||||||
);
|
);
|
||||||
|
|
||||||
mockBaseApis.sendToDevice.mockClear();
|
mockBaseApis.queueToDevice.mockClear();
|
||||||
await megolmEncryption.reshareKeyWithDevice(
|
await megolmEncryption.reshareKeyWithDevice(
|
||||||
olmDevice.deviceCurve25519Key,
|
olmDevice.deviceCurve25519Key,
|
||||||
ct1.session_id,
|
ct1.session_id,
|
||||||
@@ -454,7 +457,7 @@ describe("MegolmDecryption", function() {
|
|||||||
aliceDeviceInfo,
|
aliceDeviceInfo,
|
||||||
);
|
);
|
||||||
|
|
||||||
expect(mockBaseApis.sendToDevice).not.toHaveBeenCalled();
|
expect(mockBaseApis.queueToDevice).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
338
spec/unit/queueToDevice.spec.ts
Normal file
338
spec/unit/queueToDevice.spec.ts
Normal file
@@ -0,0 +1,338 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import MockHttpBackend from 'matrix-mock-request';
|
||||||
|
import { indexedDB as fakeIndexedDB } from 'fake-indexeddb';
|
||||||
|
|
||||||
|
import { IHttpOpts, IndexedDBStore, MatrixEvent, MemoryStore, Room } from "../../src";
|
||||||
|
import { MatrixClient } from "../../src/client";
|
||||||
|
import { ToDeviceBatch } from '../../src/models/ToDeviceMessage';
|
||||||
|
import { logger } from '../../src/logger';
|
||||||
|
import { IStore } from '../../src/store';
|
||||||
|
|
||||||
|
const FAKE_USER = "@alice:example.org";
|
||||||
|
const FAKE_DEVICE_ID = "AAAAAAAA";
|
||||||
|
const FAKE_PAYLOAD = {
|
||||||
|
"foo": 42,
|
||||||
|
};
|
||||||
|
const EXPECTED_BODY = {
|
||||||
|
messages: {
|
||||||
|
[FAKE_USER]: {
|
||||||
|
[FAKE_DEVICE_ID]: FAKE_PAYLOAD,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const FAKE_MSG = {
|
||||||
|
userId: FAKE_USER,
|
||||||
|
deviceId: FAKE_DEVICE_ID,
|
||||||
|
payload: FAKE_PAYLOAD,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum StoreType {
|
||||||
|
Memory = 'Memory',
|
||||||
|
IndexedDB = 'IndexedDB',
|
||||||
|
}
|
||||||
|
|
||||||
|
// Jest now uses @sinonjs/fake-timers which exposes tickAsync() and a number of
|
||||||
|
// other async methods which break the event loop, letting scheduled promise
|
||||||
|
// callbacks run. Unfortunately, Jest doesn't expose these, so we have to do
|
||||||
|
// it manually (this is what sinon does under the hood). We do both in a loop
|
||||||
|
// until the thing we expect happens: hopefully this is the least flakey way
|
||||||
|
// and avoids assuming anything about the app's behaviour.
|
||||||
|
const realSetTimeout = setTimeout;
|
||||||
|
function flushPromises() {
|
||||||
|
return new Promise(r => {
|
||||||
|
realSetTimeout(r, 1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function flushAndRunTimersUntil(cond: () => boolean) {
|
||||||
|
while (!cond()) {
|
||||||
|
await flushPromises();
|
||||||
|
if (cond()) break;
|
||||||
|
jest.advanceTimersToNextTimer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe.each([
|
||||||
|
[StoreType.Memory], [StoreType.IndexedDB],
|
||||||
|
])("queueToDevice (%s store)", function(storeType) {
|
||||||
|
let httpBackend: MockHttpBackend;
|
||||||
|
let client: MatrixClient;
|
||||||
|
|
||||||
|
beforeEach(async function() {
|
||||||
|
httpBackend = new MockHttpBackend();
|
||||||
|
|
||||||
|
let store: IStore;
|
||||||
|
if (storeType === StoreType.IndexedDB) {
|
||||||
|
const idbStore = new IndexedDBStore({ indexedDB: fakeIndexedDB });
|
||||||
|
await idbStore.startup();
|
||||||
|
store = idbStore;
|
||||||
|
} else {
|
||||||
|
store = new MemoryStore();
|
||||||
|
}
|
||||||
|
|
||||||
|
client = new MatrixClient({
|
||||||
|
baseUrl: "https://my.home.server",
|
||||||
|
accessToken: "my.access.token",
|
||||||
|
request: httpBackend.requestFn as IHttpOpts["request"],
|
||||||
|
store,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(function() {
|
||||||
|
jest.useRealTimers();
|
||||||
|
client.stopClient();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("sends a to-device message", async function() {
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).check((request) => {
|
||||||
|
expect(request.data).toEqual(EXPECTED_BODY);
|
||||||
|
}).respond(200, {});
|
||||||
|
|
||||||
|
await client.queueToDevice({
|
||||||
|
eventType: "org.example.foo",
|
||||||
|
batch: [
|
||||||
|
FAKE_MSG,
|
||||||
|
],
|
||||||
|
});
|
||||||
|
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("retries on error", async function() {
|
||||||
|
jest.useFakeTimers();
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).respond(500);
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).check((request) => {
|
||||||
|
expect(request.data).toEqual(EXPECTED_BODY);
|
||||||
|
}).respond(200, {});
|
||||||
|
|
||||||
|
await client.queueToDevice({
|
||||||
|
eventType: "org.example.foo",
|
||||||
|
batch: [
|
||||||
|
FAKE_MSG,
|
||||||
|
],
|
||||||
|
});
|
||||||
|
await flushAndRunTimersUntil(() => httpBackend.requests.length > 0);
|
||||||
|
expect(httpBackend.flushSync(null, 1)).toEqual(1);
|
||||||
|
|
||||||
|
await flushAndRunTimersUntil(() => httpBackend.requests.length > 0);
|
||||||
|
|
||||||
|
expect(httpBackend.flushSync(null, 1)).toEqual(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("stops retrying on 4xx errors", async function() {
|
||||||
|
jest.useFakeTimers();
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).respond(400);
|
||||||
|
|
||||||
|
await client.queueToDevice({
|
||||||
|
eventType: "org.example.foo",
|
||||||
|
batch: [
|
||||||
|
FAKE_MSG,
|
||||||
|
],
|
||||||
|
});
|
||||||
|
await flushAndRunTimersUntil(() => httpBackend.requests.length > 0);
|
||||||
|
expect(httpBackend.flushSync(null, 1)).toEqual(1);
|
||||||
|
|
||||||
|
// Asserting that another request is never made is obviously
|
||||||
|
// a bit tricky - we just flush the queue what should hopefully
|
||||||
|
// be plenty of times and assert that nothing comes through.
|
||||||
|
let tries = 0;
|
||||||
|
await flushAndRunTimersUntil(() => ++tries === 10);
|
||||||
|
|
||||||
|
expect(httpBackend.requests.length).toEqual(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("honours ratelimiting", async function() {
|
||||||
|
jest.useFakeTimers();
|
||||||
|
|
||||||
|
// pick something obscure enough it's unlikley to clash with a
|
||||||
|
// retry delay the algorithm uses anyway
|
||||||
|
const retryDelay = 279 * 1000;
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).respond(429, {
|
||||||
|
errcode: "M_LIMIT_EXCEEDED",
|
||||||
|
retry_after_ms: retryDelay,
|
||||||
|
});
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).respond(200, {});
|
||||||
|
|
||||||
|
await client.queueToDevice({
|
||||||
|
eventType: "org.example.foo",
|
||||||
|
batch: [
|
||||||
|
FAKE_MSG,
|
||||||
|
],
|
||||||
|
});
|
||||||
|
await flushAndRunTimersUntil(() => httpBackend.requests.length > 0);
|
||||||
|
expect(httpBackend.flushSync(null, 1)).toEqual(1);
|
||||||
|
await flushPromises();
|
||||||
|
|
||||||
|
logger.info("Advancing clock to just before expected retry time...");
|
||||||
|
|
||||||
|
jest.advanceTimersByTime(retryDelay - 1000);
|
||||||
|
await flushPromises();
|
||||||
|
|
||||||
|
expect(httpBackend.requests.length).toEqual(0);
|
||||||
|
|
||||||
|
logger.info("Advancing clock past expected retry time...");
|
||||||
|
|
||||||
|
jest.advanceTimersByTime(2000);
|
||||||
|
await flushPromises();
|
||||||
|
|
||||||
|
expect(httpBackend.flushSync(null, 1)).toEqual(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("retries on retryImmediately()", async function() {
|
||||||
|
httpBackend.when("GET", "/_matrix/client/versions").respond(200, {
|
||||||
|
versions: ["r0.0.1"],
|
||||||
|
});
|
||||||
|
|
||||||
|
await Promise.all([client.startClient(), httpBackend.flush(null, 1, 20)]);
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).respond(500);
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).respond(200, {});
|
||||||
|
|
||||||
|
await client.queueToDevice({
|
||||||
|
eventType: "org.example.foo",
|
||||||
|
batch: [
|
||||||
|
FAKE_MSG,
|
||||||
|
],
|
||||||
|
});
|
||||||
|
expect(await httpBackend.flush(null, 1, 1)).toEqual(1);
|
||||||
|
await flushPromises();
|
||||||
|
|
||||||
|
client.retryImmediately();
|
||||||
|
|
||||||
|
expect(await httpBackend.flush(null, 1, 20)).toEqual(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("retries on when client is started", async function() {
|
||||||
|
httpBackend.when("GET", "/_matrix/client/versions").respond(200, {
|
||||||
|
versions: ["r0.0.1"],
|
||||||
|
});
|
||||||
|
|
||||||
|
await Promise.all([client.startClient(), httpBackend.flush("/_matrix/client/versions", 1, 20)]);
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).respond(500);
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).respond(200, {});
|
||||||
|
|
||||||
|
await client.queueToDevice({
|
||||||
|
eventType: "org.example.foo",
|
||||||
|
batch: [
|
||||||
|
FAKE_MSG,
|
||||||
|
],
|
||||||
|
});
|
||||||
|
expect(await httpBackend.flush(null, 1, 1)).toEqual(1);
|
||||||
|
await flushPromises();
|
||||||
|
|
||||||
|
client.stopClient();
|
||||||
|
await Promise.all([client.startClient(), httpBackend.flush("/_matrix/client/versions", 1, 20)]);
|
||||||
|
|
||||||
|
expect(await httpBackend.flush(null, 1, 20)).toEqual(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("retries when a message is retried", async function() {
|
||||||
|
httpBackend.when("GET", "/_matrix/client/versions").respond(200, {
|
||||||
|
versions: ["r0.0.1"],
|
||||||
|
});
|
||||||
|
|
||||||
|
await Promise.all([client.startClient(), httpBackend.flush(null, 1, 20)]);
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).respond(500);
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).respond(200, {});
|
||||||
|
|
||||||
|
await client.queueToDevice({
|
||||||
|
eventType: "org.example.foo",
|
||||||
|
batch: [
|
||||||
|
FAKE_MSG,
|
||||||
|
],
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(await httpBackend.flush(null, 1, 1)).toEqual(1);
|
||||||
|
await flushPromises();
|
||||||
|
|
||||||
|
const dummyEvent = new MatrixEvent({
|
||||||
|
event_id: "!fake:example.org",
|
||||||
|
});
|
||||||
|
const mockRoom = {
|
||||||
|
updatePendingEvent: jest.fn(),
|
||||||
|
} as unknown as Room;
|
||||||
|
client.resendEvent(dummyEvent, mockRoom);
|
||||||
|
|
||||||
|
expect(await httpBackend.flush(null, 1, 20)).toEqual(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("splits many messages into multiple HTTP requests", async function() {
|
||||||
|
const batch: ToDeviceBatch = {
|
||||||
|
eventType: "org.example.foo",
|
||||||
|
batch: [],
|
||||||
|
};
|
||||||
|
|
||||||
|
for (let i = 0; i <= 20; ++i) {
|
||||||
|
batch.batch.push({
|
||||||
|
userId: `@user${i}:example.org`,
|
||||||
|
deviceId: FAKE_DEVICE_ID,
|
||||||
|
payload: FAKE_PAYLOAD,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).check((request) => {
|
||||||
|
expect(Object.keys(request.data.messages).length).toEqual(20);
|
||||||
|
}).respond(200, {});
|
||||||
|
|
||||||
|
httpBackend.when(
|
||||||
|
"PUT", "/sendToDevice/org.example.foo/",
|
||||||
|
).check((request) => {
|
||||||
|
expect(Object.keys(request.data.messages).length).toEqual(1);
|
||||||
|
}).respond(200, {});
|
||||||
|
|
||||||
|
await client.queueToDevice(batch);
|
||||||
|
await httpBackend.flushAllExpected();
|
||||||
|
});
|
||||||
|
});
|
125
src/ToDeviceMessageQueue.ts
Normal file
125
src/ToDeviceMessageQueue.ts
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { logger } from "./logger";
|
||||||
|
import { MatrixClient } from "./matrix";
|
||||||
|
import { IndexedToDeviceBatch, ToDeviceBatch, ToDeviceBatchWithTxnId, ToDevicePayload } from "./models/ToDeviceMessage";
|
||||||
|
import { MatrixScheduler } from "./scheduler";
|
||||||
|
|
||||||
|
const MAX_BATCH_SIZE = 20;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maintains a queue of outgoing to-device messages, sending them
|
||||||
|
* as soon as the homeserver is reachable.
|
||||||
|
*/
|
||||||
|
export class ToDeviceMessageQueue {
|
||||||
|
private sending = false;
|
||||||
|
private running = true;
|
||||||
|
private retryTimeout: number = null;
|
||||||
|
private retryAttempts = 0;
|
||||||
|
|
||||||
|
constructor(private client: MatrixClient) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public start(): void {
|
||||||
|
this.running = true;
|
||||||
|
this.sendQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
public stop(): void {
|
||||||
|
this.running = false;
|
||||||
|
if (this.retryTimeout !== null) clearTimeout(this.retryTimeout);
|
||||||
|
this.retryTimeout = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async queueBatch(batch: ToDeviceBatch): Promise<void> {
|
||||||
|
const batches: ToDeviceBatchWithTxnId[] = [];
|
||||||
|
for (let i = 0; i < batch.batch.length; i += MAX_BATCH_SIZE) {
|
||||||
|
batches.push({
|
||||||
|
eventType: batch.eventType,
|
||||||
|
batch: batch.batch.slice(i, i + MAX_BATCH_SIZE),
|
||||||
|
txnId: this.client.makeTxnId(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.client.store.saveToDeviceBatches(batches);
|
||||||
|
this.sendQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
public sendQueue = async (): Promise<void> => {
|
||||||
|
if (this.retryTimeout !== null) clearTimeout(this.retryTimeout);
|
||||||
|
this.retryTimeout = null;
|
||||||
|
|
||||||
|
if (this.sending || !this.running) return;
|
||||||
|
|
||||||
|
logger.debug("Attempting to send queued to-device messages");
|
||||||
|
|
||||||
|
this.sending = true;
|
||||||
|
let headBatch;
|
||||||
|
try {
|
||||||
|
while (this.running) {
|
||||||
|
headBatch = await this.client.store.getOldestToDeviceBatch();
|
||||||
|
if (headBatch === null) break;
|
||||||
|
await this.sendBatch(headBatch);
|
||||||
|
await this.client.store.removeToDeviceBatch(headBatch.id);
|
||||||
|
this.retryAttempts = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure we're still running after the async tasks: if not, stop.
|
||||||
|
if (!this.running) return;
|
||||||
|
|
||||||
|
logger.debug("All queued to-device messages sent");
|
||||||
|
} catch (e) {
|
||||||
|
++this.retryAttempts;
|
||||||
|
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||||
|
// eslint-disable-next-line new-cap
|
||||||
|
const retryDelay = MatrixScheduler.RETRY_BACKOFF_RATELIMIT(null, this.retryAttempts, e);
|
||||||
|
if (retryDelay === -1) {
|
||||||
|
// the scheduler function doesn't differentiate between fatal errors and just getting
|
||||||
|
// bored and giving up for now
|
||||||
|
if (Math.floor(e.httpStatus / 100) === 4) {
|
||||||
|
logger.error("Fatal error when sending to-device message - dropping to-device batch!", e);
|
||||||
|
await this.client.store.removeToDeviceBatch(headBatch.id);
|
||||||
|
} else {
|
||||||
|
logger.info("Automatic retry limit reached for to-device messages.");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(`Failed to send batch of to-device messages. Will retry in ${retryDelay}ms`, e);
|
||||||
|
this.retryTimeout = setTimeout(this.sendQueue, retryDelay);
|
||||||
|
} finally {
|
||||||
|
this.sending = false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempts to send a batch of to-device messages.
|
||||||
|
*/
|
||||||
|
private async sendBatch(batch: IndexedToDeviceBatch): Promise<void> {
|
||||||
|
const contentMap: Record<string, Record<string, ToDevicePayload>> = {};
|
||||||
|
for (const item of batch.batch) {
|
||||||
|
if (!contentMap[item.userId]) {
|
||||||
|
contentMap[item.userId] = {};
|
||||||
|
}
|
||||||
|
contentMap[item.userId][item.deviceId] = item.payload;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(`Sending batch of ${batch.batch.length} to-device messages with ID ${batch.id}`);
|
||||||
|
|
||||||
|
await this.client.sendToDevice(batch.eventType, contentMap, batch.txnId);
|
||||||
|
}
|
||||||
|
}
|
@@ -194,6 +194,8 @@ import { MSC3575SlidingSyncRequest, MSC3575SlidingSyncResponse, SlidingSync } fr
|
|||||||
import { SlidingSyncSdk } from "./sliding-sync-sdk";
|
import { SlidingSyncSdk } from "./sliding-sync-sdk";
|
||||||
import { Thread, THREAD_RELATION_TYPE } from "./models/thread";
|
import { Thread, THREAD_RELATION_TYPE } from "./models/thread";
|
||||||
import { MBeaconInfoEventContent, M_BEACON_INFO } from "./@types/beacon";
|
import { MBeaconInfoEventContent, M_BEACON_INFO } from "./@types/beacon";
|
||||||
|
import { ToDeviceMessageQueue } from "./ToDeviceMessageQueue";
|
||||||
|
import { ToDeviceBatch } from "./models/ToDeviceMessage";
|
||||||
|
|
||||||
export type Store = IStore;
|
export type Store = IStore;
|
||||||
|
|
||||||
@@ -939,6 +941,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
protected mediaHandler = new MediaHandler(this);
|
protected mediaHandler = new MediaHandler(this);
|
||||||
protected pendingEventEncryption = new Map<string, Promise<void>>();
|
protected pendingEventEncryption = new Map<string, Promise<void>>();
|
||||||
|
|
||||||
|
private toDeviceMessageQueue: ToDeviceMessageQueue;
|
||||||
|
|
||||||
constructor(opts: IMatrixClientCreateOpts) {
|
constructor(opts: IMatrixClientCreateOpts) {
|
||||||
super();
|
super();
|
||||||
|
|
||||||
@@ -1033,6 +1037,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
// we don't want to start sending unencrypted events to them.
|
// we don't want to start sending unencrypted events to them.
|
||||||
this.roomList = new RoomList(this.cryptoStore);
|
this.roomList = new RoomList(this.cryptoStore);
|
||||||
|
|
||||||
|
this.toDeviceMessageQueue = new ToDeviceMessageQueue(this);
|
||||||
|
|
||||||
// The SDK doesn't really provide a clean way for events to recalculate the push
|
// The SDK doesn't really provide a clean way for events to recalculate the push
|
||||||
// actions for themselves, so we have to kinda help them out when they are encrypted.
|
// actions for themselves, so we have to kinda help them out when they are encrypted.
|
||||||
// We do this so that push rules are correctly executed on events in their decrypted
|
// We do this so that push rules are correctly executed on events in their decrypted
|
||||||
@@ -1196,6 +1202,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
}, 1000 * this.clientOpts.clientWellKnownPollPeriod);
|
}, 1000 * this.clientOpts.clientWellKnownPollPeriod);
|
||||||
this.fetchClientWellKnown();
|
this.fetchClientWellKnown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.toDeviceMessageQueue.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -1223,6 +1231,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
if (this.clientWellKnownIntervalID !== undefined) {
|
if (this.clientWellKnownIntervalID !== undefined) {
|
||||||
global.clearInterval(this.clientWellKnownIntervalID);
|
global.clearInterval(this.clientWellKnownIntervalID);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.toDeviceMessageQueue.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -1561,9 +1571,13 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
/**
|
/**
|
||||||
* Retry a backed off syncing request immediately. This should only be used when
|
* Retry a backed off syncing request immediately. This should only be used when
|
||||||
* the user <b>explicitly</b> attempts to retry their lost connection.
|
* the user <b>explicitly</b> attempts to retry their lost connection.
|
||||||
|
* Will also retry any outbound to-device messages currently in the queue to be sent
|
||||||
|
* (retries of regular outgoing events are handled separately, per-event).
|
||||||
* @return {boolean} True if this resulted in a request being retried.
|
* @return {boolean} True if this resulted in a request being retried.
|
||||||
*/
|
*/
|
||||||
public retryImmediately(): boolean {
|
public retryImmediately(): boolean {
|
||||||
|
// don't await for this promise: we just want to kick it off
|
||||||
|
this.toDeviceMessageQueue.sendQueue();
|
||||||
return this.syncApi.retryImmediately();
|
return this.syncApi.retryImmediately();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3500,7 +3514,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resend an event.
|
* Resend an event. Will also retry any to-device messages waiting to be sent.
|
||||||
* @param {MatrixEvent} event The event to resend.
|
* @param {MatrixEvent} event The event to resend.
|
||||||
* @param {Room} room Optional. The room the event is in. Will update the
|
* @param {Room} room Optional. The room the event is in. Will update the
|
||||||
* timeline entry if provided.
|
* timeline entry if provided.
|
||||||
@@ -3508,6 +3522,9 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
* @return {module:http-api.MatrixError} Rejects: with an error response.
|
* @return {module:http-api.MatrixError} Rejects: with an error response.
|
||||||
*/
|
*/
|
||||||
public resendEvent(event: MatrixEvent, room: Room): Promise<ISendEventResponse> {
|
public resendEvent(event: MatrixEvent, room: Room): Promise<ISendEventResponse> {
|
||||||
|
// also kick the to-device queue to retry
|
||||||
|
this.toDeviceMessageQueue.sendQueue();
|
||||||
|
|
||||||
this.updatePendingEventStatus(room, event, EventStatus.SENDING);
|
this.updatePendingEventStatus(room, event, EventStatus.SENDING);
|
||||||
return this.encryptAndSendEvent(room, event);
|
return this.encryptAndSendEvent(room, event);
|
||||||
}
|
}
|
||||||
@@ -8694,7 +8711,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send an event to a specific list of devices
|
* Send an event to a specific list of devices.
|
||||||
|
* This is a low-level API that simply wraps the HTTP API
|
||||||
|
* call to send to-device messages. We recommend using
|
||||||
|
* queueToDevice() which is a higher level API.
|
||||||
*
|
*
|
||||||
* @param {string} eventType type of event to send
|
* @param {string} eventType type of event to send
|
||||||
* @param {Object.<string, Object<string, Object>>} contentMap
|
* @param {Object.<string, Object<string, Object>>} contentMap
|
||||||
@@ -8726,6 +8746,17 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
return this.http.authedRequest(undefined, Method.Put, path, undefined, body);
|
return this.http.authedRequest(undefined, Method.Put, path, undefined, body);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends events directly to specific devices using Matrix's to-device
|
||||||
|
* messaging system. The batch will be split up into appropriately sized
|
||||||
|
* batches for sending and stored in the store so they can be retried
|
||||||
|
* later if they fail to send. Retries will happen automatically.
|
||||||
|
* @param batch The to-device messages to send
|
||||||
|
*/
|
||||||
|
public queueToDevice(batch: ToDeviceBatch): Promise<void> {
|
||||||
|
return this.toDeviceMessageQueue.queueBatch(batch);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the third party protocols that can be reached using
|
* Get the third party protocols that can be reached using
|
||||||
* this HS
|
* this HS
|
||||||
|
@@ -22,6 +22,7 @@ limitations under the License.
|
|||||||
|
|
||||||
import { logger } from '../../logger';
|
import { logger } from '../../logger';
|
||||||
import * as olmlib from "../olmlib";
|
import * as olmlib from "../olmlib";
|
||||||
|
import { EventType } from '../../@types/event';
|
||||||
import {
|
import {
|
||||||
DecryptionAlgorithm,
|
DecryptionAlgorithm,
|
||||||
DecryptionError,
|
DecryptionError,
|
||||||
@@ -37,6 +38,7 @@ import { IOlmSessionResult } from "../olmlib";
|
|||||||
import { DeviceInfoMap } from "../DeviceList";
|
import { DeviceInfoMap } from "../DeviceList";
|
||||||
import { MatrixEvent } from "../..";
|
import { MatrixEvent } from "../..";
|
||||||
import { IEventDecryptionResult, IMegolmSessionData, IncomingRoomKeyRequest } from "../index";
|
import { IEventDecryptionResult, IMegolmSessionData, IncomingRoomKeyRequest } from "../index";
|
||||||
|
import { ToDeviceBatch, ToDeviceMessage } from '../../models/ToDeviceMessage';
|
||||||
|
|
||||||
// determine whether the key can be shared with invitees
|
// determine whether the key can be shared with invitees
|
||||||
export function isRoomSharedHistory(room: Room): boolean {
|
export function isRoomSharedHistory(room: Room): boolean {
|
||||||
@@ -609,7 +611,11 @@ class MegolmEncryption extends EncryptionAlgorithm {
|
|||||||
userDeviceMap: IOlmDevice[],
|
userDeviceMap: IOlmDevice[],
|
||||||
payload: IPayload,
|
payload: IPayload,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const contentMap: Record<string, Record<string, IEncryptedContent>> = {};
|
const toDeviceBatch: ToDeviceBatch = {
|
||||||
|
eventType: EventType.RoomMessageEncrypted,
|
||||||
|
batch: [],
|
||||||
|
};
|
||||||
|
|
||||||
// Map from userId to a map of deviceId to deviceInfo
|
// Map from userId to a map of deviceId to deviceInfo
|
||||||
const deviceInfoByUserIdAndDeviceId = new Map<string, Map<string, DeviceInfo>>();
|
const deviceInfoByUserIdAndDeviceId = new Map<string, Map<string, DeviceInfo>>();
|
||||||
|
|
||||||
@@ -637,10 +643,11 @@ class MegolmEncryption extends EncryptionAlgorithm {
|
|||||||
// We hold by reference, this updates deviceInfoByUserIdAndDeviceId[userId]
|
// We hold by reference, this updates deviceInfoByUserIdAndDeviceId[userId]
|
||||||
userIdDeviceInfo.set(deviceId, deviceInfo);
|
userIdDeviceInfo.set(deviceId, deviceInfo);
|
||||||
|
|
||||||
if (!contentMap[userId]) {
|
toDeviceBatch.batch.push({
|
||||||
contentMap[userId] = {};
|
userId,
|
||||||
}
|
deviceId,
|
||||||
contentMap[userId][deviceId] = encryptedContent;
|
payload: encryptedContent,
|
||||||
|
});
|
||||||
|
|
||||||
promises.push(
|
promises.push(
|
||||||
olmlib.encryptMessageForDevice(
|
olmlib.encryptMessageForDevice(
|
||||||
@@ -660,40 +667,29 @@ class MegolmEncryption extends EncryptionAlgorithm {
|
|||||||
// in which case it will have just not added anything to the ciphertext object.
|
// in which case it will have just not added anything to the ciphertext object.
|
||||||
// There's no point sending messages to devices if we couldn't encrypt to them,
|
// There's no point sending messages to devices if we couldn't encrypt to them,
|
||||||
// since that's effectively a blank message.
|
// since that's effectively a blank message.
|
||||||
for (const userId of Object.keys(contentMap)) {
|
const prunedBatch: ToDeviceMessage[] = [];
|
||||||
for (const deviceId of Object.keys(contentMap[userId])) {
|
for (const msg of toDeviceBatch.batch) {
|
||||||
if (Object.keys(contentMap[userId][deviceId].ciphertext).length === 0) {
|
if (Object.keys(msg.payload.ciphertext).length > 0) {
|
||||||
logger.log(
|
prunedBatch.push(msg);
|
||||||
"No ciphertext for device " +
|
} else {
|
||||||
userId + ":" + deviceId + ": pruning",
|
logger.log(
|
||||||
);
|
"No ciphertext for device " +
|
||||||
delete contentMap[userId][deviceId];
|
msg.userId + ":" + msg.deviceId + ": pruning",
|
||||||
}
|
);
|
||||||
}
|
|
||||||
// No devices left for that user? Strip that too.
|
|
||||||
if (Object.keys(contentMap[userId]).length === 0) {
|
|
||||||
logger.log("Pruned all devices for user " + userId);
|
|
||||||
delete contentMap[userId];
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Is there anything left?
|
toDeviceBatch.batch = prunedBatch;
|
||||||
if (Object.keys(contentMap).length === 0) {
|
|
||||||
logger.log("No users left to send to: aborting");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
return this.baseApis.sendToDevice("m.room.encrypted", contentMap).then(() => {
|
return this.baseApis.queueToDevice(toDeviceBatch).then(() => {
|
||||||
// store that we successfully uploaded the keys of the current slice
|
// store that we successfully uploaded the keys of the current slice
|
||||||
for (const userId of Object.keys(contentMap)) {
|
for (const msg of toDeviceBatch.batch) {
|
||||||
for (const deviceId of Object.keys(contentMap[userId])) {
|
session.markSharedWithDevice(
|
||||||
session.markSharedWithDevice(
|
msg.userId,
|
||||||
userId,
|
msg.deviceId,
|
||||||
deviceId,
|
deviceInfoByUserIdAndDeviceId.get(msg.userId).get(msg.deviceId).getIdentityKey(),
|
||||||
deviceInfoByUserIdAndDeviceId.get(userId).get(deviceId).getIdentityKey(),
|
chainIndex,
|
||||||
chainIndex,
|
);
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
38
src/models/ToDeviceMessage.ts
Normal file
38
src/models/ToDeviceMessage.ts
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
export type ToDevicePayload = Record<string, any>;
|
||||||
|
|
||||||
|
export interface ToDeviceMessage {
|
||||||
|
userId: string;
|
||||||
|
deviceId: string;
|
||||||
|
payload: ToDevicePayload;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ToDeviceBatch {
|
||||||
|
eventType: string;
|
||||||
|
batch: ToDeviceMessage[];
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only used internally
|
||||||
|
export interface ToDeviceBatchWithTxnId extends ToDeviceBatch {
|
||||||
|
txnId: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only used internally
|
||||||
|
export interface IndexedToDeviceBatch extends ToDeviceBatchWithTxnId {
|
||||||
|
id: number;
|
||||||
|
}
|
@@ -57,7 +57,7 @@ export class MatrixScheduler<T = ISendEventResponse> {
|
|||||||
* failure was due to a rate limited request, the time specified in the error is
|
* failure was due to a rate limited request, the time specified in the error is
|
||||||
* waited before being retried.
|
* waited before being retried.
|
||||||
* @param {MatrixEvent} event
|
* @param {MatrixEvent} event
|
||||||
* @param {Number} attempts
|
* @param {Number} attempts Number of attempts that have been made, including the one that just failed (ie. starting at 1)
|
||||||
* @param {MatrixError} err
|
* @param {MatrixError} err
|
||||||
* @return {Number}
|
* @return {Number}
|
||||||
* @see module:scheduler~retryAlgorithm
|
* @see module:scheduler~retryAlgorithm
|
||||||
|
@@ -23,6 +23,7 @@ import { RoomSummary } from "../models/room-summary";
|
|||||||
import { IMinimalEvent, IRooms, ISyncResponse } from "../sync-accumulator";
|
import { IMinimalEvent, IRooms, ISyncResponse } from "../sync-accumulator";
|
||||||
import { IStartClientOpts } from "../client";
|
import { IStartClientOpts } from "../client";
|
||||||
import { IStateEventWithRoomId } from "../@types/search";
|
import { IStateEventWithRoomId } from "../@types/search";
|
||||||
|
import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage";
|
||||||
|
|
||||||
export interface ISavedSync {
|
export interface ISavedSync {
|
||||||
nextBatch: string;
|
nextBatch: string;
|
||||||
@@ -31,8 +32,7 @@ export interface ISavedSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a stub store. This does no-ops on most store methods.
|
* A store for most of the data js-sdk needs to store, apart from crypto data
|
||||||
* @constructor
|
|
||||||
*/
|
*/
|
||||||
export interface IStore {
|
export interface IStore {
|
||||||
readonly accountData: Record<string, MatrixEvent>; // type : content
|
readonly accountData: Record<string, MatrixEvent>; // type : content
|
||||||
@@ -57,21 +57,21 @@ export interface IStore {
|
|||||||
setSyncToken(token: string): void;
|
setSyncToken(token: string): void;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* No-op.
|
* Store the given room.
|
||||||
* @param {Room} room
|
* @param {Room} room The room to be stored. All properties must be stored.
|
||||||
*/
|
*/
|
||||||
storeRoom(room: Room): void;
|
storeRoom(room: Room): void;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* No-op.
|
* Retrieve a room by its' room ID.
|
||||||
* @param {string} roomId
|
* @param {string} roomId The room ID.
|
||||||
* @return {null}
|
* @return {Room} The room or null.
|
||||||
*/
|
*/
|
||||||
getRoom(roomId: string): Room | null;
|
getRoom(roomId: string): Room | null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* No-op.
|
* Retrieve all known rooms.
|
||||||
* @return {Array} An empty array.
|
* @return {Room[]} A list of rooms, which may be empty.
|
||||||
*/
|
*/
|
||||||
getRooms(): Room[];
|
getRooms(): Room[];
|
||||||
|
|
||||||
@@ -82,35 +82,36 @@ export interface IStore {
|
|||||||
removeRoom(roomId: string): void;
|
removeRoom(roomId: string): void;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* No-op.
|
* Retrieve a summary of all the rooms.
|
||||||
* @return {Array} An empty array.
|
* @return {RoomSummary[]} A summary of each room.
|
||||||
*/
|
*/
|
||||||
getRoomSummaries(): RoomSummary[];
|
getRoomSummaries(): RoomSummary[];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* No-op.
|
* Store a User.
|
||||||
* @param {User} user
|
* @param {User} user The user to store.
|
||||||
*/
|
*/
|
||||||
storeUser(user: User): void;
|
storeUser(user: User): void;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* No-op.
|
* Retrieve a User by its' user ID.
|
||||||
* @param {string} userId
|
* @param {string} userId The user ID.
|
||||||
* @return {null}
|
* @return {User} The user or null.
|
||||||
*/
|
*/
|
||||||
getUser(userId: string): User | null;
|
getUser(userId: string): User | null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* No-op.
|
* Retrieve all known users.
|
||||||
* @return {User[]}
|
* @return {User[]} A list of users, which may be empty.
|
||||||
*/
|
*/
|
||||||
getUsers(): User[];
|
getUsers(): User[];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* No-op.
|
* Retrieve scrollback for this room.
|
||||||
* @param {Room} room
|
* @param {Room} room The matrix room
|
||||||
* @param {number} limit
|
* @param {number} limit The max number of old events to retrieve.
|
||||||
* @return {Array}
|
* @return {Array<Object>} An array of objects which will be at most 'limit'
|
||||||
|
* length and at least 0. The objects are the raw event JSON.
|
||||||
*/
|
*/
|
||||||
scrollback(room: Room, limit: number): MatrixEvent[];
|
scrollback(room: Room, limit: number): MatrixEvent[];
|
||||||
|
|
||||||
@@ -209,8 +210,23 @@ export interface IStore {
|
|||||||
*/
|
*/
|
||||||
deleteAllData(): Promise<void>;
|
deleteAllData(): Promise<void>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the out-of-band membership events for this room that
|
||||||
|
* were previously loaded.
|
||||||
|
* @param {string} roomId
|
||||||
|
* @returns {event[]} the events, potentially an empty array if OOB loading didn't yield any new members
|
||||||
|
* @returns {null} in case the members for this room haven't been stored yet
|
||||||
|
*/
|
||||||
getOutOfBandMembers(roomId: string): Promise<IStateEventWithRoomId[] | null>;
|
getOutOfBandMembers(roomId: string): Promise<IStateEventWithRoomId[] | null>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stores the out-of-band membership events for this room. Note that
|
||||||
|
* it still makes sense to store an empty array as the OOB status for the room is
|
||||||
|
* marked as fetched, and getOutOfBandMembers will return an empty array instead of null
|
||||||
|
* @param {string} roomId
|
||||||
|
* @param {event[]} membershipEvents the membership events to store
|
||||||
|
* @returns {Promise} when all members have been stored
|
||||||
|
*/
|
||||||
setOutOfBandMembers(roomId: string, membershipEvents: IStateEventWithRoomId[]): Promise<void>;
|
setOutOfBandMembers(roomId: string, membershipEvents: IStateEventWithRoomId[]): Promise<void>;
|
||||||
|
|
||||||
clearOutOfBandMembers(roomId: string): Promise<void>;
|
clearOutOfBandMembers(roomId: string): Promise<void>;
|
||||||
@@ -222,4 +238,19 @@ export interface IStore {
|
|||||||
getPendingEvents(roomId: string): Promise<Partial<IEvent>[]>;
|
getPendingEvents(roomId: string): Promise<Partial<IEvent>[]>;
|
||||||
|
|
||||||
setPendingEvents(roomId: string, events: Partial<IEvent>[]): Promise<void>;
|
setPendingEvents(roomId: string, events: Partial<IEvent>[]): Promise<void>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stores batches of outgoing to-device messages
|
||||||
|
*/
|
||||||
|
saveToDeviceBatches(batch: ToDeviceBatchWithTxnId[]): Promise<void>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches the oldest batch of to-device messages in the queue
|
||||||
|
*/
|
||||||
|
getOldestToDeviceBatch(): Promise<IndexedToDeviceBatch>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a specific batch of to-device messages from the queue
|
||||||
|
*/
|
||||||
|
removeToDeviceBatch(id: number): Promise<void>;
|
||||||
}
|
}
|
||||||
|
@@ -16,6 +16,7 @@ limitations under the License.
|
|||||||
|
|
||||||
import { ISavedSync } from "./index";
|
import { ISavedSync } from "./index";
|
||||||
import { IEvent, IStartClientOpts, IStateEventWithRoomId, ISyncResponse } from "..";
|
import { IEvent, IStartClientOpts, IStateEventWithRoomId, ISyncResponse } from "..";
|
||||||
|
import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage";
|
||||||
|
|
||||||
export interface IIndexedDBBackend {
|
export interface IIndexedDBBackend {
|
||||||
connect(): Promise<void>;
|
connect(): Promise<void>;
|
||||||
@@ -31,6 +32,9 @@ export interface IIndexedDBBackend {
|
|||||||
getUserPresenceEvents(): Promise<UserTuple[]>;
|
getUserPresenceEvents(): Promise<UserTuple[]>;
|
||||||
getClientOptions(): Promise<IStartClientOpts>;
|
getClientOptions(): Promise<IStartClientOpts>;
|
||||||
storeClientOptions(options: IStartClientOpts): Promise<void>;
|
storeClientOptions(options: IStartClientOpts): Promise<void>;
|
||||||
|
saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise<void>;
|
||||||
|
getOldestToDeviceBatch(): Promise<IndexedToDeviceBatch>;
|
||||||
|
removeToDeviceBatch(id: number): Promise<void>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type UserTuple = [userId: string, presenceEvent: Partial<IEvent>];
|
export type UserTuple = [userId: string, presenceEvent: Partial<IEvent>];
|
||||||
|
@@ -21,8 +21,9 @@ import { logger } from '../logger';
|
|||||||
import { IStartClientOpts, IStateEventWithRoomId } from "..";
|
import { IStartClientOpts, IStateEventWithRoomId } from "..";
|
||||||
import { ISavedSync } from "./index";
|
import { ISavedSync } from "./index";
|
||||||
import { IIndexedDBBackend, UserTuple } from "./indexeddb-backend";
|
import { IIndexedDBBackend, UserTuple } from "./indexeddb-backend";
|
||||||
|
import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage";
|
||||||
|
|
||||||
const VERSION = 3;
|
const VERSION = 4;
|
||||||
|
|
||||||
function createDatabase(db: IDBDatabase): void {
|
function createDatabase(db: IDBDatabase): void {
|
||||||
// Make user store, clobber based on user ID. (userId property of User objects)
|
// Make user store, clobber based on user ID. (userId property of User objects)
|
||||||
@@ -49,6 +50,10 @@ function upgradeSchemaV3(db: IDBDatabase): void {
|
|||||||
{ keyPath: ["clobber"] });
|
{ keyPath: ["clobber"] });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function upgradeSchemaV4(db: IDBDatabase): void {
|
||||||
|
db.createObjectStore("to_device_queue", { autoIncrement: true });
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper method to collect results from a Cursor and promiseify it.
|
* Helper method to collect results from a Cursor and promiseify it.
|
||||||
* @param {ObjectStore|Index} store The store to perform openCursor on.
|
* @param {ObjectStore|Index} store The store to perform openCursor on.
|
||||||
@@ -112,7 +117,7 @@ function reqAsPromise(req: IDBRequest): Promise<IDBRequest> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function reqAsCursorPromise(req: IDBRequest<IDBCursor | null>): Promise<IDBCursor> {
|
function reqAsCursorPromise<T>(req: IDBRequest<T>): Promise<T> {
|
||||||
return reqAsEventPromise(req).then((event) => req.result);
|
return reqAsEventPromise(req).then((event) => req.result);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -177,6 +182,9 @@ export class LocalIndexedDBStoreBackend implements IIndexedDBBackend {
|
|||||||
if (oldVersion < 3) {
|
if (oldVersion < 3) {
|
||||||
upgradeSchemaV3(db);
|
upgradeSchemaV3(db);
|
||||||
}
|
}
|
||||||
|
if (oldVersion < 4) {
|
||||||
|
upgradeSchemaV4(db);
|
||||||
|
}
|
||||||
// Expand as needed.
|
// Expand as needed.
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -561,4 +569,36 @@ export class LocalIndexedDBStoreBackend implements IIndexedDBBackend {
|
|||||||
}); // put == UPSERT
|
}); // put == UPSERT
|
||||||
await txnAsPromise(txn);
|
await txnAsPromise(txn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise<void> {
|
||||||
|
const txn = this.db.transaction(["to_device_queue"], "readwrite");
|
||||||
|
const store = txn.objectStore("to_device_queue");
|
||||||
|
for (const batch of batches) {
|
||||||
|
store.add(batch);
|
||||||
|
}
|
||||||
|
await txnAsPromise(txn);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getOldestToDeviceBatch(): Promise<IndexedToDeviceBatch | null> {
|
||||||
|
const txn = this.db.transaction(["to_device_queue"], "readonly");
|
||||||
|
const store = txn.objectStore("to_device_queue");
|
||||||
|
const cursor = await reqAsCursorPromise(store.openCursor());
|
||||||
|
if (!cursor) return null;
|
||||||
|
|
||||||
|
const resultBatch = cursor.value as ToDeviceBatchWithTxnId;
|
||||||
|
|
||||||
|
return {
|
||||||
|
id: cursor.key as number,
|
||||||
|
txnId: resultBatch.txnId,
|
||||||
|
eventType: resultBatch.eventType,
|
||||||
|
batch: resultBatch.batch,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public async removeToDeviceBatch(id: number): Promise<void> {
|
||||||
|
const txn = this.db.transaction(["to_device_queue"], "readwrite");
|
||||||
|
const store = txn.objectStore("to_device_queue");
|
||||||
|
store.delete(id);
|
||||||
|
await txnAsPromise(txn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -20,6 +20,7 @@ import { ISavedSync } from "./index";
|
|||||||
import { IStartClientOpts } from "../client";
|
import { IStartClientOpts } from "../client";
|
||||||
import { IStateEventWithRoomId, ISyncResponse } from "..";
|
import { IStateEventWithRoomId, ISyncResponse } from "..";
|
||||||
import { IIndexedDBBackend, UserTuple } from "./indexeddb-backend";
|
import { IIndexedDBBackend, UserTuple } from "./indexeddb-backend";
|
||||||
|
import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage";
|
||||||
|
|
||||||
export class RemoteIndexedDBStoreBackend implements IIndexedDBBackend {
|
export class RemoteIndexedDBStoreBackend implements IIndexedDBBackend {
|
||||||
private worker: Worker;
|
private worker: Worker;
|
||||||
@@ -133,6 +134,18 @@ export class RemoteIndexedDBStoreBackend implements IIndexedDBBackend {
|
|||||||
return this.doCmd('getUserPresenceEvents');
|
return this.doCmd('getUserPresenceEvents');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise<void> {
|
||||||
|
return this.doCmd('saveToDeviceBatches', [batches]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getOldestToDeviceBatch(): Promise<IndexedToDeviceBatch> {
|
||||||
|
return this.doCmd('getOldestToDeviceBatch');
|
||||||
|
}
|
||||||
|
|
||||||
|
public async removeToDeviceBatch(id: number): Promise<void> {
|
||||||
|
return this.doCmd('removeToDeviceBatch', [id]);
|
||||||
|
}
|
||||||
|
|
||||||
private ensureStarted(): Promise<void> {
|
private ensureStarted(): Promise<void> {
|
||||||
if (this.startPromise === null) {
|
if (this.startPromise === null) {
|
||||||
this.worker = this.workerFactory();
|
this.worker = this.workerFactory();
|
||||||
|
@@ -103,6 +103,15 @@ export class IndexedDBStoreWorker {
|
|||||||
case 'storeClientOptions':
|
case 'storeClientOptions':
|
||||||
prom = this.backend.storeClientOptions(msg.args[0]);
|
prom = this.backend.storeClientOptions(msg.args[0]);
|
||||||
break;
|
break;
|
||||||
|
case 'saveToDeviceBatches':
|
||||||
|
prom = this.backend.saveToDeviceBatches(msg.args[0]);
|
||||||
|
break;
|
||||||
|
case 'getOldestToDeviceBatch':
|
||||||
|
prom = this.backend.getOldestToDeviceBatch();
|
||||||
|
break;
|
||||||
|
case 'removeToDeviceBatch':
|
||||||
|
prom = this.backend.removeToDeviceBatch(msg.args[0]);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (prom === undefined) {
|
if (prom === undefined) {
|
||||||
|
@@ -27,6 +27,7 @@ import { IIndexedDBBackend } from "./indexeddb-backend";
|
|||||||
import { ISyncResponse } from "../sync-accumulator";
|
import { ISyncResponse } from "../sync-accumulator";
|
||||||
import { TypedEventEmitter } from "../models/typed-event-emitter";
|
import { TypedEventEmitter } from "../models/typed-event-emitter";
|
||||||
import { IStateEventWithRoomId } from "../@types/search";
|
import { IStateEventWithRoomId } from "../@types/search";
|
||||||
|
import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is an internal module. See {@link IndexedDBStore} for the public class.
|
* This is an internal module. See {@link IndexedDBStore} for the public class.
|
||||||
@@ -351,6 +352,18 @@ export class IndexedDBStore extends MemoryStore {
|
|||||||
this.localStorage.removeItem(pendingEventsKey(roomId));
|
this.localStorage.removeItem(pendingEventsKey(roomId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise<void> {
|
||||||
|
return this.backend.saveToDeviceBatches(batches);
|
||||||
|
}
|
||||||
|
|
||||||
|
public getOldestToDeviceBatch(): Promise<IndexedToDeviceBatch> {
|
||||||
|
return this.backend.getOldestToDeviceBatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
public removeToDeviceBatch(id: number): Promise<void> {
|
||||||
|
return this.backend.removeToDeviceBatch(id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@@ -30,6 +30,7 @@ import { ISavedSync, IStore } from "./index";
|
|||||||
import { RoomSummary } from "../models/room-summary";
|
import { RoomSummary } from "../models/room-summary";
|
||||||
import { ISyncResponse } from "../sync-accumulator";
|
import { ISyncResponse } from "../sync-accumulator";
|
||||||
import { IStateEventWithRoomId } from "../@types/search";
|
import { IStateEventWithRoomId } from "../@types/search";
|
||||||
|
import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage";
|
||||||
|
|
||||||
function isValidFilterId(filterId: string): boolean {
|
function isValidFilterId(filterId: string): boolean {
|
||||||
const isValidStr = typeof filterId === "string" &&
|
const isValidStr = typeof filterId === "string" &&
|
||||||
@@ -64,6 +65,8 @@ export class MemoryStore implements IStore {
|
|||||||
private oobMembers: Record<string, IStateEventWithRoomId[]> = {}; // roomId: [member events]
|
private oobMembers: Record<string, IStateEventWithRoomId[]> = {}; // roomId: [member events]
|
||||||
private pendingEvents: { [roomId: string]: Partial<IEvent>[] } = {};
|
private pendingEvents: { [roomId: string]: Partial<IEvent>[] } = {};
|
||||||
private clientOptions = {};
|
private clientOptions = {};
|
||||||
|
private pendingToDeviceBatches: IndexedToDeviceBatch[] = [];
|
||||||
|
private nextToDeviceBatchId = 0;
|
||||||
|
|
||||||
constructor(opts: IOpts = {}) {
|
constructor(opts: IOpts = {}) {
|
||||||
this.localStorage = opts.localStorage;
|
this.localStorage = opts.localStorage;
|
||||||
@@ -429,4 +432,26 @@ export class MemoryStore implements IStore {
|
|||||||
public async setPendingEvents(roomId: string, events: Partial<IEvent>[]): Promise<void> {
|
public async setPendingEvents(roomId: string, events: Partial<IEvent>[]): Promise<void> {
|
||||||
this.pendingEvents[roomId] = events;
|
this.pendingEvents[roomId] = events;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise<void> {
|
||||||
|
for (const batch of batches) {
|
||||||
|
this.pendingToDeviceBatches.push({
|
||||||
|
id: this.nextToDeviceBatchId++,
|
||||||
|
eventType: batch.eventType,
|
||||||
|
txnId: batch.txnId,
|
||||||
|
batch: batch.batch,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getOldestToDeviceBatch(): Promise<IndexedToDeviceBatch | null> {
|
||||||
|
if (this.pendingToDeviceBatches.length === 0) return null;
|
||||||
|
return this.pendingToDeviceBatches[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
public removeToDeviceBatch(id: number): Promise<void> {
|
||||||
|
this.pendingToDeviceBatches = this.pendingToDeviceBatches.filter(batch => batch.id !== id);
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -28,6 +28,7 @@ import { ISavedSync, IStore } from "./index";
|
|||||||
import { RoomSummary } from "../models/room-summary";
|
import { RoomSummary } from "../models/room-summary";
|
||||||
import { ISyncResponse } from "../sync-accumulator";
|
import { ISyncResponse } from "../sync-accumulator";
|
||||||
import { IStateEventWithRoomId } from "../@types/search";
|
import { IStateEventWithRoomId } from "../@types/search";
|
||||||
|
import { IndexedToDeviceBatch, ToDeviceBatch } from "../models/ToDeviceMessage";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a stub store. This does no-ops on most store methods.
|
* Construct a stub store. This does no-ops on most store methods.
|
||||||
@@ -270,4 +271,16 @@ export class StubStore implements IStore {
|
|||||||
public setPendingEvents(roomId: string, events: Partial<IEvent>[]): Promise<void> {
|
public setPendingEvents(roomId: string, events: Partial<IEvent>[]): Promise<void> {
|
||||||
return Promise.resolve();
|
return Promise.resolve();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async saveToDeviceBatches(batch: ToDeviceBatch[]): Promise<void> {
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
|
|
||||||
|
public getOldestToDeviceBatch(): Promise<IndexedToDeviceBatch | null> {
|
||||||
|
return Promise.resolve(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async removeToDeviceBatch(id: number): Promise<void> {
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -4802,10 +4802,10 @@ matrix-events-sdk@^0.0.1-beta.7:
|
|||||||
resolved "https://registry.yarnpkg.com/matrix-events-sdk/-/matrix-events-sdk-0.0.1-beta.7.tgz#5ffe45eba1f67cc8d7c2377736c728b322524934"
|
resolved "https://registry.yarnpkg.com/matrix-events-sdk/-/matrix-events-sdk-0.0.1-beta.7.tgz#5ffe45eba1f67cc8d7c2377736c728b322524934"
|
||||||
integrity sha512-9jl4wtWanUFSy2sr2lCjErN/oC8KTAtaeaozJtrgot1JiQcEI4Rda9OLgQ7nLKaqb4Z/QUx/fR3XpDzm5Jy1JA==
|
integrity sha512-9jl4wtWanUFSy2sr2lCjErN/oC8KTAtaeaozJtrgot1JiQcEI4Rda9OLgQ7nLKaqb4Z/QUx/fR3XpDzm5Jy1JA==
|
||||||
|
|
||||||
matrix-mock-request@^2.1.0:
|
matrix-mock-request@^2.1.1:
|
||||||
version "2.1.0"
|
version "2.1.1"
|
||||||
resolved "https://registry.yarnpkg.com/matrix-mock-request/-/matrix-mock-request-2.1.0.tgz#86f5b0ef846865d0767d3a8e64f5bcd6ca94c178"
|
resolved "https://registry.yarnpkg.com/matrix-mock-request/-/matrix-mock-request-2.1.1.tgz#a8fc03a2816464bb95445df4cc8885ac36786b23"
|
||||||
integrity sha512-Cjpl3yP6h0yu5GKG89m1XZXZlm69Kg/qHV41N/t6SrQsgcfM3Bfavqx9YrtG0UnuXGy4bBSZIe1QiWVeFPZw1A==
|
integrity sha512-CxdaUPRVB4o8JxTBMASstS2loRe+hlqeJu0Q7yyS1r36LkSSo/KAP4AuomsqxuKqaqYYnEJFJzkG0gOhxV7aqA==
|
||||||
dependencies:
|
dependencies:
|
||||||
expect "^28.1.0"
|
expect "^28.1.0"
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user