mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-07-31 15:24:23 +03:00

Refactor Sync and fix initialSyncLimit (#2587)

* Small tidy-up to sync.ts

* Convert doSync into a while loop

* Apply `initialSyncLimit` only to initial syncs

* Convert matrix-client-syncing spec to TS

* Add tests around initial sync filtering

* Switch confusing filterId field for `filter`

* Tweak doSync error control flow

* Fix error control flow intricacies

* use includes

* Add tests

* Fix some strict mode errors

* Fix more strict mode errors

* Fix some strict mode errors
Michael Telatynski
2022-08-23 16:25:54 +01:00
committed by GitHub
parent 5e4474b959
commit b789cc5933
4 changed files with 575 additions and 494 deletions
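To make the headline change concrete, here is a minimal usage sketch (hedged: it assumes the public matrix-js-sdk API, and the homeserver URL, token, and user ID are placeholders). With this commit, `initialSyncLimit` is applied as an inline filter on the initial /sync only, so the timeline limit no longer carries over into later incremental syncs, which reuse the stored sync filter.

```ts
// Illustrative only: placeholder homeserver, token, and user ID.
import { createClient } from "matrix-js-sdk";

const client = createClient({
    baseUrl: "https://matrix.example.org",
    accessToken: "<access token>",
    userId: "@alice:example.org",
});

// After this change, initialSyncLimit shapes only the first /sync (via an
// inline filter with a timeline limit of 10); subsequent incremental syncs
// reuse the stored filter without that limit.
client.startClient({ initialSyncLimit: 10 });
```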

View File

@@ -147,9 +147,9 @@ export function mkEventCustom<T>(base: T): T & GeneratedMetadata {
interface IPresenceOpts {
user?: string;
sender?: string;
url: string;
name: string;
ago: number;
url?: string;
name?: string;
ago?: number;
presence?: string;
event?: boolean;
}

View File

@@ -396,8 +396,7 @@ export interface IStartClientOpts {
pollTimeout?: number;
/**
* The filter to apply to /sync calls. This will override the opts.initialSyncLimit, which would
* normally result in a timeline limit filter.
* The filter to apply to /sync calls.
*/
filter?: Filter;
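For context, a caller can still supply its own sync filter through this option. A hedged sketch (user ID, limits, and credentials are illustrative) using the `Filter` helpers that also appear in the sync.ts diff below:

```ts
import { createClient, Filter } from "matrix-js-sdk";

const userId = "@alice:example.org"; // placeholder
const client = createClient({
    baseUrl: "https://matrix.example.org",
    accessToken: "<access token>",
    userId,
});

const filter = new Filter(userId);
filter.setTimelineLimit(20);      // cap each room timeline in /sync responses
filter.setLazyLoadMembers(true);  // request lazy-loaded membership

// This filter is used for /sync calls instead of the SDK's default one.
client.startClient({ filter });
```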

View File

@@ -23,6 +23,8 @@ limitations under the License.
* for HTTP and WS at some point.
*/
import { Optional } from "matrix-events-sdk";
import { User, UserEvent } from "./models/user";
import { NotificationCountType, Room, RoomEvent } from "./models/room";
import * as utils from "./utils";
@@ -100,18 +102,16 @@ const MSC2716_ROOM_VERSIONS = [
function getFilterName(userId: string, suffix?: string): string {
// scope this on the user ID because people may login on many accounts
// and they all need to be stored!
return "FILTER_SYNC_" + userId + (suffix ? "_" + suffix : "");
return `FILTER_SYNC_${userId}` + (suffix ? "_" + suffix : "");
}
function debuglog(...params) {
if (!DEBUG) {
return;
}
if (!DEBUG) return;
logger.log(...params);
}
interface ISyncOptions {
filterId?: string;
filter?: string;
hasSyncedBefore?: boolean;
}
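A quick worked example of the filter-name helper above; note that the parentheses around the ternary are required, since `+` binds tighter than `?:` (values are illustrative):

```ts
function getFilterName(userId: string, suffix?: string): string {
    return `FILTER_SYNC_${userId}` + (suffix ? "_" + suffix : "");
}

getFilterName("@alice:example.org");          // "FILTER_SYNC_@alice:example.org"
getFilterName("@alice:example.org", "guest"); // "FILTER_SYNC_@alice:example.org_guest"
```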
@@ -161,14 +161,14 @@ type WrappedRoom<T> = T & {
* updating presence.
*/
export class SyncApi {
private _peekRoom: Room = null;
private currentSyncRequest: IAbortablePromise<ISyncResponse> = null;
private syncState: SyncState = null;
private syncStateData: ISyncStateData = null; // additional data (eg. error object for failed sync)
private _peekRoom: Optional<Room> = null;
private currentSyncRequest: Optional<IAbortablePromise<ISyncResponse>> = null;
private syncState: Optional<SyncState> = null;
private syncStateData: Optional<ISyncStateData> = null; // additional data (eg. error object for failed sync)
private catchingUp = false;
private running = false;
private keepAliveTimer: ReturnType<typeof setTimeout> = null;
private connectionReturnedDefer: IDeferred<boolean> = null;
private keepAliveTimer: Optional<ReturnType<typeof setTimeout>> = null;
private connectionReturnedDefer: Optional<IDeferred<boolean>> = null;
private notifEvents: MatrixEvent[] = []; // accumulator of sync events in the current sync response
private failedSyncCount = 0; // Number of consecutive failed /sync requests
private storeIsInvalid = false; // flag set if the store needs to be cleared before we can start
@@ -214,7 +214,7 @@ export class SyncApi {
* historical messages are shown when we paginate `/messages` again.
* @param {Room} room The room where the marker event was sent
* @param {MatrixEvent} markerEvent The new marker event
* @param {ISetStateOptions} setStateOptions When `timelineWasEmpty` is set
* @param {IMarkerFoundOptions} setStateOptions When `timelineWasEmpty` is set
* as `true`, the given marker event will be ignored
*/
private onMarkerStateEvent(
@@ -367,7 +367,7 @@ export class SyncApi {
// XXX: copypasted from /sync until we kill off this minging v1 API stuff)
// handle presence events (User objects)
if (response.presence && Array.isArray(response.presence)) {
if (Array.isArray(response.presence)) {
response.presence.map(client.getEventMapper()).forEach(
function(presenceEvent) {
let user = client.store.getUser(presenceEvent.getContent().user_id);
@@ -542,67 +542,40 @@ export class SyncApi {
return false;
}
/**
* Main entry point
*/
public sync(): void {
const client = this.client;
this.running = true;
if (global.window && global.window.addEventListener) {
global.window.addEventListener("online", this.onOnline, false);
}
let savedSyncPromise = Promise.resolve();
let savedSyncToken = null;
// We need to do one-off checks before we can begin the /sync loop.
// These are:
// 1) We need to get push rules so we can check if events should bing as we get
// them from /sync.
// 2) We need to get/create a filter which we can use for /sync.
// 3) We need to check the lazy loading option matches what was used in the
// stored sync. If it doesn't, we can't use the stored sync.
const getPushRules = async () => {
private getPushRules = async () => {
try {
debuglog("Getting push rules...");
const result = await client.getPushRules();
const result = await this.client.getPushRules();
debuglog("Got push rules");
client.pushRules = result;
this.client.pushRules = result;
} catch (err) {
logger.error("Getting push rules failed", err);
if (this.shouldAbortSync(err)) return;
// wait for saved sync to complete before doing anything else,
// otherwise the sync state will end up being incorrect
debuglog("Waiting for saved sync before retrying push rules...");
await this.recoverFromSyncStartupError(savedSyncPromise, err);
getPushRules();
return;
await this.recoverFromSyncStartupError(this.savedSyncPromise, err);
return this.getPushRules(); // try again
}
checkLazyLoadStatus(); // advance to the next stage
};
const buildDefaultFilter = () => {
const filter = new Filter(client.credentials.userId);
filter.setTimelineLimit(this.opts.initialSyncLimit);
return filter;
private buildDefaultFilter = () => {
return new Filter(this.client.credentials.userId);
};
const checkLazyLoadStatus = async () => {
private checkLazyLoadStatus = async () => {
debuglog("Checking lazy load status...");
if (this.opts.lazyLoadMembers && client.isGuest()) {
if (this.opts.lazyLoadMembers && this.client.isGuest()) {
this.opts.lazyLoadMembers = false;
}
if (this.opts.lazyLoadMembers) {
debuglog("Checking server lazy load support...");
const supported = await client.doesServerSupportLazyLoading();
const supported = await this.client.doesServerSupportLazyLoading();
if (supported) {
debuglog("Enabling lazy load on sync filter...");
if (!this.opts.filter) {
this.opts.filter = buildDefaultFilter();
this.opts.filter = this.buildDefaultFilter();
}
this.opts.filter.setLazyLoadMembers(true);
} else {
@@ -626,8 +599,8 @@ export class SyncApi {
logger.warn("InvalidStoreError: store is not usable: stopping sync.");
return;
}
if (this.opts.lazyLoadMembers && this.opts.crypto) {
this.opts.crypto.enableLazyLoading();
if (this.opts.lazyLoadMembers) {
this.opts.crypto?.enableLazyLoading();
}
try {
debuglog("Storing client options...");
@@ -637,65 +610,60 @@ export class SyncApi {
logger.error("Storing client options failed", err);
throw err;
}
getFilter(); // Now get the filter and start syncing
};
const getFilter = async () => {
private getFilter = async (): Promise<{
filterId?: string;
filter?: Filter;
}> => {
debuglog("Getting filter...");
let filter;
let filter: Filter;
if (this.opts.filter) {
filter = this.opts.filter;
} else {
filter = buildDefaultFilter();
filter = this.buildDefaultFilter();
}
let filterId;
let filterId: string;
try {
filterId = await client.getOrCreateFilter(getFilterName(client.credentials.userId), filter);
filterId = await this.client.getOrCreateFilter(getFilterName(this.client.credentials.userId), filter);
} catch (err) {
logger.error("Getting filter failed", err);
if (this.shouldAbortSync(err)) return;
if (this.shouldAbortSync(err)) return {};
// wait for saved sync to complete before doing anything else,
// otherwise the sync state will end up being incorrect
debuglog("Waiting for saved sync before retrying filter...");
await this.recoverFromSyncStartupError(savedSyncPromise, err);
getFilter();
return;
await this.recoverFromSyncStartupError(this.savedSyncPromise, err);
return this.getFilter(); // try again
}
// reset the notifications timeline to prepare it to paginate from
// the current point in time.
// The right solution would be to tie /sync pagination tokens into
// /notifications API somehow.
client.resetNotifTimelineSet();
if (this.currentSyncRequest === null) {
// Send this first sync request here so we can then wait for the saved
// sync data to finish processing before we process the results of this one.
debuglog("Sending first sync request...");
this.currentSyncRequest = this.doSyncRequest({ filterId }, savedSyncToken);
}
// Now wait for the saved sync to finish...
debuglog("Waiting for saved sync before starting sync processing...");
await savedSyncPromise;
this.doSync({ filterId });
return { filter, filterId };
};
if (client.isGuest()) {
private savedSyncPromise: Promise<void>;
/**
* Main entry point
*/
public async sync(): Promise<void> {
this.running = true;
global.window?.addEventListener?.("online", this.onOnline, false);
if (this.client.isGuest()) {
// no push rules for guests, no access to POST filter for guests.
this.doSync({});
} else {
return this.doSync({});
}
// Pull the saved sync token out first, before the worker starts sending
// all the sync data which could take a while. This will let us send our
// first incremental sync request before we've processed our saved data.
debuglog("Getting saved sync token...");
savedSyncPromise = client.store.getSavedSyncToken().then((tok) => {
const savedSyncTokenPromise = this.client.store.getSavedSyncToken().then(tok => {
debuglog("Got saved sync token");
savedSyncToken = tok;
debuglog("Getting saved sync...");
return client.store.getSavedSync();
}).then((savedSync) => {
return tok;
});
this.savedSyncPromise = this.client.store.getSavedSync().then((savedSync) => {
debuglog(`Got reply from saved sync, exists? ${!!savedSync}`);
if (savedSync) {
return this.syncFromCache(savedSync);
@@ -703,11 +671,54 @@ export class SyncApi {
}).catch(err => {
logger.error("Getting saved sync failed", err);
});
// We need to do one-off checks before we can begin the /sync loop.
// These are:
// 1) We need to get push rules so we can check if events should bing as we get
// them from /sync.
// 2) We need to get/create a filter which we can use for /sync.
// 3) We need to check the lazy loading option matches what was used in the
// stored sync. If it doesn't, we can't use the stored sync.
// Now start the first incremental sync request: this can also
// take a while so if we set it going now, we can wait for it
// to finish while we process our saved sync data.
getPushRules();
await this.getPushRules();
await this.checkLazyLoadStatus();
const { filterId, filter } = await this.getFilter();
if (!filter) return; // bail, getFilter failed
// reset the notifications timeline to prepare it to paginate from
// the current point in time.
// The right solution would be to tie /sync pagination tokens into
// /notifications API somehow.
this.client.resetNotifTimelineSet();
if (this.currentSyncRequest === null) {
let firstSyncFilter = filterId;
const savedSyncToken = await savedSyncTokenPromise;
if (savedSyncToken) {
debuglog("Sending first sync request...");
} else {
debuglog("Sending initial sync request...");
const initialFilter = this.buildDefaultFilter();
initialFilter.setDefinition(filter.getDefinition());
initialFilter.setTimelineLimit(this.opts.initialSyncLimit);
// Use an inline filter, no point uploading it for a single usage
firstSyncFilter = JSON.stringify(initialFilter.getDefinition());
}
// Send this first sync request here so we can then wait for the saved
// sync data to finish processing before we process the results of this one.
this.currentSyncRequest = this.doSyncRequest({ filter: firstSyncFilter }, savedSyncToken);
}
// Now wait for the saved sync to finish...
debuglog("Waiting for saved sync before starting sync processing...");
await this.savedSyncPromise;
// process the first sync request and continue syncing with the normal filterId
return this.doSync({ filter: filterId });
}
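The branch above works because the /sync `filter` query parameter accepts either a previously uploaded filter ID or an inline JSON-encoded filter definition. A rough sketch of the two shapes this code produces (identifiers and tokens are illustrative):

```ts
import { Filter } from "matrix-js-sdk";

const userId = "@alice:example.org"; // placeholder

// Incremental sync: reference a filter already stored on the server,
// e.g. the result of client.getOrCreateFilter(...).
const incrementalParams = { filter: "42", since: "s72594_4483_1934" };

// Initial sync: send the definition inline, with opts.initialSyncLimit
// applied as the timeline limit, so no extra filter is uploaded.
const initialFilter = new Filter(userId);
initialFilter.setTimelineLimit(10);
const initialParams = { filter: JSON.stringify(initialFilter.getDefinition()) };
```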
/**
@@ -719,9 +730,7 @@ export class SyncApi {
// global.window AND global.window.removeEventListener.
// Some platforms (e.g. React Native) register global.window,
// but do not have global.window.removeEventListener.
if (global.window && global.window.removeEventListener) {
global.window.removeEventListener("online", this.onOnline, false);
}
global.window?.removeEventListener?.("online", this.onOnline, false);
this.running = false;
this.currentSyncRequest?.abort();
if (this.keepAliveTimer) {
@@ -756,8 +765,7 @@ export class SyncApi {
this.client.store.setSyncToken(nextSyncToken);
// No previous sync, set old token to null
const syncEventData = {
oldSyncToken: null,
const syncEventData: ISyncStateData = {
nextSyncToken,
catchingUp: false,
fromCache: true,
@@ -792,21 +800,10 @@ export class SyncApi {
* @param {boolean} syncOptions.hasSyncedBefore
*/
private async doSync(syncOptions: ISyncOptions): Promise<void> {
const client = this.client;
while (this.running) {
const syncToken = this.client.store.getSyncToken();
if (!this.running) {
debuglog("Sync no longer running: exiting.");
if (this.connectionReturnedDefer) {
this.connectionReturnedDefer.reject();
this.connectionReturnedDefer = null;
}
this.updateSyncState(SyncState.Stopped);
return;
}
const syncToken = client.store.getSyncToken();
let data;
let data: ISyncResponse;
try {
//debuglog('Starting sync since=' + syncToken);
if (this.currentSyncRequest === null) {
@@ -814,8 +811,9 @@ export class SyncApi {
}
data = await this.currentSyncRequest;
} catch (e) {
this.onSyncError(e, syncOptions);
return;
const abort = await this.onSyncError(e);
if (abort) return;
continue;
} finally {
this.currentSyncRequest = null;
}
@@ -825,12 +823,12 @@ export class SyncApi {
// set the sync token NOW *before* processing the events. We do this so
// if something barfs on an event we can skip it rather than constantly
// polling with the same token.
client.store.setSyncToken(data.next_batch);
this.client.store.setSyncToken(data.next_batch);
// Reset after a successful sync
this.failedSyncCount = 0;
await client.store.setSyncData(data);
await this.client.store.setSyncData(data);
const syncEventData = {
oldSyncToken: syncToken,
@@ -873,7 +871,7 @@ export class SyncApi {
// keep emitting SYNCING -> SYNCING for clients who want to do bulk updates
this.updateSyncState(SyncState.Syncing, syncEventData);
if (client.store.wantsSave()) {
if (this.client.store.wantsSave()) {
// We always save the device list (if it's dirty) before saving the sync data:
// this means we know the saved device list data is at least as fresh as the
// stored sync data which means we don't have to worry that we may have missed
@@ -884,11 +882,18 @@ export class SyncApi {
}
// tell databases that everything is now in a consistent state and can be saved.
client.store.save();
this.client.store.save();
}
}
// Begin next sync
this.doSync(syncOptions);
if (!this.running) {
debuglog("Sync no longer running: exiting.");
if (this.connectionReturnedDefer) {
this.connectionReturnedDefer.reject();
this.connectionReturnedDefer = null;
}
this.updateSyncState(SyncState.Stopped);
}
}
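In isolation, the new control flow boils down to a single loop in which the error handler decides whether to abort or retry. A simplified paraphrase (not the actual `SyncApi` code):

```ts
async function syncLoop(
    isRunning: () => boolean,
    fetchOnce: () => Promise<unknown>,
    onError: (err: unknown) => Promise<boolean>, // resolves true to abort
    process: (data: unknown) => Promise<void>,
): Promise<void> {
    while (isRunning()) {
        let data: unknown;
        try {
            data = await fetchOnce();
        } catch (err) {
            if (await onError(err)) return; // abort
            continue;                       // otherwise retry
        }
        await process(data);
    }
}
```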
private doSyncRequest(syncOptions: ISyncOptions, syncToken: string): IAbortablePromise<ISyncResponse> {
@@ -902,7 +907,7 @@ export class SyncApi {
private getSyncParams(syncOptions: ISyncOptions, syncToken: string): ISyncParams {
let pollTimeout = this.opts.pollTimeout;
if (this.getSyncState() !== 'SYNCING' || this.catchingUp) {
if (this.getSyncState() !== SyncState.Syncing || this.catchingUp) {
// unless we are happily syncing already, we want the server to return
// as quickly as possible, even if there are no events queued. This
// serves two purposes:
@@ -918,13 +923,13 @@ export class SyncApi {
pollTimeout = 0;
}
let filterId = syncOptions.filterId;
if (this.client.isGuest() && !filterId) {
filterId = this.getGuestFilter();
let filter = syncOptions.filter;
if (this.client.isGuest() && !filter) {
filter = this.getGuestFilter();
}
const qps: ISyncParams = {
filter: filterId,
filter,
timeout: pollTimeout,
};
@@ -941,7 +946,7 @@ export class SyncApi {
qps._cacheBuster = Date.now();
}
if (this.getSyncState() == 'ERROR' || this.getSyncState() == 'RECONNECTING') {
if ([SyncState.Reconnecting, SyncState.Error].includes(this.getSyncState())) {
// we think the connection is dead. If it comes back up, we won't know
// about it till /sync returns. If the timeout= is high, this could
// be a long time. Set it to 0 when doing retries so we don't have to wait
@@ -952,7 +957,7 @@ export class SyncApi {
return qps;
}
private onSyncError(err: MatrixError, syncOptions: ISyncOptions): void {
private async onSyncError(err: MatrixError): Promise<boolean> {
if (!this.running) {
debuglog("Sync no longer running: exiting");
if (this.connectionReturnedDefer) {
@ -960,14 +965,13 @@ export class SyncApi {
this.connectionReturnedDefer = null;
}
this.updateSyncState(SyncState.Stopped);
return;
return true; // abort
}
logger.error("/sync error %s", err);
logger.error(err);
if (this.shouldAbortSync(err)) {
return;
return true; // abort
}
this.failedSyncCount++;
@@ -981,20 +985,7 @@ export class SyncApi {
// erroneous. We set the state to 'reconnecting'
// instead, so that clients can observe this state
// if they wish.
this.startKeepAlives().then((connDidFail) => {
// Only emit CATCHUP if we detected a connectivity error: if we didn't,
// it's quite likely the sync will fail again for the same reason and we
// want to stay in ERROR rather than keep flip-flopping between ERROR
// and CATCHUP.
if (connDidFail && this.getSyncState() === SyncState.Error) {
this.updateSyncState(SyncState.Catchup, {
oldSyncToken: null,
nextSyncToken: null,
catchingUp: true,
});
}
this.doSync(syncOptions);
});
const keepAlivePromise = this.startKeepAlives();
this.currentSyncRequest = null;
// Transition from RECONNECTING to ERROR after a given number of failed syncs
@@ -1003,6 +994,19 @@ export class SyncApi {
SyncState.Error : SyncState.Reconnecting,
{ error: err },
);
const connDidFail = await keepAlivePromise;
// Only emit CATCHUP if we detected a connectivity error: if we didn't,
// it's quite likely the sync will fail again for the same reason and we
// want to stay in ERROR rather than keep flip-flopping between ERROR
// and CATCHUP.
if (connDidFail && this.getSyncState() === SyncState.Error) {
this.updateSyncState(SyncState.Catchup, {
catchingUp: true,
});
}
return false;
}
/**
@@ -1061,7 +1065,7 @@ export class SyncApi {
// - The isBrandNewRoom boilerplate is boilerplatey.
// handle presence events (User objects)
if (data.presence && Array.isArray(data.presence.events)) {
if (Array.isArray(data.presence?.events)) {
data.presence.events.map(client.getEventMapper()).forEach(
function(presenceEvent) {
let user = client.store.getUser(presenceEvent.getSender());
@@ -1077,7 +1081,7 @@ export class SyncApi {
}
// handle non-room account_data
if (data.account_data && Array.isArray(data.account_data.events)) {
if (Array.isArray(data.account_data?.events)) {
const events = data.account_data.events.map(client.getEventMapper());
const prevEventsMap = events.reduce((m, c) => {
m[c.getId()] = client.store.getAccountData(c.getType());
@@ -1218,8 +1222,7 @@ export class SyncApi {
// bother setting it here. We trust our calculations better than the
// server's for this case, and therefore will assume that our non-zero
// count is accurate.
if (!encrypted
|| (encrypted && room.getUnreadNotificationCount(NotificationCountType.Highlight) <= 0)) {
if (!encrypted || room.getUnreadNotificationCount(NotificationCountType.Highlight) <= 0) {
room.setUnreadNotificationCount(
NotificationCountType.Highlight,
joinObj.unread_notifications.highlight_count,
@@ -1232,8 +1235,7 @@ export class SyncApi {
if (joinObj.isBrandNewRoom) {
// set the back-pagination token. Do this *before* adding any
// events so that clients can start back-paginating.
room.getLiveTimeline().setPaginationToken(
joinObj.timeline.prev_batch, EventTimeline.BACKWARDS);
room.getLiveTimeline().setPaginationToken(joinObj.timeline.prev_batch, EventTimeline.BACKWARDS);
} else if (joinObj.timeline.limited) {
let limited = true;