You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-08-06 12:02:40 +03:00
Add basic retry for rust crypto outgoing requests (#4061)
* Add basic retry for outgoing requests * Update doc * Remove 504 from retryable * Retry all 5xx and clarify client timeouts * code review cleaning * do not retry rust request if M_TOO_LARGE * refactor use common retry alg between scheduler and rust requests * Code review, cleaning and doc
This commit is contained in:
@@ -27,8 +27,9 @@ import {
|
|||||||
UploadSigningKeysRequest,
|
UploadSigningKeysRequest,
|
||||||
ToDeviceRequest,
|
ToDeviceRequest,
|
||||||
} from "@matrix-org/matrix-sdk-crypto-wasm";
|
} from "@matrix-org/matrix-sdk-crypto-wasm";
|
||||||
|
import fetchMock from "fetch-mock-jest";
|
||||||
|
|
||||||
import { TypedEventEmitter } from "../../../src/models/typed-event-emitter";
|
import { TypedEventEmitter } from "../../../src";
|
||||||
import { HttpApiEvent, HttpApiEventHandlerMap, IHttpOpts, MatrixHttpApi, UIAuthCallback } from "../../../src";
|
import { HttpApiEvent, HttpApiEventHandlerMap, IHttpOpts, MatrixHttpApi, UIAuthCallback } from "../../../src";
|
||||||
import { OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingRequestProcessor";
|
import { OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingRequestProcessor";
|
||||||
import { defer } from "../../../src/utils";
|
import { defer } from "../../../src/utils";
|
||||||
@@ -274,4 +275,302 @@ describe("OutgoingRequestProcessor", () => {
|
|||||||
// ... and `makeOutgoingRequest` resolves satisfactorily
|
// ... and `makeOutgoingRequest` resolves satisfactorily
|
||||||
await result;
|
await result;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("Should retry requests", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
jest.useFakeTimers();
|
||||||
|
|
||||||
|
// here we use another httpApi instance in order to use fetchMock
|
||||||
|
const dummyEventEmitter = new TypedEventEmitter<HttpApiEvent, HttpApiEventHandlerMap>();
|
||||||
|
const httpApi = new MatrixHttpApi(dummyEventEmitter, {
|
||||||
|
baseUrl: "https://example.com",
|
||||||
|
prefix: "/_matrix",
|
||||||
|
onlyData: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
processor = new OutgoingRequestProcessor(olmMachine, httpApi);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
jest.useRealTimers();
|
||||||
|
fetchMock.reset();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Should retry on retryable errors", () => {
|
||||||
|
const retryableErrors: Array<[number, { status: number; body: { error: string } }]> = [
|
||||||
|
[429, { status: 429, body: { error: "Too Many Requests" } }],
|
||||||
|
[500, { status: 500, body: { error: "Internal Server Error" } }],
|
||||||
|
[502, { status: 502, body: { error: "Bad Gateway" } }],
|
||||||
|
[503, { status: 503, body: { error: "Service Unavailable" } }],
|
||||||
|
[504, { status: 504, body: { error: "Gateway timeout" } }],
|
||||||
|
[505, { status: 505, body: { error: "HTTP Version Not Supported" } }],
|
||||||
|
[506, { status: 506, body: { error: "Variant Also Negotiates" } }],
|
||||||
|
[507, { status: 507, body: { error: "Insufficient Storage" } }],
|
||||||
|
[508, { status: 508, body: { error: "Loop Detected" } }],
|
||||||
|
[510, { status: 510, body: { error: "Not Extended" } }],
|
||||||
|
[511, { status: 511, body: { error: "Network Authentication Required" } }],
|
||||||
|
[525, { status: 525, body: { error: "SSL Handshake Failed" } }],
|
||||||
|
];
|
||||||
|
describe.each(retryableErrors)(`When status code is %s`, (_, error) => {
|
||||||
|
test.each(tests)(`for request of type %ss`, async (_, RequestClass, expectedMethod, expectedPath) => {
|
||||||
|
// first, mock up a request as we might expect to receive it from the Rust layer ...
|
||||||
|
const testBody = '{ "foo": "bar" }';
|
||||||
|
const outgoingRequest = new RequestClass("1234", testBody);
|
||||||
|
|
||||||
|
fetchMock.mock(expectedPath, error, { method: expectedMethod });
|
||||||
|
|
||||||
|
const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
|
||||||
|
|
||||||
|
// Run all timers and wait for the request promise to resolve/reject
|
||||||
|
await Promise.all([jest.runAllTimersAsync(), requestPromise.catch(() => {})]);
|
||||||
|
|
||||||
|
await expect(requestPromise).rejects.toThrow();
|
||||||
|
|
||||||
|
// Should have ultimately made 5 requests (1 initial + 4 retries)
|
||||||
|
const calls = fetchMock.calls(expectedPath);
|
||||||
|
expect(calls).toHaveLength(5);
|
||||||
|
|
||||||
|
// The promise should have been rejected
|
||||||
|
await expect(requestPromise).rejects.toThrow();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should not retry if M_TOO_LARGE", async () => {
|
||||||
|
const testBody = '{ "messages": { "user": {"device": "bar" }}}';
|
||||||
|
const outgoingRequest = new ToDeviceRequest("1234", "custom.type", "12345", testBody);
|
||||||
|
|
||||||
|
fetchMock.put("express:/_matrix/client/v3/sendToDevice/:type/:txnId", {
|
||||||
|
status: 502,
|
||||||
|
body: {
|
||||||
|
errcode: "M_TOO_LARGE",
|
||||||
|
error: "Request too large",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
|
||||||
|
|
||||||
|
await Promise.all([requestPromise.catch(() => {}), jest.runAllTimersAsync()]);
|
||||||
|
|
||||||
|
await expect(requestPromise).rejects.toThrow();
|
||||||
|
|
||||||
|
const calls = fetchMock.calls("express:/_matrix/client/v3/sendToDevice/:type/:txnId");
|
||||||
|
expect(calls).toHaveLength(1);
|
||||||
|
|
||||||
|
// The promise should have been rejected
|
||||||
|
await expect(requestPromise).rejects.toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should retry on Failed to fetch connection errors", async () => {
|
||||||
|
let callCount = 0;
|
||||||
|
fetchMock.post("path:/_matrix/client/v3/keys/upload", (url, opts) => {
|
||||||
|
callCount++;
|
||||||
|
if (callCount == 2) {
|
||||||
|
return {
|
||||||
|
status: 200,
|
||||||
|
body: "{}",
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
throw new Error("Failed to fetch");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const outgoingRequest = new KeysUploadRequest("1234", "{}");
|
||||||
|
|
||||||
|
const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
|
||||||
|
|
||||||
|
await Promise.all([requestPromise, jest.runAllTimersAsync()]);
|
||||||
|
|
||||||
|
const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
|
||||||
|
expect(calls).toHaveLength(2);
|
||||||
|
expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should retry to send to-device", async () => {
|
||||||
|
let callCount = 0;
|
||||||
|
const testBody = '{ "messages": { "user": {"device": "bar" }}}';
|
||||||
|
const outgoingRequest = new ToDeviceRequest("1234", "custom.type", "12345", testBody);
|
||||||
|
|
||||||
|
fetchMock.put("express:/_matrix/client/v3/sendToDevice/:type/:txnId", (url, opts) => {
|
||||||
|
callCount++;
|
||||||
|
if (callCount == 2) {
|
||||||
|
return {
|
||||||
|
status: 200,
|
||||||
|
body: "{}",
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
throw new Error("Failed to fetch");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
|
||||||
|
|
||||||
|
await Promise.all([requestPromise, jest.runAllTimersAsync()]);
|
||||||
|
|
||||||
|
const calls = fetchMock.calls("express:/_matrix/client/v3/sendToDevice/:type/:txnId");
|
||||||
|
expect(calls).toHaveLength(2);
|
||||||
|
expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should retry to call with UIA", async () => {
|
||||||
|
let callCount = 0;
|
||||||
|
const testBody = '{ "foo": "bar" }';
|
||||||
|
const outgoingRequest = new UploadSigningKeysRequest(testBody);
|
||||||
|
|
||||||
|
fetchMock.post("path:/_matrix/client/v3/keys/device_signing/upload", (url, opts) => {
|
||||||
|
callCount++;
|
||||||
|
if (callCount == 2) {
|
||||||
|
return {
|
||||||
|
status: 200,
|
||||||
|
body: "{}",
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
throw new Error("Failed to fetch");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
const authCallback: UIAuthCallback<Object> = async (makeRequest) => {
|
||||||
|
return await makeRequest({ type: "test" });
|
||||||
|
};
|
||||||
|
const requestPromise = processor.makeOutgoingRequest(outgoingRequest, authCallback);
|
||||||
|
|
||||||
|
await Promise.all([requestPromise, jest.runAllTimersAsync()]);
|
||||||
|
|
||||||
|
const calls = fetchMock.calls("path:/_matrix/client/v3/keys/device_signing/upload");
|
||||||
|
expect(calls).toHaveLength(2);
|
||||||
|
// Will not mark as sent as it's a UIA request
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should retry on respect server cool down on LIMIT_EXCEEDED", async () => {
|
||||||
|
const retryAfterMs = 5000;
|
||||||
|
let callCount = 0;
|
||||||
|
|
||||||
|
fetchMock.post("path:/_matrix/client/v3/keys/upload", (url, opts) => {
|
||||||
|
callCount++;
|
||||||
|
if (callCount == 2) {
|
||||||
|
return {
|
||||||
|
status: 200,
|
||||||
|
body: "{}",
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
return {
|
||||||
|
status: 429,
|
||||||
|
body: {
|
||||||
|
errcode: "M_LIMIT_EXCEEDED",
|
||||||
|
error: "Too many requests",
|
||||||
|
retry_after_ms: retryAfterMs,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const outgoingRequest = new KeysUploadRequest("1234", "{}");
|
||||||
|
|
||||||
|
const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
|
||||||
|
|
||||||
|
// advanced by less than the retryAfterMs
|
||||||
|
await jest.advanceTimersByTimeAsync(retryAfterMs - 1000);
|
||||||
|
|
||||||
|
// should not have made a second request yet
|
||||||
|
{
|
||||||
|
const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
|
||||||
|
expect(calls).toHaveLength(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// advanced by the remaining time
|
||||||
|
await jest.advanceTimersByTimeAsync(retryAfterMs + 1000);
|
||||||
|
|
||||||
|
await requestPromise;
|
||||||
|
|
||||||
|
const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
|
||||||
|
expect(calls).toHaveLength(2);
|
||||||
|
expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
const nonRetryableErrors: Array<[number, { status: number; body: { errcode: string } }]> = [
|
||||||
|
[400, { status: 400, body: { errcode: "Bad Request" } }],
|
||||||
|
[401, { status: 401, body: { errcode: "M_UNKNOWN_TOKEN" } }],
|
||||||
|
[403, { status: 403, body: { errcode: "M_FORBIDDEN" } }],
|
||||||
|
];
|
||||||
|
|
||||||
|
describe.each(nonRetryableErrors)("Should not retry all sort of errors", (_, error) => {
|
||||||
|
test.each(tests)("for %ss", async (_, RequestClass, expectedMethod, expectedPath) => {
|
||||||
|
const testBody = '{ "foo": "bar" }';
|
||||||
|
const outgoingRequest = new RequestClass("1234", testBody);
|
||||||
|
|
||||||
|
// @ts-ignore to avoid having to do if else to switch the method (.put/.post)
|
||||||
|
fetchMock[expectedMethod.toLowerCase()](expectedPath, {
|
||||||
|
status: error.status,
|
||||||
|
body: error.body,
|
||||||
|
});
|
||||||
|
|
||||||
|
const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
|
||||||
|
|
||||||
|
// Run all timers and wait for the request promise to resolve/reject
|
||||||
|
await Promise.all([jest.runAllTimersAsync(), requestPromise.catch(() => {})]);
|
||||||
|
|
||||||
|
await expect(requestPromise).rejects.toThrow();
|
||||||
|
|
||||||
|
// Should have only tried once
|
||||||
|
const calls = fetchMock.calls(expectedPath);
|
||||||
|
expect(calls).toHaveLength(1);
|
||||||
|
|
||||||
|
await expect(requestPromise).rejects.toThrow();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Should not retry client timeouts", () => {
|
||||||
|
test.each(tests)("for %ss", async (_, RequestClass, expectedMethod, expectedPath) => {
|
||||||
|
const testBody = '{ "foo": "bar" }';
|
||||||
|
const outgoingRequest = new RequestClass("1234", testBody);
|
||||||
|
|
||||||
|
// @ts-ignore to avoid having to do if else to switch the method (.put/.post)
|
||||||
|
fetchMock[expectedMethod.toLowerCase()](expectedPath, () => {
|
||||||
|
// This is what a client timeout error will throw
|
||||||
|
throw new DOMException("The user aborted a request.", "AbortError");
|
||||||
|
});
|
||||||
|
|
||||||
|
const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
|
||||||
|
|
||||||
|
// Run all timers and wait for the request promise to resolve/reject
|
||||||
|
await Promise.all([jest.runAllTimersAsync(), requestPromise.catch(() => {})]);
|
||||||
|
|
||||||
|
await expect(requestPromise).rejects.toThrow();
|
||||||
|
|
||||||
|
// Should have only tried once
|
||||||
|
const calls = fetchMock.calls(expectedPath);
|
||||||
|
expect(calls).toHaveLength(1);
|
||||||
|
await expect(requestPromise).rejects.toThrow();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Should retry until it works", () => {
|
||||||
|
it.each([1, 2, 3, 4])("should succeed if the call number %s is ok", async (successfulCall) => {
|
||||||
|
let callCount = 0;
|
||||||
|
fetchMock.post("path:/_matrix/client/v3/keys/upload", (url, opts) => {
|
||||||
|
callCount++;
|
||||||
|
if (callCount == successfulCall) {
|
||||||
|
return {
|
||||||
|
status: 200,
|
||||||
|
body: "{}",
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
return {
|
||||||
|
status: 500,
|
||||||
|
body: { error: "Internal server error" },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const outgoingRequest = new KeysUploadRequest("1234", "{}");
|
||||||
|
|
||||||
|
const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
|
||||||
|
|
||||||
|
await Promise.all([requestPromise, jest.runAllTimersAsync()]);
|
||||||
|
|
||||||
|
const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
|
||||||
|
expect(calls).toHaveLength(successfulCall);
|
||||||
|
expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
@@ -151,3 +151,49 @@ export async function retryNetworkOperation<T>(maxAttempts: number, callback: ()
|
|||||||
}
|
}
|
||||||
throw lastConnectionError;
|
throw lastConnectionError;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate the backoff time for a request retry attempt.
|
||||||
|
* This produces wait times of 2, 4, 8, and 16 seconds (30s total) after which we give up. If the
|
||||||
|
* failure was due to a rate limited request, the time specified in the error is returned.
|
||||||
|
*
|
||||||
|
* Returns -1 if the error is not retryable, or if we reach the maximum number of attempts.
|
||||||
|
*
|
||||||
|
* @param err - The error thrown by the http call
|
||||||
|
* @param attempts - The number of attempts made so far, including the one that just failed.
|
||||||
|
* @param retryConnectionError - Whether to retry on {@link ConnectionError} (CORS, connection is down, etc.)
|
||||||
|
*/
|
||||||
|
export function calculateRetryBackoff(err: any, attempts: number, retryConnectionError: boolean): number {
|
||||||
|
if (attempts > 4) {
|
||||||
|
return -1; // give up
|
||||||
|
}
|
||||||
|
|
||||||
|
if (err instanceof ConnectionError && !retryConnectionError) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (err.httpStatus && (err.httpStatus === 400 || err.httpStatus === 403 || err.httpStatus === 401)) {
|
||||||
|
// client error; no amount of retrying will save you now.
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (err.name === "AbortError") {
|
||||||
|
// this is a client timeout, that is already very high 60s/80s
|
||||||
|
// we don't want to retry, as it could do it for very long
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we are trying to send an event (or similar) that is too large in any way, then retrying won't help
|
||||||
|
if (err.name === "M_TOO_LARGE") {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (err.name === "M_LIMIT_EXCEEDED") {
|
||||||
|
const waitTime = err.data.retry_after_ms;
|
||||||
|
if (waitTime > 0) {
|
||||||
|
return waitTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1000 * Math.pow(2, attempts);
|
||||||
|
}
|
||||||
|
@@ -15,11 +15,11 @@ limitations under the License.
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import {
|
import {
|
||||||
OlmMachine,
|
|
||||||
KeysBackupRequest,
|
KeysBackupRequest,
|
||||||
KeysClaimRequest,
|
KeysClaimRequest,
|
||||||
KeysQueryRequest,
|
KeysQueryRequest,
|
||||||
KeysUploadRequest,
|
KeysUploadRequest,
|
||||||
|
OlmMachine,
|
||||||
RoomMessageRequest,
|
RoomMessageRequest,
|
||||||
SignatureUploadRequest,
|
SignatureUploadRequest,
|
||||||
ToDeviceRequest,
|
ToDeviceRequest,
|
||||||
@@ -27,8 +27,8 @@ import {
|
|||||||
} from "@matrix-org/matrix-sdk-crypto-wasm";
|
} from "@matrix-org/matrix-sdk-crypto-wasm";
|
||||||
|
|
||||||
import { logger } from "../logger";
|
import { logger } from "../logger";
|
||||||
import { IHttpOpts, MatrixHttpApi, Method } from "../http-api";
|
import { calculateRetryBackoff, IHttpOpts, MatrixHttpApi, Method } from "../http-api";
|
||||||
import { logDuration, QueryDict } from "../utils";
|
import { logDuration, QueryDict, sleep } from "../utils";
|
||||||
import { IAuthDict, UIAuthCallback } from "../interactive-auth";
|
import { IAuthDict, UIAuthCallback } from "../interactive-auth";
|
||||||
import { UIAResponse } from "../@types/uia";
|
import { UIAResponse } from "../@types/uia";
|
||||||
import { ToDeviceMessageId } from "../@types/event";
|
import { ToDeviceMessageId } from "../@types/event";
|
||||||
@@ -71,15 +71,15 @@ export class OutgoingRequestProcessor {
|
|||||||
* for the complete list of request types
|
* for the complete list of request types
|
||||||
*/
|
*/
|
||||||
if (msg instanceof KeysUploadRequest) {
|
if (msg instanceof KeysUploadRequest) {
|
||||||
resp = await this.rawJsonRequest(Method.Post, "/_matrix/client/v3/keys/upload", {}, msg.body);
|
resp = await this.requestWithRetry(Method.Post, "/_matrix/client/v3/keys/upload", {}, msg.body);
|
||||||
} else if (msg instanceof KeysQueryRequest) {
|
} else if (msg instanceof KeysQueryRequest) {
|
||||||
resp = await this.rawJsonRequest(Method.Post, "/_matrix/client/v3/keys/query", {}, msg.body);
|
resp = await this.requestWithRetry(Method.Post, "/_matrix/client/v3/keys/query", {}, msg.body);
|
||||||
} else if (msg instanceof KeysClaimRequest) {
|
} else if (msg instanceof KeysClaimRequest) {
|
||||||
resp = await this.rawJsonRequest(Method.Post, "/_matrix/client/v3/keys/claim", {}, msg.body);
|
resp = await this.requestWithRetry(Method.Post, "/_matrix/client/v3/keys/claim", {}, msg.body);
|
||||||
} else if (msg instanceof SignatureUploadRequest) {
|
} else if (msg instanceof SignatureUploadRequest) {
|
||||||
resp = await this.rawJsonRequest(Method.Post, "/_matrix/client/v3/keys/signatures/upload", {}, msg.body);
|
resp = await this.requestWithRetry(Method.Post, "/_matrix/client/v3/keys/signatures/upload", {}, msg.body);
|
||||||
} else if (msg instanceof KeysBackupRequest) {
|
} else if (msg instanceof KeysBackupRequest) {
|
||||||
resp = await this.rawJsonRequest(
|
resp = await this.requestWithRetry(
|
||||||
Method.Put,
|
Method.Put,
|
||||||
"/_matrix/client/v3/room_keys/keys",
|
"/_matrix/client/v3/room_keys/keys",
|
||||||
{ version: msg.version },
|
{ version: msg.version },
|
||||||
@@ -91,7 +91,7 @@ export class OutgoingRequestProcessor {
|
|||||||
const path =
|
const path =
|
||||||
`/_matrix/client/v3/rooms/${encodeURIComponent(msg.room_id)}/send/` +
|
`/_matrix/client/v3/rooms/${encodeURIComponent(msg.room_id)}/send/` +
|
||||||
`${encodeURIComponent(msg.event_type)}/${encodeURIComponent(msg.txn_id)}`;
|
`${encodeURIComponent(msg.event_type)}/${encodeURIComponent(msg.txn_id)}`;
|
||||||
resp = await this.rawJsonRequest(Method.Put, path, {}, msg.body);
|
resp = await this.requestWithRetry(Method.Put, path, {}, msg.body);
|
||||||
} else if (msg instanceof UploadSigningKeysRequest) {
|
} else if (msg instanceof UploadSigningKeysRequest) {
|
||||||
await this.makeRequestWithUIA(
|
await this.makeRequestWithUIA(
|
||||||
Method.Post,
|
Method.Post,
|
||||||
@@ -154,7 +154,7 @@ export class OutgoingRequestProcessor {
|
|||||||
const path =
|
const path =
|
||||||
`/_matrix/client/v3/sendToDevice/${encodeURIComponent(request.event_type)}/` +
|
`/_matrix/client/v3/sendToDevice/${encodeURIComponent(request.event_type)}/` +
|
||||||
encodeURIComponent(request.txn_id);
|
encodeURIComponent(request.txn_id);
|
||||||
return await this.rawJsonRequest(Method.Put, path, {}, request.body);
|
return await this.requestWithRetry(Method.Put, path, {}, request.body);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async makeRequestWithUIA<T>(
|
private async makeRequestWithUIA<T>(
|
||||||
@@ -165,7 +165,7 @@ export class OutgoingRequestProcessor {
|
|||||||
uiaCallback: UIAuthCallback<T> | undefined,
|
uiaCallback: UIAuthCallback<T> | undefined,
|
||||||
): Promise<string> {
|
): Promise<string> {
|
||||||
if (!uiaCallback) {
|
if (!uiaCallback) {
|
||||||
return await this.rawJsonRequest(method, path, queryParams, body);
|
return await this.requestWithRetry(method, path, queryParams, body);
|
||||||
}
|
}
|
||||||
|
|
||||||
const parsedBody = JSON.parse(body);
|
const parsedBody = JSON.parse(body);
|
||||||
@@ -176,7 +176,7 @@ export class OutgoingRequestProcessor {
|
|||||||
if (auth !== null) {
|
if (auth !== null) {
|
||||||
newBody.auth = auth;
|
newBody.auth = auth;
|
||||||
}
|
}
|
||||||
const resp = await this.rawJsonRequest(method, path, queryParams, JSON.stringify(newBody));
|
const resp = await this.requestWithRetry(method, path, queryParams, JSON.stringify(newBody));
|
||||||
return JSON.parse(resp) as T;
|
return JSON.parse(resp) as T;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -184,6 +184,31 @@ export class OutgoingRequestProcessor {
|
|||||||
return JSON.stringify(resp);
|
return JSON.stringify(resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async requestWithRetry(
|
||||||
|
method: Method,
|
||||||
|
path: string,
|
||||||
|
queryParams: QueryDict,
|
||||||
|
body: string,
|
||||||
|
): Promise<string> {
|
||||||
|
let currentRetryCount = 0;
|
||||||
|
|
||||||
|
// eslint-disable-next-line no-constant-condition
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return await this.rawJsonRequest(method, path, queryParams, body);
|
||||||
|
} catch (e) {
|
||||||
|
currentRetryCount++;
|
||||||
|
const backoff = calculateRetryBackoff(e, currentRetryCount, true);
|
||||||
|
if (backoff < 0) {
|
||||||
|
// Max number of retries reached, or error is not retryable. rethrow the error
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
// wait for the specified time and then retry the request
|
||||||
|
await sleep(backoff);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private async rawJsonRequest(method: Method, path: string, queryParams: QueryDict, body: string): Promise<string> {
|
private async rawJsonRequest(method: Method, path: string, queryParams: QueryDict, body: string): Promise<string> {
|
||||||
const opts = {
|
const opts = {
|
||||||
// inhibit the JSON stringification and parsing within HttpApi.
|
// inhibit the JSON stringification and parsing within HttpApi.
|
||||||
|
@@ -22,7 +22,7 @@ import { logger } from "./logger";
|
|||||||
import { MatrixEvent } from "./models/event";
|
import { MatrixEvent } from "./models/event";
|
||||||
import { EventType } from "./@types/event";
|
import { EventType } from "./@types/event";
|
||||||
import { defer, IDeferred, removeElement } from "./utils";
|
import { defer, IDeferred, removeElement } from "./utils";
|
||||||
import { ConnectionError, MatrixError } from "./http-api";
|
import { calculateRetryBackoff, MatrixError } from "./http-api";
|
||||||
import { ISendEventResponse } from "./@types/requests";
|
import { ISendEventResponse } from "./@types/requests";
|
||||||
|
|
||||||
const DEBUG = false; // set true to enable console logging.
|
const DEBUG = false; // set true to enable console logging.
|
||||||
@@ -43,38 +43,13 @@ type ProcessFunction<T> = (event: MatrixEvent) => Promise<T>;
|
|||||||
// eslint-disable-next-line camelcase
|
// eslint-disable-next-line camelcase
|
||||||
export class MatrixScheduler<T = ISendEventResponse> {
|
export class MatrixScheduler<T = ISendEventResponse> {
|
||||||
/**
|
/**
|
||||||
* Retries events up to 4 times using exponential backoff. This produces wait
|
* Default retry algorithm for the matrix scheduler. Retries events up to 4 times with exponential backoff.
|
||||||
* times of 2, 4, 8, and 16 seconds (30s total) after which we give up. If the
|
|
||||||
* failure was due to a rate limited request, the time specified in the error is
|
|
||||||
* waited before being retried.
|
|
||||||
* @param attempts - Number of attempts that have been made, including the one that just failed (ie. starting at 1)
|
* @param attempts - Number of attempts that have been made, including the one that just failed (ie. starting at 1)
|
||||||
* @see retryAlgorithm
|
* @see retryAlgorithm
|
||||||
*/
|
*/
|
||||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||||
public static RETRY_BACKOFF_RATELIMIT(event: MatrixEvent | null, attempts: number, err: MatrixError): number {
|
public static RETRY_BACKOFF_RATELIMIT(event: MatrixEvent | null, attempts: number, err: MatrixError): number {
|
||||||
if (err.httpStatus === 400 || err.httpStatus === 403 || err.httpStatus === 401) {
|
return calculateRetryBackoff(err, attempts, false);
|
||||||
// client error; no amount of retrying with save you now.
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (err instanceof ConnectionError) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if event that we are trying to send is too large in any way then retrying won't help
|
|
||||||
if (err.name === "M_TOO_LARGE") {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (err.name === "M_LIMIT_EXCEEDED") {
|
|
||||||
const waitTime = err.data.retry_after_ms;
|
|
||||||
if (waitTime > 0) {
|
|
||||||
return waitTime;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (attempts > 4) {
|
|
||||||
return -1; // give up
|
|
||||||
}
|
|
||||||
return 1000 * Math.pow(2, attempts);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Reference in New Issue
Block a user