You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-11-29 16:43:09 +03:00
Start first incremental sync request early (#629)
* Start first incremental sync request early So it can run while we process our sync data.
This commit is contained in:
committed by
Luke Barnard
parent
349297e495
commit
16c062c069
@@ -137,6 +137,7 @@ describe("MatrixClient", function() {
|
||||
"getSyncAccumulator", "startup", "deleteAllData",
|
||||
].reduce((r, k) => { r[k] = expect.createSpy(); return r; }, {});
|
||||
store.getSavedSync = expect.createSpy().andReturn(Promise.resolve(null));
|
||||
store.getSavedSyncToken = expect.createSpy().andReturn(Promise.resolve(null));
|
||||
store.setSyncData = expect.createSpy().andReturn(Promise.resolve(null));
|
||||
client = new MatrixClient({
|
||||
baseUrl: "https://my.home.server",
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
/*
|
||||
Copyright 2017 Vector Creations Ltd
|
||||
Copyright 2018 New Vector Ltd
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -243,6 +244,10 @@ LocalIndexedDBStoreBackend.prototype = {
|
||||
}
|
||||
},
|
||||
|
||||
getNextBatchToken: function() {
|
||||
return Promise.resolve(this._syncAccumulator.getNextBatchToken());
|
||||
},
|
||||
|
||||
setSyncData: function(syncData) {
|
||||
return Promise.resolve().then(() => {
|
||||
this._syncAccumulator.accumulate(syncData);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
/*
|
||||
Copyright 2017 Vector Creations Ltd
|
||||
Copyright 2018 New Vector Ltd
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -74,6 +75,10 @@ RemoteIndexedDBStoreBackend.prototype = {
|
||||
return this._doCmd('getSavedSync');
|
||||
},
|
||||
|
||||
getNextBatchToken: function() {
|
||||
return this._doCmd('getNextBatchToken');
|
||||
},
|
||||
|
||||
setSyncData: function(syncData) {
|
||||
return this._doCmd('setSyncData', [syncData]);
|
||||
},
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
/*
|
||||
Copyright 2017 Vector Creations Ltd
|
||||
Copyright 2018 New Vector Ltd
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -88,6 +89,9 @@ class IndexedDBStoreWorker {
|
||||
case 'getUserPresenceEvents':
|
||||
prom = this.backend.getUserPresenceEvents();
|
||||
break;
|
||||
case 'getNextBatchToken':
|
||||
prom = this.backend.getNextBatchToken();
|
||||
break;
|
||||
}
|
||||
|
||||
if (prom === undefined) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
/*
|
||||
Copyright 2017 Vector Creations Ltd
|
||||
Copyright 2018 New Vector Ltd
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -145,6 +146,14 @@ IndexedDBStore.prototype.getSavedSync = function() {
|
||||
return this.backend.getSavedSync();
|
||||
};
|
||||
|
||||
/**
|
||||
* @return {Promise} If there is a saved sync, the nextBatch token
|
||||
* for this sync, otherwise null.
|
||||
*/
|
||||
IndexedDBStore.prototype.getSavedSyncToken = function() {
|
||||
return this.backend.getNextBatchToken();
|
||||
},
|
||||
|
||||
/**
|
||||
* Delete all data from this store.
|
||||
* @return {Promise} Resolves if the data was deleted from the database.
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
/*
|
||||
Copyright 2015, 2016 OpenMarket Ltd
|
||||
Copyright 2017 Vector Creations Ltd
|
||||
Copyright 2018 New Vector Ltd
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -346,6 +347,14 @@ module.exports.MatrixInMemoryStore.prototype = {
|
||||
return Promise.resolve(null);
|
||||
},
|
||||
|
||||
/**
|
||||
* @return {Promise} If there is a saved sync, the nextBatch token
|
||||
* for this sync, otherwise null.
|
||||
*/
|
||||
getSavedSyncToken: function() {
|
||||
return Promise.resolve(null);
|
||||
},
|
||||
|
||||
/**
|
||||
* Delete all data from this store.
|
||||
* @return {Promise} An immediately resolved promise.
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
/*
|
||||
Copyright 2015, 2016 OpenMarket Ltd
|
||||
Copyright 2017 Vector Creations Ltd
|
||||
Copyright 2018 New Vector Ltd
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -247,6 +248,14 @@ StubStore.prototype = {
|
||||
return Promise.resolve(null);
|
||||
},
|
||||
|
||||
/**
|
||||
* @return {Promise} If there is a saved sync, the nextBatch token
|
||||
* for this sync, otherwise null.
|
||||
*/
|
||||
getSavedSyncToken: function() {
|
||||
return Promise.resolve(null);
|
||||
},
|
||||
|
||||
/**
|
||||
* Delete all data from this store. Does nothing since this store
|
||||
* doesn't store anything.
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
/*
|
||||
Copyright 2017 Vector Creations Ltd
|
||||
Copyright 2018 New Vector Ltd
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
@@ -520,6 +521,10 @@ class SyncAccumulator {
|
||||
accountData: accData,
|
||||
};
|
||||
}
|
||||
|
||||
getNextBatchToken() {
|
||||
return this.nextBatch;
|
||||
}
|
||||
}
|
||||
|
||||
function setState(eventMap, event) {
|
||||
|
||||
219
src/sync.js
219
src/sync.js
@@ -396,6 +396,17 @@ SyncApi.prototype.getSyncState = function() {
|
||||
return this._syncState;
|
||||
};
|
||||
|
||||
SyncApi.prototype.recoverFromSyncStartupError = async function(savedSyncPromise, err) {
|
||||
// Wait for the saved sync to complete - we send the pushrules and filter requests
|
||||
// before the saved sync has finished so they can run in parallel, but only process
|
||||
// the results after the saved sync is done. Equivalently, we wait for it to finish
|
||||
// before reporting failures from these functions.
|
||||
await savedSyncPromise;
|
||||
const keepaliveProm = this._startKeepAlives();
|
||||
this._updateSyncState("ERROR", { error: err });
|
||||
await keepaliveProm;
|
||||
};
|
||||
|
||||
/**
|
||||
* Main entry point
|
||||
*/
|
||||
@@ -410,28 +421,32 @@ SyncApi.prototype.sync = function() {
|
||||
global.document.addEventListener("online", this._onOnlineBound, false);
|
||||
}
|
||||
|
||||
let savedSyncPromise = Promise.resolve();
|
||||
let savedSyncToken = null;
|
||||
|
||||
// We need to do one-off checks before we can begin the /sync loop.
|
||||
// These are:
|
||||
// 1) We need to get push rules so we can check if events should bing as we get
|
||||
// them from /sync.
|
||||
// 2) We need to get/create a filter which we can use for /sync.
|
||||
|
||||
function getPushRules() {
|
||||
client.getPushRules().done((result) => {
|
||||
async function getPushRules() {
|
||||
try {
|
||||
const result = await client.getPushRules();
|
||||
debuglog("Got push rules");
|
||||
|
||||
client.pushRules = result;
|
||||
|
||||
getFilter(); // Now get the filter and start syncing
|
||||
}, (err) => {
|
||||
self._startKeepAlives().done(() => {
|
||||
getPushRules();
|
||||
});
|
||||
self._updateSyncState("ERROR", { error: err });
|
||||
});
|
||||
} catch (err) {
|
||||
// wait for saved sync to complete before doing anything else,
|
||||
// otherwise the sync state will end up being incorrect
|
||||
await self.recoverFromSyncStartupError(savedSyncPromise, err);
|
||||
getPushRules();
|
||||
return;
|
||||
}
|
||||
getFilter(); // Now get the filter and start syncing
|
||||
}
|
||||
|
||||
function getFilter() {
|
||||
async function getFilter() {
|
||||
let filter;
|
||||
if (self.opts.filter) {
|
||||
filter = self.opts.filter;
|
||||
@@ -440,40 +455,55 @@ SyncApi.prototype.sync = function() {
|
||||
filter.setTimelineLimit(self.opts.initialSyncLimit);
|
||||
}
|
||||
|
||||
client.getOrCreateFilter(
|
||||
getFilterName(client.credentials.userId), filter,
|
||||
).done((filterId) => {
|
||||
// reset the notifications timeline to prepare it to paginate from
|
||||
// the current point in time.
|
||||
// The right solution would be to tie /sync pagination tokens into
|
||||
// /notifications API somehow.
|
||||
client.resetNotifTimelineSet();
|
||||
let filterId;
|
||||
try {
|
||||
filterId = await client.getOrCreateFilter(
|
||||
getFilterName(client.credentials.userId), filter,
|
||||
);
|
||||
} catch (err) {
|
||||
// wait for saved sync to complete before doing anything else,
|
||||
// otherwise the sync state will end up being incorrect
|
||||
await self.recoverFromSyncStartupError(savedSyncPromise, err);
|
||||
getFilter();
|
||||
return;
|
||||
}
|
||||
// reset the notifications timeline to prepare it to paginate from
|
||||
// the current point in time.
|
||||
// The right solution would be to tie /sync pagination tokens into
|
||||
// /notifications API somehow.
|
||||
client.resetNotifTimelineSet();
|
||||
|
||||
self._sync({ filterId });
|
||||
}, (err) => {
|
||||
self._startKeepAlives().done(function() {
|
||||
getFilter();
|
||||
});
|
||||
self._updateSyncState("ERROR", { error: err });
|
||||
});
|
||||
if (self._currentSyncRequest === null) {
|
||||
// Send this first sync request here so we can then wait for the saved
|
||||
// sync data to finish processing before we process the results of this one.
|
||||
console.log("Sending first sync request...");
|
||||
self._currentSyncRequest = self._doSyncRequest({ filterId }, savedSyncToken);
|
||||
}
|
||||
|
||||
// Now wait for the saved sync to finish...
|
||||
await savedSyncPromise;
|
||||
self._sync({ filterId });
|
||||
}
|
||||
|
||||
if (client.isGuest()) {
|
||||
// no push rules for guests, no access to POST filter for guests.
|
||||
self._sync({});
|
||||
} else {
|
||||
// Before fetching push rules, fetching the filter and syncing, check
|
||||
// for persisted /sync data and use that if present.
|
||||
client.store.getSavedSync().then((savedSync) => {
|
||||
// Pull the saved sync token out first, before the worker starts sending
|
||||
// all the sync data which could take a while. This will let us send our
|
||||
// first incremental sync request before we've processed our saved data.
|
||||
savedSyncPromise = client.store.getSavedSyncToken().then((tok) => {
|
||||
savedSyncToken = tok;
|
||||
return client.store.getSavedSync();
|
||||
}).then((savedSync) => {
|
||||
if (savedSync) {
|
||||
return self._syncFromCache(savedSync);
|
||||
}
|
||||
}).then(() => {
|
||||
// Get push rules and start syncing after getting the saved sync
|
||||
// to handle the case where we needed the `nextBatch` token to
|
||||
// start syncing from.
|
||||
getPushRules();
|
||||
});
|
||||
// Now start the first incremental sync request: this can also
|
||||
// take a while so if we set it going now, we can wait for it
|
||||
// to finish while we process our saved sync data.
|
||||
getPushRules();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -565,70 +595,20 @@ SyncApi.prototype._sync = async function(syncOptions) {
|
||||
return;
|
||||
}
|
||||
|
||||
let filterId = syncOptions.filterId;
|
||||
if (client.isGuest() && !filterId) {
|
||||
filterId = this._getGuestFilter();
|
||||
}
|
||||
|
||||
const syncToken = client.store.getSyncToken();
|
||||
|
||||
let pollTimeout = this.opts.pollTimeout;
|
||||
|
||||
if (this.getSyncState() !== 'SYNCING' || this._catchingUp) {
|
||||
// unless we are happily syncing already, we want the server to return
|
||||
// as quickly as possible, even if there are no events queued. This
|
||||
// serves two purposes:
|
||||
//
|
||||
// * When the connection dies, we want to know asap when it comes back,
|
||||
// so that we can hide the error from the user. (We don't want to
|
||||
// have to wait for an event or a timeout).
|
||||
//
|
||||
// * We want to know if the server has any to_device messages queued up
|
||||
// for us. We do that by calling it with a zero timeout until it
|
||||
// doesn't give us any more to_device messages.
|
||||
this._catchingUp = true;
|
||||
pollTimeout = 0;
|
||||
}
|
||||
|
||||
// normal timeout= plus buffer time
|
||||
const clientSideTimeoutMs = pollTimeout + BUFFER_PERIOD_MS;
|
||||
|
||||
const qps = {
|
||||
filter: filterId,
|
||||
timeout: pollTimeout,
|
||||
};
|
||||
|
||||
if (this.opts.disablePresence) {
|
||||
qps.set_presence = "offline";
|
||||
}
|
||||
|
||||
if (syncToken) {
|
||||
qps.since = syncToken;
|
||||
} else {
|
||||
// use a cachebuster for initialsyncs, to make sure that
|
||||
// we don't get a stale sync
|
||||
// (https://github.com/vector-im/vector-web/issues/1354)
|
||||
qps._cacheBuster = Date.now();
|
||||
}
|
||||
|
||||
if (this.getSyncState() == 'ERROR' || this.getSyncState() == 'RECONNECTING') {
|
||||
// we think the connection is dead. If it comes back up, we won't know
|
||||
// about it till /sync returns. If the timeout= is high, this could
|
||||
// be a long time. Set it to 0 when doing retries so we don't have to wait
|
||||
// for an event or a timeout before emiting the SYNCING event.
|
||||
qps.timeout = 0;
|
||||
}
|
||||
|
||||
let data;
|
||||
try {
|
||||
//debuglog('Starting sync since=' + syncToken);
|
||||
this._currentSyncRequest = client._http.authedRequest(
|
||||
undefined, "GET", "/sync", qps, undefined, clientSideTimeoutMs,
|
||||
);
|
||||
if (this._currentSyncRequest === null) {
|
||||
this._currentSyncRequest = this._doSyncRequest(syncOptions, syncToken);
|
||||
}
|
||||
data = await this._currentSyncRequest;
|
||||
} catch (e) {
|
||||
this._onSyncError(e, syncOptions);
|
||||
return;
|
||||
} finally {
|
||||
this._currentSyncRequest = null;
|
||||
}
|
||||
|
||||
//debuglog('Completed sync, next_batch=' + data.next_batch);
|
||||
@@ -699,6 +679,67 @@ SyncApi.prototype._sync = async function(syncOptions) {
|
||||
this._sync(syncOptions);
|
||||
};
|
||||
|
||||
SyncApi.prototype._doSyncRequest = function(syncOptions, syncToken) {
|
||||
const qps = this._getSyncParams(syncOptions, syncToken);
|
||||
return this.client._http.authedRequest(
|
||||
undefined, "GET", "/sync", qps, undefined,
|
||||
qps.timeout + BUFFER_PERIOD_MS,
|
||||
);
|
||||
};
|
||||
|
||||
SyncApi.prototype._getSyncParams = function(syncOptions, syncToken) {
|
||||
let pollTimeout = this.opts.pollTimeout;
|
||||
|
||||
if (this.getSyncState() !== 'SYNCING' || this._catchingUp) {
|
||||
// unless we are happily syncing already, we want the server to return
|
||||
// as quickly as possible, even if there are no events queued. This
|
||||
// serves two purposes:
|
||||
//
|
||||
// * When the connection dies, we want to know asap when it comes back,
|
||||
// so that we can hide the error from the user. (We don't want to
|
||||
// have to wait for an event or a timeout).
|
||||
//
|
||||
// * We want to know if the server has any to_device messages queued up
|
||||
// for us. We do that by calling it with a zero timeout until it
|
||||
// doesn't give us any more to_device messages.
|
||||
this._catchingUp = true;
|
||||
pollTimeout = 0;
|
||||
}
|
||||
|
||||
let filterId = syncOptions.filterId;
|
||||
if (this.client.isGuest() && !filterId) {
|
||||
filterId = this._getGuestFilter();
|
||||
}
|
||||
|
||||
const qps = {
|
||||
filter: filterId,
|
||||
timeout: pollTimeout,
|
||||
};
|
||||
|
||||
if (this.opts.disablePresence) {
|
||||
qps.set_presence = "offline";
|
||||
}
|
||||
|
||||
if (syncToken) {
|
||||
qps.since = syncToken;
|
||||
} else {
|
||||
// use a cachebuster for initialsyncs, to make sure that
|
||||
// we don't get a stale sync
|
||||
// (https://github.com/vector-im/vector-web/issues/1354)
|
||||
qps._cacheBuster = Date.now();
|
||||
}
|
||||
|
||||
if (this.getSyncState() == 'ERROR' || this.getSyncState() == 'RECONNECTING') {
|
||||
// we think the connection is dead. If it comes back up, we won't know
|
||||
// about it till /sync returns. If the timeout= is high, this could
|
||||
// be a long time. Set it to 0 when doing retries so we don't have to wait
|
||||
// for an event or a timeout before emiting the SYNCING event.
|
||||
qps.timeout = 0;
|
||||
}
|
||||
|
||||
return qps;
|
||||
};
|
||||
|
||||
SyncApi.prototype._onSyncError = function(err, syncOptions) {
|
||||
if (!this._running) {
|
||||
debuglog("Sync no longer running: exiting");
|
||||
|
||||
Reference in New Issue
Block a user