You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-08-07 23:02:56 +03:00
Use client logger in more places (core code) (#4899)
* Use client logger for sync Use the logger attached to the MatrixClient when writing log messages out of the sync api. This helps figure out what's going on when multiple clients are running in the same JS environment. * Use client logger for to-device message queue * Use client logger in `PushProcessor.rewriteDefaultRules` * use client logger in `ServerCapabilities` * Mark global `logger` as deprecated
This commit is contained in:
committed by
GitHub
parent
940d358b0e
commit
b4672e26ec
@@ -126,7 +126,7 @@ describe("SlidingSyncSdk", () => {
|
|||||||
// assign client/httpBackend globals
|
// assign client/httpBackend globals
|
||||||
const setupClient = async (testOpts?: Partial<IStoredClientOpts & { withCrypto: boolean }>) => {
|
const setupClient = async (testOpts?: Partial<IStoredClientOpts & { withCrypto: boolean }>) => {
|
||||||
testOpts = testOpts || {};
|
testOpts = testOpts || {};
|
||||||
const syncOpts: SyncApiOptions = {};
|
const syncOpts: SyncApiOptions = { logger };
|
||||||
const testClient = new TestClient(selfUserId, "DEVICE", selfAccessToken);
|
const testClient = new TestClient(selfUserId, "DEVICE", selfAccessToken);
|
||||||
httpBackend = testClient.httpBackend;
|
httpBackend = testClient.httpBackend;
|
||||||
client = testClient.client;
|
client = testClient.client;
|
||||||
|
@@ -5,6 +5,7 @@ import { getMockClientWithEventEmitter } from "../test-utils/client";
|
|||||||
import { StubStore } from "../../src/store/stub";
|
import { StubStore } from "../../src/store/stub";
|
||||||
import { type IndexedToDeviceBatch } from "../../src/models/ToDeviceMessage";
|
import { type IndexedToDeviceBatch } from "../../src/models/ToDeviceMessage";
|
||||||
import { SyncState } from "../../src/sync";
|
import { SyncState } from "../../src/sync";
|
||||||
|
import { logger } from "../../src/logger.ts";
|
||||||
|
|
||||||
describe("onResumedSync", () => {
|
describe("onResumedSync", () => {
|
||||||
let batch: IndexedToDeviceBatch | null;
|
let batch: IndexedToDeviceBatch | null;
|
||||||
@@ -55,7 +56,7 @@ describe("onResumedSync", () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
queue = new ToDeviceMessageQueue(mockClient);
|
queue = new ToDeviceMessageQueue(mockClient, logger);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("resends queue after connectivity restored", async () => {
|
it("resends queue after connectivity restored", async () => {
|
||||||
|
@@ -28,6 +28,7 @@ import {
|
|||||||
TweakName,
|
TweakName,
|
||||||
} from "../../src";
|
} from "../../src";
|
||||||
import { mockClientMethodsUser } from "../test-utils/client";
|
import { mockClientMethodsUser } from "../test-utils/client";
|
||||||
|
import { logger } from "../../src/logger.ts";
|
||||||
|
|
||||||
const msc3914RoomCallRule: IPushRule = {
|
const msc3914RoomCallRule: IPushRule = {
|
||||||
rule_id: ".org.matrix.msc3914.rule.room.call",
|
rule_id: ".org.matrix.msc3914.rule.room.call",
|
||||||
@@ -209,7 +210,7 @@ describe("NotificationService", function () {
|
|||||||
msgtype: "m.text",
|
msgtype: "m.text",
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
matrixClient.pushRules = PushProcessor.rewriteDefaultRules(matrixClient.pushRules!);
|
matrixClient.pushRules = PushProcessor.rewriteDefaultRules(logger, matrixClient.pushRules!);
|
||||||
pushProcessor = new PushProcessor(matrixClient);
|
pushProcessor = new PushProcessor(matrixClient);
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -731,7 +732,7 @@ describe("Test PushProcessor.partsForDottedKey", function () {
|
|||||||
|
|
||||||
describe("rewriteDefaultRules", () => {
|
describe("rewriteDefaultRules", () => {
|
||||||
it("should add default rules in the correct order", () => {
|
it("should add default rules in the correct order", () => {
|
||||||
const pushRules = PushProcessor.rewriteDefaultRules({
|
const pushRules = PushProcessor.rewriteDefaultRules(logger, {
|
||||||
device: {},
|
device: {},
|
||||||
global: {
|
global: {
|
||||||
content: [],
|
content: [],
|
||||||
@@ -867,7 +868,7 @@ describe("rewriteDefaultRules", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it("should add missing msc3914 rule in correct place", () => {
|
it("should add missing msc3914 rule in correct place", () => {
|
||||||
const pushRules = PushProcessor.rewriteDefaultRules({
|
const pushRules = PushProcessor.rewriteDefaultRules(logger, {
|
||||||
device: {},
|
device: {},
|
||||||
global: {
|
global: {
|
||||||
// Sample push rules from a Synapse user.
|
// Sample push rules from a Synapse user.
|
||||||
|
@@ -15,7 +15,7 @@ limitations under the License.
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { ToDeviceMessageId } from "./@types/event.ts";
|
import { ToDeviceMessageId } from "./@types/event.ts";
|
||||||
import { logger } from "./logger.ts";
|
import { type Logger } from "./logger.ts";
|
||||||
import { type MatrixClient, ClientEvent } from "./client.ts";
|
import { type MatrixClient, ClientEvent } from "./client.ts";
|
||||||
import { type MatrixError } from "./http-api/index.ts";
|
import { type MatrixError } from "./http-api/index.ts";
|
||||||
import {
|
import {
|
||||||
@@ -40,7 +40,10 @@ export class ToDeviceMessageQueue {
|
|||||||
private retryTimeout: ReturnType<typeof setTimeout> | null = null;
|
private retryTimeout: ReturnType<typeof setTimeout> | null = null;
|
||||||
private retryAttempts = 0;
|
private retryAttempts = 0;
|
||||||
|
|
||||||
public constructor(private client: MatrixClient) {}
|
public constructor(
|
||||||
|
private client: MatrixClient,
|
||||||
|
private readonly logger: Logger,
|
||||||
|
) {}
|
||||||
|
|
||||||
public start(): void {
|
public start(): void {
|
||||||
this.running = true;
|
this.running = true;
|
||||||
@@ -67,7 +70,7 @@ export class ToDeviceMessageQueue {
|
|||||||
const msgmap = batchWithTxnId.batch.map(
|
const msgmap = batchWithTxnId.batch.map(
|
||||||
(msg) => `${msg.userId}/${msg.deviceId} (msgid ${msg.payload[ToDeviceMessageId]})`,
|
(msg) => `${msg.userId}/${msg.deviceId} (msgid ${msg.payload[ToDeviceMessageId]})`,
|
||||||
);
|
);
|
||||||
logger.info(
|
this.logger.info(
|
||||||
`Enqueuing batch of to-device messages. type=${batch.eventType} txnid=${batchWithTxnId.txnId}`,
|
`Enqueuing batch of to-device messages. type=${batch.eventType} txnid=${batchWithTxnId.txnId}`,
|
||||||
msgmap,
|
msgmap,
|
||||||
);
|
);
|
||||||
@@ -83,7 +86,7 @@ export class ToDeviceMessageQueue {
|
|||||||
|
|
||||||
if (this.sending || !this.running) return;
|
if (this.sending || !this.running) return;
|
||||||
|
|
||||||
logger.debug("Attempting to send queued to-device messages");
|
this.logger.debug("Attempting to send queued to-device messages");
|
||||||
|
|
||||||
this.sending = true;
|
this.sending = true;
|
||||||
let headBatch: IndexedToDeviceBatch | null;
|
let headBatch: IndexedToDeviceBatch | null;
|
||||||
@@ -99,7 +102,7 @@ export class ToDeviceMessageQueue {
|
|||||||
// Make sure we're still running after the async tasks: if not, stop.
|
// Make sure we're still running after the async tasks: if not, stop.
|
||||||
if (!this.running) return;
|
if (!this.running) return;
|
||||||
|
|
||||||
logger.debug("All queued to-device messages sent");
|
this.logger.debug("All queued to-device messages sent");
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
++this.retryAttempts;
|
++this.retryAttempts;
|
||||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||||
@@ -109,15 +112,15 @@ export class ToDeviceMessageQueue {
|
|||||||
// the scheduler function doesn't differentiate between fatal errors and just getting
|
// the scheduler function doesn't differentiate between fatal errors and just getting
|
||||||
// bored and giving up for now
|
// bored and giving up for now
|
||||||
if (Math.floor((<MatrixError>e).httpStatus! / 100) === 4) {
|
if (Math.floor((<MatrixError>e).httpStatus! / 100) === 4) {
|
||||||
logger.error("Fatal error when sending to-device message - dropping to-device batch!", e);
|
this.logger.error("Fatal error when sending to-device message - dropping to-device batch!", e);
|
||||||
await this.client.store.removeToDeviceBatch(headBatch!.id);
|
await this.client.store.removeToDeviceBatch(headBatch!.id);
|
||||||
} else {
|
} else {
|
||||||
logger.info("Automatic retry limit reached for to-device messages.");
|
this.logger.info("Automatic retry limit reached for to-device messages.");
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(`Failed to send batch of to-device messages. Will retry in ${retryDelay}ms`, e);
|
this.logger.info(`Failed to send batch of to-device messages. Will retry in ${retryDelay}ms`, e);
|
||||||
this.retryTimeout = setTimeout(this.sendQueue, retryDelay);
|
this.retryTimeout = setTimeout(this.sendQueue, retryDelay);
|
||||||
} finally {
|
} finally {
|
||||||
this.sending = false;
|
this.sending = false;
|
||||||
@@ -133,7 +136,7 @@ export class ToDeviceMessageQueue {
|
|||||||
contentMap.getOrCreate(item.userId).set(item.deviceId, item.payload);
|
contentMap.getOrCreate(item.userId).set(item.deviceId, item.payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(
|
this.logger.info(
|
||||||
`Sending batch of ${batch.batch.length} to-device messages with ID ${batch.id} and txnId ${batch.txnId}`,
|
`Sending batch of ${batch.batch.length} to-device messages with ID ${batch.id} and txnId ${batch.txnId}`,
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -146,7 +149,7 @@ export class ToDeviceMessageQueue {
|
|||||||
*/
|
*/
|
||||||
private onResumedSync = (state: SyncState | null, oldState: SyncState | null): void => {
|
private onResumedSync = (state: SyncState | null, oldState: SyncState | null): void => {
|
||||||
if (state === SyncState.Syncing && oldState !== SyncState.Syncing) {
|
if (state === SyncState.Syncing && oldState !== SyncState.Syncing) {
|
||||||
logger.info(`Resuming queue after resumed sync`);
|
this.logger.info(`Resuming queue after resumed sync`);
|
||||||
this.sendQueue();
|
this.sendQueue();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@@ -1364,7 +1364,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
// the underlying session management and doesn't use any actual media capabilities
|
// the underlying session management and doesn't use any actual media capabilities
|
||||||
this.matrixRTC = new MatrixRTCSessionManager(this);
|
this.matrixRTC = new MatrixRTCSessionManager(this);
|
||||||
|
|
||||||
this.serverCapabilitiesService = new ServerCapabilities(this.http);
|
this.serverCapabilitiesService = new ServerCapabilities(this.logger, this.http);
|
||||||
|
|
||||||
this.on(ClientEvent.Sync, this.fixupRoomNotifications);
|
this.on(ClientEvent.Sync, this.fixupRoomNotifications);
|
||||||
|
|
||||||
@@ -1386,7 +1386,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
|
|
||||||
this.roomNameGenerator = opts.roomNameGenerator;
|
this.roomNameGenerator = opts.roomNameGenerator;
|
||||||
|
|
||||||
this.toDeviceMessageQueue = new ToDeviceMessageQueue(this);
|
this.toDeviceMessageQueue = new ToDeviceMessageQueue(this, this.logger);
|
||||||
|
|
||||||
// The SDK doesn't really provide a clean way for events to recalculate the push
|
// The SDK doesn't really provide a clean way for events to recalculate the push
|
||||||
// actions for themselves, so we have to kinda help them out when they are encrypted.
|
// actions for themselves, so we have to kinda help them out when they are encrypted.
|
||||||
@@ -1503,6 +1503,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
}
|
}
|
||||||
return this.canResetTimelineCallback(roomId);
|
return this.canResetTimelineCallback(roomId);
|
||||||
},
|
},
|
||||||
|
logger: this.logger.getChild("sync"),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -7324,7 +7325,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
|
|||||||
*/
|
*/
|
||||||
public setPushRules(rules: IPushRules): void {
|
public setPushRules(rules: IPushRules): void {
|
||||||
// Fix-up defaults, if applicable.
|
// Fix-up defaults, if applicable.
|
||||||
this.pushRules = PushProcessor.rewriteDefaultRules(rules, this.getUserId()!);
|
this.pushRules = PushProcessor.rewriteDefaultRules(this.logger, rules, this.getUserId()!);
|
||||||
// Pre-calculate any necessary caches.
|
// Pre-calculate any necessary caches.
|
||||||
this.pushProcessor.updateCachedPushRuleKeys(this.pushRules);
|
this.pushProcessor.updateCachedPushRuleKeys(this.pushRules);
|
||||||
}
|
}
|
||||||
|
@@ -160,6 +160,9 @@ function getPrefixedLogger(prefix?: string): PrefixedLogger {
|
|||||||
/**
|
/**
|
||||||
* Drop-in replacement for `console` using {@link https://www.npmjs.com/package/loglevel|loglevel}.
|
* Drop-in replacement for `console` using {@link https://www.npmjs.com/package/loglevel|loglevel}.
|
||||||
* Can be tailored down to specific use cases if needed.
|
* Can be tailored down to specific use cases if needed.
|
||||||
|
*
|
||||||
|
* @deprecated avoid the use of this unless you are the constructor of `MatrixClient`: you should be using the logger
|
||||||
|
* associated with `MatrixClient`.
|
||||||
*/
|
*/
|
||||||
export const logger = getPrefixedLogger() as LoggerWithLogMethod;
|
export const logger = getPrefixedLogger() as LoggerWithLogMethod;
|
||||||
|
|
||||||
|
@@ -15,7 +15,7 @@ limitations under the License.
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { deepCompare, escapeRegExp, globToRegexp, isNullOrUndefined } from "./utils.ts";
|
import { deepCompare, escapeRegExp, globToRegexp, isNullOrUndefined } from "./utils.ts";
|
||||||
import { logger } from "./logger.ts";
|
import { type Logger } from "./logger.ts";
|
||||||
import { type MatrixClient } from "./client.ts";
|
import { type MatrixClient } from "./client.ts";
|
||||||
import { type MatrixEvent } from "./models/event.ts";
|
import { type MatrixEvent } from "./models/event.ts";
|
||||||
import {
|
import {
|
||||||
@@ -170,6 +170,7 @@ const EXPECTED_DEFAULT_UNDERRIDE_RULE_IDS: OrderedRules = [
|
|||||||
/**
|
/**
|
||||||
* Make sure that each of the rules listed in `defaultRuleIds` is listed in the given set of push rules.
|
* Make sure that each of the rules listed in `defaultRuleIds` is listed in the given set of push rules.
|
||||||
*
|
*
|
||||||
|
* @param logger - A `Logger` to write log messages to.
|
||||||
* @param kind - the kind of push rule set being merged.
|
* @param kind - the kind of push rule set being merged.
|
||||||
* @param incomingRules - the existing set of known push rules for the user.
|
* @param incomingRules - the existing set of known push rules for the user.
|
||||||
* @param defaultRules - a lookup table for the default definitions of push rules.
|
* @param defaultRules - a lookup table for the default definitions of push rules.
|
||||||
@@ -178,6 +179,7 @@ const EXPECTED_DEFAULT_UNDERRIDE_RULE_IDS: OrderedRules = [
|
|||||||
* @returns A copy of `incomingRules`, with any missing default rules inserted in the right place.
|
* @returns A copy of `incomingRules`, with any missing default rules inserted in the right place.
|
||||||
*/
|
*/
|
||||||
function mergeRulesWithDefaults(
|
function mergeRulesWithDefaults(
|
||||||
|
logger: Logger,
|
||||||
kind: PushRuleKind,
|
kind: PushRuleKind,
|
||||||
incomingRules: IPushRule[],
|
incomingRules: IPushRule[],
|
||||||
defaultRules: Record<string, IPushRule>,
|
defaultRules: Record<string, IPushRule>,
|
||||||
@@ -276,11 +278,17 @@ export class PushProcessor {
|
|||||||
* Rewrites conditions on a client's push rules to match the defaults
|
* Rewrites conditions on a client's push rules to match the defaults
|
||||||
* where applicable. Useful for upgrading push rules to more strict
|
* where applicable. Useful for upgrading push rules to more strict
|
||||||
* conditions when the server is falling behind on defaults.
|
* conditions when the server is falling behind on defaults.
|
||||||
|
*
|
||||||
|
* @param logger - A `Logger` to write log messages to.
|
||||||
* @param incomingRules - The client's existing push rules
|
* @param incomingRules - The client's existing push rules
|
||||||
* @param userId - The Matrix ID of the client.
|
* @param userId - The Matrix ID of the client.
|
||||||
* @returns The rewritten rules
|
* @returns The rewritten rules
|
||||||
*/
|
*/
|
||||||
public static rewriteDefaultRules(incomingRules: IPushRules, userId: string | undefined = undefined): IPushRules {
|
public static rewriteDefaultRules(
|
||||||
|
logger: Logger,
|
||||||
|
incomingRules: IPushRules,
|
||||||
|
userId: string | undefined = undefined,
|
||||||
|
): IPushRules {
|
||||||
let newRules: IPushRules = JSON.parse(JSON.stringify(incomingRules)); // deep clone
|
let newRules: IPushRules = JSON.parse(JSON.stringify(incomingRules)); // deep clone
|
||||||
|
|
||||||
// These lines are mostly to make the tests happy. We shouldn't run into these
|
// These lines are mostly to make the tests happy. We shouldn't run into these
|
||||||
@@ -292,6 +300,7 @@ export class PushProcessor {
|
|||||||
|
|
||||||
// Merge the client-level defaults with the ones from the server
|
// Merge the client-level defaults with the ones from the server
|
||||||
newRules.global.override = mergeRulesWithDefaults(
|
newRules.global.override = mergeRulesWithDefaults(
|
||||||
|
logger,
|
||||||
PushRuleKind.Override,
|
PushRuleKind.Override,
|
||||||
newRules.global.override,
|
newRules.global.override,
|
||||||
DEFAULT_OVERRIDE_RULES,
|
DEFAULT_OVERRIDE_RULES,
|
||||||
@@ -299,6 +308,7 @@ export class PushProcessor {
|
|||||||
);
|
);
|
||||||
|
|
||||||
newRules.global.underride = mergeRulesWithDefaults(
|
newRules.global.underride = mergeRulesWithDefaults(
|
||||||
|
logger,
|
||||||
PushRuleKind.Underride,
|
PushRuleKind.Underride,
|
||||||
newRules.global.underride,
|
newRules.global.underride,
|
||||||
DEFAULT_UNDERRIDE_RULES,
|
DEFAULT_UNDERRIDE_RULES,
|
||||||
|
@@ -15,7 +15,7 @@ limitations under the License.
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { type IHttpOpts, type MatrixHttpApi, Method } from "./http-api/index.ts";
|
import { type IHttpOpts, type MatrixHttpApi, Method } from "./http-api/index.ts";
|
||||||
import { logger } from "./logger.ts";
|
import { type Logger } from "./logger.ts";
|
||||||
|
|
||||||
// How often we update the server capabilities.
|
// How often we update the server capabilities.
|
||||||
// 6 hours - an arbitrary value, but they should change very infrequently.
|
// 6 hours - an arbitrary value, but they should change very infrequently.
|
||||||
@@ -78,7 +78,10 @@ export class ServerCapabilities {
|
|||||||
private retryTimeout?: ReturnType<typeof setTimeout>;
|
private retryTimeout?: ReturnType<typeof setTimeout>;
|
||||||
private refreshTimeout?: ReturnType<typeof setInterval>;
|
private refreshTimeout?: ReturnType<typeof setInterval>;
|
||||||
|
|
||||||
public constructor(private readonly http: MatrixHttpApi<IHttpOpts & { onlyData: true }>) {}
|
public constructor(
|
||||||
|
private readonly logger: Logger,
|
||||||
|
private readonly http: MatrixHttpApi<IHttpOpts & { onlyData: true }>,
|
||||||
|
) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts periodically fetching the server capabilities.
|
* Starts periodically fetching the server capabilities.
|
||||||
@@ -117,12 +120,12 @@ export class ServerCapabilities {
|
|||||||
await this.fetchCapabilities();
|
await this.fetchCapabilities();
|
||||||
this.clearTimeouts();
|
this.clearTimeouts();
|
||||||
this.refreshTimeout = setTimeout(this.poll, CAPABILITIES_CACHE_MS);
|
this.refreshTimeout = setTimeout(this.poll, CAPABILITIES_CACHE_MS);
|
||||||
logger.debug("Fetched new server capabilities");
|
this.logger.debug("Fetched new server capabilities");
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
this.clearTimeouts();
|
this.clearTimeouts();
|
||||||
const howLong = Math.floor(CAPABILITIES_RETRY_MS + Math.random() * 5000);
|
const howLong = Math.floor(CAPABILITIES_RETRY_MS + Math.random() * 5000);
|
||||||
this.retryTimeout = setTimeout(this.poll, howLong);
|
this.retryTimeout = setTimeout(this.poll, howLong);
|
||||||
logger.warn(`Failed to refresh capabilities: retrying in ${howLong}ms`, e);
|
this.logger.warn(`Failed to refresh capabilities: retrying in ${howLong}ms`, e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@@ -326,8 +326,8 @@ export class SlidingSyncSdk {
|
|||||||
public constructor(
|
public constructor(
|
||||||
private readonly slidingSync: SlidingSync,
|
private readonly slidingSync: SlidingSync,
|
||||||
private readonly client: MatrixClient,
|
private readonly client: MatrixClient,
|
||||||
opts?: IStoredClientOpts,
|
opts: IStoredClientOpts | undefined,
|
||||||
syncOpts?: SyncApiOptions,
|
syncOpts: SyncApiOptions,
|
||||||
) {
|
) {
|
||||||
this.opts = defaultClientOpts(opts);
|
this.opts = defaultClientOpts(opts);
|
||||||
this.syncOpts = defaultSyncApiOpts(syncOpts);
|
this.syncOpts = defaultSyncApiOpts(syncOpts);
|
||||||
@@ -356,7 +356,11 @@ export class SlidingSyncSdk {
|
|||||||
let room = this.client.store.getRoom(roomId);
|
let room = this.client.store.getRoom(roomId);
|
||||||
if (!room) {
|
if (!room) {
|
||||||
if (!roomData.initial) {
|
if (!roomData.initial) {
|
||||||
logger.debug("initial flag not set but no stored room exists for room ", roomId, roomData);
|
this.syncOpts.logger.debug(
|
||||||
|
"initial flag not set but no stored room exists for room ",
|
||||||
|
roomId,
|
||||||
|
roomData,
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
room = _createAndReEmitRoom(this.client, roomId, this.opts);
|
room = _createAndReEmitRoom(this.client, roomId, this.opts);
|
||||||
@@ -366,7 +370,7 @@ export class SlidingSyncSdk {
|
|||||||
|
|
||||||
private onLifecycle(state: SlidingSyncState, resp: MSC3575SlidingSyncResponse | null, err?: Error): void {
|
private onLifecycle(state: SlidingSyncState, resp: MSC3575SlidingSyncResponse | null, err?: Error): void {
|
||||||
if (err) {
|
if (err) {
|
||||||
logger.debug("onLifecycle", state, err);
|
this.syncOpts.logger.debug("onLifecycle", state, err);
|
||||||
}
|
}
|
||||||
switch (state) {
|
switch (state) {
|
||||||
case SlidingSyncState.Complete:
|
case SlidingSyncState.Complete:
|
||||||
@@ -407,7 +411,9 @@ export class SlidingSyncSdk {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
this.failCount = 0;
|
this.failCount = 0;
|
||||||
logger.log(`SlidingSyncState.RequestFinished with ${Object.keys(resp?.rooms || []).length} rooms`);
|
this.syncOpts.logger.debug(
|
||||||
|
`SlidingSyncState.RequestFinished with ${Object.keys(resp?.rooms || []).length} rooms`,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -526,7 +532,7 @@ export class SlidingSyncSdk {
|
|||||||
private shouldAbortSync(error: MatrixError): boolean {
|
private shouldAbortSync(error: MatrixError): boolean {
|
||||||
if (error.errcode === "M_UNKNOWN_TOKEN") {
|
if (error.errcode === "M_UNKNOWN_TOKEN") {
|
||||||
// The logout already happened, we just need to stop.
|
// The logout already happened, we just need to stop.
|
||||||
logger.warn("Token no longer valid - assuming logout");
|
this.syncOpts.logger.warn("Token no longer valid - assuming logout");
|
||||||
this.stop();
|
this.stop();
|
||||||
this.updateSyncState(SyncState.Error, { error });
|
this.updateSyncState(SyncState.Error, { error });
|
||||||
return true;
|
return true;
|
||||||
@@ -654,7 +660,7 @@ export class SlidingSyncSdk {
|
|||||||
for (let i = timelineEvents.length - 1; i >= 0; i--) {
|
for (let i = timelineEvents.length - 1; i >= 0; i--) {
|
||||||
const eventId = timelineEvents[i].getId();
|
const eventId = timelineEvents[i].getId();
|
||||||
if (room.getTimelineForEvent(eventId)) {
|
if (room.getTimelineForEvent(eventId)) {
|
||||||
logger.debug("Already have event " + eventId + " in limited " +
|
this.syncOpts.logger.debug("Already have event " + eventId + " in limited " +
|
||||||
"sync - not resetting");
|
"sync - not resetting");
|
||||||
limited = false;
|
limited = false;
|
||||||
|
|
||||||
@@ -863,19 +869,19 @@ export class SlidingSyncSdk {
|
|||||||
* Main entry point. Blocks until stop() is called.
|
* Main entry point. Blocks until stop() is called.
|
||||||
*/
|
*/
|
||||||
public async sync(): Promise<void> {
|
public async sync(): Promise<void> {
|
||||||
logger.debug("Sliding sync init loop");
|
this.syncOpts.logger.debug("Sliding sync init loop");
|
||||||
|
|
||||||
// 1) We need to get push rules so we can check if events should bing as we get
|
// 1) We need to get push rules so we can check if events should bing as we get
|
||||||
// them from /sync.
|
// them from /sync.
|
||||||
while (!this.client.isGuest()) {
|
while (!this.client.isGuest()) {
|
||||||
try {
|
try {
|
||||||
logger.debug("Getting push rules...");
|
this.syncOpts.logger.debug("Getting push rules...");
|
||||||
const result = await this.client.getPushRules();
|
const result = await this.client.getPushRules();
|
||||||
logger.debug("Got push rules");
|
this.syncOpts.logger.debug("Got push rules");
|
||||||
this.client.pushRules = result;
|
this.client.pushRules = result;
|
||||||
break;
|
break;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error("Getting push rules failed", err);
|
this.syncOpts.logger.error("Getting push rules failed", err);
|
||||||
if (this.shouldAbortSync(<MatrixError>err)) {
|
if (this.shouldAbortSync(<MatrixError>err)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -890,7 +896,7 @@ export class SlidingSyncSdk {
|
|||||||
* Stops the sync object from syncing.
|
* Stops the sync object from syncing.
|
||||||
*/
|
*/
|
||||||
public stop(): void {
|
public stop(): void {
|
||||||
logger.debug("SyncApi.stop");
|
this.syncOpts.logger.debug("SyncApi.stop");
|
||||||
this.slidingSync.stop();
|
this.slidingSync.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
95
src/sync.ts
95
src/sync.ts
@@ -31,7 +31,7 @@ import { NotificationCountType, Room, RoomEvent } from "./models/room.ts";
|
|||||||
import { deepCopy, noUnsafeEventProps, promiseMapSeries, unsafeProp } from "./utils.ts";
|
import { deepCopy, noUnsafeEventProps, promiseMapSeries, unsafeProp } from "./utils.ts";
|
||||||
import { Filter } from "./filter.ts";
|
import { Filter } from "./filter.ts";
|
||||||
import { EventTimeline } from "./models/event-timeline.ts";
|
import { EventTimeline } from "./models/event-timeline.ts";
|
||||||
import { logger } from "./logger.ts";
|
import { type Logger } from "./logger.ts";
|
||||||
import {
|
import {
|
||||||
ClientEvent,
|
ClientEvent,
|
||||||
type IStoredClientOpts,
|
type IStoredClientOpts,
|
||||||
@@ -68,8 +68,6 @@ import { UNREAD_THREAD_NOTIFICATIONS } from "./@types/sync.ts";
|
|||||||
import { Feature, ServerSupport } from "./feature.ts";
|
import { Feature, ServerSupport } from "./feature.ts";
|
||||||
import { KnownMembership } from "./@types/membership.ts";
|
import { KnownMembership } from "./@types/membership.ts";
|
||||||
|
|
||||||
const DEBUG = true;
|
|
||||||
|
|
||||||
// /sync requests allow you to set a timeout= but the request may continue
|
// /sync requests allow you to set a timeout= but the request may continue
|
||||||
// beyond that and wedge forever, so we need to track how long we are willing
|
// beyond that and wedge forever, so we need to track how long we are willing
|
||||||
// to keep open the connection. This constant is *ADDED* to the timeout= value
|
// to keep open the connection. This constant is *ADDED* to the timeout= value
|
||||||
@@ -112,12 +110,6 @@ function getFilterName(userId: string, suffix?: string): string {
|
|||||||
return `FILTER_SYNC_${userId}` + (suffix ? "_" + suffix : "");
|
return `FILTER_SYNC_${userId}` + (suffix ? "_" + suffix : "");
|
||||||
}
|
}
|
||||||
|
|
||||||
/* istanbul ignore next */
|
|
||||||
function debuglog(...params: any[]): void {
|
|
||||||
if (!DEBUG) return;
|
|
||||||
logger.log(...params);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Options passed into the constructor of SyncApi by MatrixClient
|
* Options passed into the constructor of SyncApi by MatrixClient
|
||||||
*/
|
*/
|
||||||
@@ -134,6 +126,9 @@ export interface SyncApiOptions {
|
|||||||
* there are other references to the timelines for this room.
|
* there are other references to the timelines for this room.
|
||||||
*/
|
*/
|
||||||
canResetEntireTimeline?: ResetTimelineCallback;
|
canResetEntireTimeline?: ResetTimelineCallback;
|
||||||
|
|
||||||
|
/** Logger instance to use for writing debug logs. */
|
||||||
|
logger: Logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface ISyncOptions {
|
interface ISyncOptions {
|
||||||
@@ -202,7 +197,7 @@ export function defaultClientOpts(opts?: IStoredClientOpts): IStoredClientOpts {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export function defaultSyncApiOpts(syncOpts?: SyncApiOptions): SyncApiOptions {
|
export function defaultSyncApiOpts(syncOpts: SyncApiOptions): SyncApiOptions {
|
||||||
return {
|
return {
|
||||||
canResetEntireTimeline: (_roomId): boolean => false,
|
canResetEntireTimeline: (_roomId): boolean => false,
|
||||||
...syncOpts,
|
...syncOpts,
|
||||||
@@ -236,8 +231,8 @@ export class SyncApi {
|
|||||||
*/
|
*/
|
||||||
public constructor(
|
public constructor(
|
||||||
private readonly client: MatrixClient,
|
private readonly client: MatrixClient,
|
||||||
opts?: IStoredClientOpts,
|
opts: IStoredClientOpts | undefined,
|
||||||
syncOpts?: SyncApiOptions,
|
syncOpts: SyncApiOptions,
|
||||||
) {
|
) {
|
||||||
this.opts = defaultClientOpts(opts);
|
this.opts = defaultClientOpts(opts);
|
||||||
this.syncOpts = defaultSyncApiOpts(syncOpts);
|
this.syncOpts = defaultSyncApiOpts(syncOpts);
|
||||||
@@ -277,7 +272,7 @@ export class SyncApi {
|
|||||||
// 2. If it's from the first state we're seeing after joining the room
|
// 2. If it's from the first state we're seeing after joining the room
|
||||||
// 3. Or whether it's coming from `syncFromCache`
|
// 3. Or whether it's coming from `syncFromCache`
|
||||||
if (timelineWasEmpty) {
|
if (timelineWasEmpty) {
|
||||||
logger.debug(
|
this.syncOpts.logger.debug(
|
||||||
`MarkerState: Ignoring markerEventId=${markerEvent.getId()} in roomId=${room.roomId} ` +
|
`MarkerState: Ignoring markerEventId=${markerEvent.getId()} in roomId=${room.roomId} ` +
|
||||||
`because the timeline was empty before the marker arrived which means there is nothing to refresh.`,
|
`because the timeline was empty before the marker arrived which means there is nothing to refresh.`,
|
||||||
);
|
);
|
||||||
@@ -309,14 +304,14 @@ export class SyncApi {
|
|||||||
if (isValidMsc2716Event) {
|
if (isValidMsc2716Event) {
|
||||||
// Saw new marker event, let's let the clients know they should
|
// Saw new marker event, let's let the clients know they should
|
||||||
// refresh the timeline.
|
// refresh the timeline.
|
||||||
logger.debug(
|
this.syncOpts.logger.debug(
|
||||||
`MarkerState: Timeline needs to be refreshed because ` +
|
`MarkerState: Timeline needs to be refreshed because ` +
|
||||||
`a new markerEventId=${markerEvent.getId()} was sent in roomId=${room.roomId}`,
|
`a new markerEventId=${markerEvent.getId()} was sent in roomId=${room.roomId}`,
|
||||||
);
|
);
|
||||||
room.setTimelineNeedsRefresh(true);
|
room.setTimelineNeedsRefresh(true);
|
||||||
room.emit(RoomEvent.HistoryImportedWithinTimeline, markerEvent, room);
|
room.emit(RoomEvent.HistoryImportedWithinTimeline, markerEvent, room);
|
||||||
} else {
|
} else {
|
||||||
logger.debug(
|
this.syncOpts.logger.debug(
|
||||||
`MarkerState: Ignoring markerEventId=${markerEvent.getId()} in roomId=${room.roomId} because ` +
|
`MarkerState: Ignoring markerEventId=${markerEvent.getId()} in roomId=${room.roomId} because ` +
|
||||||
`MSC2716 is not supported in the room version or for any room version, the marker wasn't sent ` +
|
`MSC2716 is not supported in the room version or for any room version, the marker wasn't sent ` +
|
||||||
`by the room creator.`,
|
`by the room creator.`,
|
||||||
@@ -489,7 +484,7 @@ export class SyncApi {
|
|||||||
*/
|
*/
|
||||||
private peekPoll(peekRoom: Room, token?: string): void {
|
private peekPoll(peekRoom: Room, token?: string): void {
|
||||||
if (this._peekRoom !== peekRoom) {
|
if (this._peekRoom !== peekRoom) {
|
||||||
debuglog("Stopped peeking in room %s", peekRoom.roomId);
|
this.syncOpts.logger.debug("Stopped peeking in room %s", peekRoom.roomId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -512,7 +507,7 @@ export class SyncApi {
|
|||||||
.then(
|
.then(
|
||||||
async (res) => {
|
async (res) => {
|
||||||
if (this._peekRoom !== peekRoom) {
|
if (this._peekRoom !== peekRoom) {
|
||||||
debuglog("Stopped peeking in room %s", peekRoom.roomId);
|
this.syncOpts.logger.debug("Stopped peeking in room %s", peekRoom.roomId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// We have a problem that we get presence both from /events and /sync
|
// We have a problem that we get presence both from /events and /sync
|
||||||
@@ -554,7 +549,7 @@ export class SyncApi {
|
|||||||
this.peekPoll(peekRoom, res.end);
|
this.peekPoll(peekRoom, res.end);
|
||||||
},
|
},
|
||||||
(err) => {
|
(err) => {
|
||||||
logger.error("[%s] Peek poll failed: %s", peekRoom.roomId, err);
|
this.syncOpts.logger.error("[%s] Peek poll failed: %s", peekRoom.roomId, err);
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
this.peekPoll(peekRoom, token);
|
this.peekPoll(peekRoom, token);
|
||||||
}, 30 * 1000);
|
}, 30 * 1000);
|
||||||
@@ -595,7 +590,7 @@ export class SyncApi {
|
|||||||
private shouldAbortSync(error: MatrixError): boolean {
|
private shouldAbortSync(error: MatrixError): boolean {
|
||||||
if (error.errcode === "M_UNKNOWN_TOKEN") {
|
if (error.errcode === "M_UNKNOWN_TOKEN") {
|
||||||
// The logout already happened, we just need to stop.
|
// The logout already happened, we just need to stop.
|
||||||
logger.warn("Token no longer valid - assuming logout");
|
this.syncOpts.logger.warn("Token no longer valid - assuming logout");
|
||||||
this.stop();
|
this.stop();
|
||||||
this.updateSyncState(SyncState.Error, { error });
|
this.updateSyncState(SyncState.Error, { error });
|
||||||
return true;
|
return true;
|
||||||
@@ -605,17 +600,17 @@ export class SyncApi {
|
|||||||
|
|
||||||
private getPushRules = async (): Promise<void> => {
|
private getPushRules = async (): Promise<void> => {
|
||||||
try {
|
try {
|
||||||
debuglog("Getting push rules...");
|
this.syncOpts.logger.debug("Getting push rules...");
|
||||||
const result = await this.client.getPushRules();
|
const result = await this.client.getPushRules();
|
||||||
debuglog("Got push rules");
|
this.syncOpts.logger.debug("Got push rules");
|
||||||
|
|
||||||
this.client.pushRules = result;
|
this.client.pushRules = result;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error("Getting push rules failed", err);
|
this.syncOpts.logger.error("Getting push rules failed", err);
|
||||||
if (this.shouldAbortSync(<MatrixError>err)) return;
|
if (this.shouldAbortSync(<MatrixError>err)) return;
|
||||||
// wait for saved sync to complete before doing anything else,
|
// wait for saved sync to complete before doing anything else,
|
||||||
// otherwise the sync state will end up being incorrect
|
// otherwise the sync state will end up being incorrect
|
||||||
debuglog("Waiting for saved sync before retrying push rules...");
|
this.syncOpts.logger.debug("Waiting for saved sync before retrying push rules...");
|
||||||
await this.recoverFromSyncStartupError(this.savedSyncPromise, <Error>err);
|
await this.recoverFromSyncStartupError(this.savedSyncPromise, <Error>err);
|
||||||
return this.getPushRules(); // try again
|
return this.getPushRules(); // try again
|
||||||
}
|
}
|
||||||
@@ -630,12 +625,12 @@ export class SyncApi {
|
|||||||
};
|
};
|
||||||
|
|
||||||
private prepareLazyLoadingForSync = async (): Promise<void> => {
|
private prepareLazyLoadingForSync = async (): Promise<void> => {
|
||||||
debuglog("Prepare lazy loading for sync...");
|
this.syncOpts.logger.debug("Prepare lazy loading for sync...");
|
||||||
if (this.client.isGuest()) {
|
if (this.client.isGuest()) {
|
||||||
this.opts.lazyLoadMembers = false;
|
this.opts.lazyLoadMembers = false;
|
||||||
}
|
}
|
||||||
if (this.opts.lazyLoadMembers) {
|
if (this.opts.lazyLoadMembers) {
|
||||||
debuglog("Enabling lazy load on sync filter...");
|
this.syncOpts.logger.debug("Enabling lazy load on sync filter...");
|
||||||
if (!this.opts.filter) {
|
if (!this.opts.filter) {
|
||||||
this.opts.filter = this.buildDefaultFilter();
|
this.opts.filter = this.buildDefaultFilter();
|
||||||
}
|
}
|
||||||
@@ -645,11 +640,11 @@ export class SyncApi {
|
|||||||
|
|
||||||
private storeClientOptions = async (): Promise<void> => {
|
private storeClientOptions = async (): Promise<void> => {
|
||||||
try {
|
try {
|
||||||
debuglog("Storing client options...");
|
this.syncOpts.logger.debug("Storing client options...");
|
||||||
await this.client.storeClientOptions();
|
await this.client.storeClientOptions();
|
||||||
debuglog("Stored client options");
|
this.syncOpts.logger.debug("Stored client options");
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error("Storing client options failed", err);
|
this.syncOpts.logger.error("Storing client options failed", err);
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -658,7 +653,7 @@ export class SyncApi {
|
|||||||
filterId?: string;
|
filterId?: string;
|
||||||
filter?: Filter;
|
filter?: Filter;
|
||||||
}> => {
|
}> => {
|
||||||
debuglog("Getting filter...");
|
this.syncOpts.logger.debug("Getting filter...");
|
||||||
let filter: Filter;
|
let filter: Filter;
|
||||||
if (this.opts.filter) {
|
if (this.opts.filter) {
|
||||||
filter = this.opts.filter;
|
filter = this.opts.filter;
|
||||||
@@ -670,11 +665,11 @@ export class SyncApi {
|
|||||||
try {
|
try {
|
||||||
filterId = await this.client.getOrCreateFilter(getFilterName(this.client.credentials.userId!), filter);
|
filterId = await this.client.getOrCreateFilter(getFilterName(this.client.credentials.userId!), filter);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error("Getting filter failed", err);
|
this.syncOpts.logger.error("Getting filter failed", err);
|
||||||
if (this.shouldAbortSync(<MatrixError>err)) return {};
|
if (this.shouldAbortSync(<MatrixError>err)) return {};
|
||||||
// wait for saved sync to complete before doing anything else,
|
// wait for saved sync to complete before doing anything else,
|
||||||
// otherwise the sync state will end up being incorrect
|
// otherwise the sync state will end up being incorrect
|
||||||
debuglog("Waiting for saved sync before retrying filter...");
|
this.syncOpts.logger.debug("Waiting for saved sync before retrying filter...");
|
||||||
await this.recoverFromSyncStartupError(this.savedSyncPromise, <Error>err);
|
await this.recoverFromSyncStartupError(this.savedSyncPromise, <Error>err);
|
||||||
return this.getFilter(); // try again
|
return this.getFilter(); // try again
|
||||||
}
|
}
|
||||||
@@ -700,22 +695,22 @@ export class SyncApi {
|
|||||||
// Pull the saved sync token out first, before the worker starts sending
|
// Pull the saved sync token out first, before the worker starts sending
|
||||||
// all the sync data which could take a while. This will let us send our
|
// all the sync data which could take a while. This will let us send our
|
||||||
// first incremental sync request before we've processed our saved data.
|
// first incremental sync request before we've processed our saved data.
|
||||||
debuglog("Getting saved sync token...");
|
this.syncOpts.logger.debug("Getting saved sync token...");
|
||||||
const savedSyncTokenPromise = this.client.store.getSavedSyncToken().then((tok) => {
|
const savedSyncTokenPromise = this.client.store.getSavedSyncToken().then((tok) => {
|
||||||
debuglog("Got saved sync token");
|
this.syncOpts.logger.debug("Got saved sync token");
|
||||||
return tok;
|
return tok;
|
||||||
});
|
});
|
||||||
|
|
||||||
this.savedSyncPromise = this.client.store
|
this.savedSyncPromise = this.client.store
|
||||||
.getSavedSync()
|
.getSavedSync()
|
||||||
.then((savedSync) => {
|
.then((savedSync) => {
|
||||||
debuglog(`Got reply from saved sync, exists? ${!!savedSync}`);
|
this.syncOpts.logger.debug(`Got reply from saved sync, exists? ${!!savedSync}`);
|
||||||
if (savedSync) {
|
if (savedSync) {
|
||||||
return this.syncFromCache(savedSync);
|
return this.syncFromCache(savedSync);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
logger.error("Getting saved sync failed", err);
|
this.syncOpts.logger.error("Getting saved sync failed", err);
|
||||||
});
|
});
|
||||||
|
|
||||||
// We need to do one-off checks before we can begin the /sync loop.
|
// We need to do one-off checks before we can begin the /sync loop.
|
||||||
@@ -747,9 +742,9 @@ export class SyncApi {
|
|||||||
const savedSyncToken = await savedSyncTokenPromise;
|
const savedSyncToken = await savedSyncTokenPromise;
|
||||||
|
|
||||||
if (savedSyncToken) {
|
if (savedSyncToken) {
|
||||||
debuglog("Sending first sync request...");
|
this.syncOpts.logger.debug("Sending first sync request...");
|
||||||
} else {
|
} else {
|
||||||
debuglog("Sending initial sync request...");
|
this.syncOpts.logger.debug("Sending initial sync request...");
|
||||||
const initialFilter = this.buildDefaultFilter();
|
const initialFilter = this.buildDefaultFilter();
|
||||||
initialFilter.setDefinition(filter.getDefinition());
|
initialFilter.setDefinition(filter.getDefinition());
|
||||||
initialFilter.setTimelineLimit(this.opts.initialSyncLimit!);
|
initialFilter.setTimelineLimit(this.opts.initialSyncLimit!);
|
||||||
@@ -763,7 +758,7 @@ export class SyncApi {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Now wait for the saved sync to finish...
|
// Now wait for the saved sync to finish...
|
||||||
debuglog("Waiting for saved sync before starting sync processing...");
|
this.syncOpts.logger.debug("Waiting for saved sync before starting sync processing...");
|
||||||
await this.savedSyncPromise;
|
await this.savedSyncPromise;
|
||||||
// process the first sync request and continue syncing with the normal filterId
|
// process the first sync request and continue syncing with the normal filterId
|
||||||
return this.doSync({ filter: filterId });
|
return this.doSync({ filter: filterId });
|
||||||
@@ -773,7 +768,7 @@ export class SyncApi {
|
|||||||
* Stops the sync object from syncing.
|
* Stops the sync object from syncing.
|
||||||
*/
|
*/
|
||||||
public stop(): void {
|
public stop(): void {
|
||||||
debuglog("SyncApi.stop");
|
this.syncOpts.logger.debug("SyncApi.stop");
|
||||||
// It is necessary to check for the existance of
|
// It is necessary to check for the existance of
|
||||||
// globalThis.window AND globalThis.window.removeEventListener.
|
// globalThis.window AND globalThis.window.removeEventListener.
|
||||||
// Some platforms (e.g. React Native) register globalThis.window,
|
// Some platforms (e.g. React Native) register globalThis.window,
|
||||||
@@ -805,7 +800,7 @@ export class SyncApi {
|
|||||||
* should have been acquired via client.store.getSavedSync().
|
* should have been acquired via client.store.getSavedSync().
|
||||||
*/
|
*/
|
||||||
private async syncFromCache(savedSync: ISavedSync): Promise<void> {
|
private async syncFromCache(savedSync: ISavedSync): Promise<void> {
|
||||||
debuglog("sync(): not doing HTTP hit, instead returning stored /sync data");
|
this.syncOpts.logger.debug("sync(): not doing HTTP hit, instead returning stored /sync data");
|
||||||
|
|
||||||
const nextSyncToken = savedSync.nextBatch;
|
const nextSyncToken = savedSync.nextBatch;
|
||||||
|
|
||||||
@@ -830,7 +825,7 @@ export class SyncApi {
|
|||||||
try {
|
try {
|
||||||
await this.processSyncResponse(syncEventData, data);
|
await this.processSyncResponse(syncEventData, data);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error("Error processing cached sync", e);
|
this.syncOpts.logger.error("Error processing cached sync", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't emit a prepared if we've bailed because the store is invalid:
|
// Don't emit a prepared if we've bailed because the store is invalid:
|
||||||
@@ -881,7 +876,7 @@ export class SyncApi {
|
|||||||
} catch (e) {
|
} catch (e) {
|
||||||
// log the exception with stack if we have it, else fall back
|
// log the exception with stack if we have it, else fall back
|
||||||
// to the plain description
|
// to the plain description
|
||||||
logger.error("Caught /sync error", e);
|
this.syncOpts.logger.error("Caught /sync error", e);
|
||||||
|
|
||||||
// Emit the exception for client handling
|
// Emit the exception for client handling
|
||||||
this.client.emit(ClientEvent.SyncUnexpectedError, <Error>e);
|
this.client.emit(ClientEvent.SyncUnexpectedError, <Error>e);
|
||||||
@@ -916,7 +911,7 @@ export class SyncApi {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!this.running) {
|
if (!this.running) {
|
||||||
debuglog("Sync no longer running: exiting.");
|
this.syncOpts.logger.debug("Sync no longer running: exiting.");
|
||||||
if (this.connectionReturnedResolvers) {
|
if (this.connectionReturnedResolvers) {
|
||||||
this.connectionReturnedResolvers.reject();
|
this.connectionReturnedResolvers.reject();
|
||||||
this.connectionReturnedResolvers = undefined;
|
this.connectionReturnedResolvers = undefined;
|
||||||
@@ -999,7 +994,7 @@ export class SyncApi {
|
|||||||
|
|
||||||
private async onSyncError(err: MatrixError): Promise<boolean> {
|
private async onSyncError(err: MatrixError): Promise<boolean> {
|
||||||
if (!this.running) {
|
if (!this.running) {
|
||||||
debuglog("Sync no longer running: exiting");
|
this.syncOpts.logger.debug("Sync no longer running: exiting");
|
||||||
if (this.connectionReturnedResolvers) {
|
if (this.connectionReturnedResolvers) {
|
||||||
this.connectionReturnedResolvers.reject();
|
this.connectionReturnedResolvers.reject();
|
||||||
this.connectionReturnedResolvers = undefined;
|
this.connectionReturnedResolvers = undefined;
|
||||||
@@ -1008,16 +1003,16 @@ export class SyncApi {
|
|||||||
return true; // abort
|
return true; // abort
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.error("/sync error %s", err);
|
this.syncOpts.logger.error("/sync error %s", err);
|
||||||
|
|
||||||
if (this.shouldAbortSync(err)) {
|
if (this.shouldAbortSync(err)) {
|
||||||
return true; // abort
|
return true; // abort
|
||||||
}
|
}
|
||||||
|
|
||||||
this.failedSyncCount++;
|
this.failedSyncCount++;
|
||||||
logger.log("Number of consecutive failed sync requests:", this.failedSyncCount);
|
this.syncOpts.logger.debug("Number of consecutive failed sync requests:", this.failedSyncCount);
|
||||||
|
|
||||||
debuglog("Starting keep-alive");
|
this.syncOpts.logger.debug("Starting keep-alive");
|
||||||
// Note that we do *not* mark the sync connection as
|
// Note that we do *not* mark the sync connection as
|
||||||
// lost yet: we only do this if a keepalive poke
|
// lost yet: we only do this if a keepalive poke
|
||||||
// fails, since long lived HTTP connections will
|
// fails, since long lived HTTP connections will
|
||||||
@@ -1324,7 +1319,7 @@ export class SyncApi {
|
|||||||
for (let i = timelineEvents.length - 1; i >= 0; i--) {
|
for (let i = timelineEvents.length - 1; i >= 0; i--) {
|
||||||
const eventId = timelineEvents[i].getId()!;
|
const eventId = timelineEvents[i].getId()!;
|
||||||
if (room.getTimelineForEvent(eventId)) {
|
if (room.getTimelineForEvent(eventId)) {
|
||||||
debuglog(`Already have event ${eventId} in limited sync - not resetting`);
|
this.syncOpts.logger.debug(`Already have event ${eventId} in limited sync - not resetting`);
|
||||||
limited = false;
|
limited = false;
|
||||||
|
|
||||||
// we might still be missing some of the events before i;
|
// we might still be missing some of the events before i;
|
||||||
@@ -1384,7 +1379,7 @@ export class SyncApi {
|
|||||||
await this.injectRoomEvents(room, stateEvents, undefined, timelineEvents, syncEventData.fromCache);
|
await this.injectRoomEvents(room, stateEvents, undefined, timelineEvents, syncEventData.fromCache);
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error(`Failed to process events on room ${room.roomId}:`, e);
|
this.syncOpts.logger.error(`Failed to process events on room ${room.roomId}:`, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// set summary after processing events,
|
// set summary after processing events,
|
||||||
@@ -1865,7 +1860,7 @@ export class SyncApi {
|
|||||||
* but this might help us reconnect a little faster.
|
* but this might help us reconnect a little faster.
|
||||||
*/
|
*/
|
||||||
private onOnline = (): void => {
|
private onOnline = (): void => {
|
||||||
debuglog("Browser thinks we are back online");
|
this.syncOpts.logger.debug("Browser thinks we are back online");
|
||||||
this.startKeepAlives(0);
|
this.startKeepAlives(0);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user