diff --git a/spec/unit/matrix-client.spec.js b/spec/unit/matrix-client.spec.js index 6585bb5a7..8ef22f6b0 100644 --- a/spec/unit/matrix-client.spec.js +++ b/spec/unit/matrix-client.spec.js @@ -137,6 +137,7 @@ describe("MatrixClient", function() { "getSyncAccumulator", "startup", "deleteAllData", ].reduce((r, k) => { r[k] = expect.createSpy(); return r; }, {}); store.getSavedSync = expect.createSpy().andReturn(Promise.resolve(null)); + store.getSavedSyncToken = expect.createSpy().andReturn(Promise.resolve(null)); store.setSyncData = expect.createSpy().andReturn(Promise.resolve(null)); client = new MatrixClient({ baseUrl: "https://my.home.server", diff --git a/src/store/indexeddb-local-backend.js b/src/store/indexeddb-local-backend.js index 338631695..a928e1aa6 100644 --- a/src/store/indexeddb-local-backend.js +++ b/src/store/indexeddb-local-backend.js @@ -1,5 +1,6 @@ /* Copyright 2017 Vector Creations Ltd +Copyright 2018 New Vector Ltd Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -243,6 +244,10 @@ LocalIndexedDBStoreBackend.prototype = { } }, + getNextBatchToken: function() { + return Promise.resolve(this._syncAccumulator.getNextBatchToken()); + }, + setSyncData: function(syncData) { return Promise.resolve().then(() => { this._syncAccumulator.accumulate(syncData); diff --git a/src/store/indexeddb-remote-backend.js b/src/store/indexeddb-remote-backend.js index 4decfc3f3..2221633e4 100644 --- a/src/store/indexeddb-remote-backend.js +++ b/src/store/indexeddb-remote-backend.js @@ -1,5 +1,6 @@ /* Copyright 2017 Vector Creations Ltd +Copyright 2018 New Vector Ltd Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -74,6 +75,10 @@ RemoteIndexedDBStoreBackend.prototype = { return this._doCmd('getSavedSync'); }, + getNextBatchToken: function() { + return this._doCmd('getNextBatchToken'); + }, + setSyncData: function(syncData) { return this._doCmd('setSyncData', [syncData]); }, diff --git a/src/store/indexeddb-store-worker.js b/src/store/indexeddb-store-worker.js index 3dc16e855..d32df7274 100644 --- a/src/store/indexeddb-store-worker.js +++ b/src/store/indexeddb-store-worker.js @@ -1,5 +1,6 @@ /* Copyright 2017 Vector Creations Ltd +Copyright 2018 New Vector Ltd Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -88,6 +89,9 @@ class IndexedDBStoreWorker { case 'getUserPresenceEvents': prom = this.backend.getUserPresenceEvents(); break; + case 'getNextBatchToken': + prom = this.backend.getNextBatchToken(); + break; } if (prom === undefined) { diff --git a/src/store/indexeddb.js b/src/store/indexeddb.js index 133492065..d47e35544 100644 --- a/src/store/indexeddb.js +++ b/src/store/indexeddb.js @@ -1,5 +1,6 @@ /* Copyright 2017 Vector Creations Ltd +Copyright 2018 New Vector Ltd Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -145,6 +146,14 @@ IndexedDBStore.prototype.getSavedSync = function() { return this.backend.getSavedSync(); }; +/** + * @return {Promise} If there is a saved sync, the nextBatch token + * for this sync, otherwise null. + */ +IndexedDBStore.prototype.getSavedSyncToken = function() { + return this.backend.getNextBatchToken(); +}, + /** * Delete all data from this store. * @return {Promise} Resolves if the data was deleted from the database. diff --git a/src/store/memory.js b/src/store/memory.js index 8b6be29ae..4f2f7c754 100644 --- a/src/store/memory.js +++ b/src/store/memory.js @@ -1,6 +1,7 @@ /* Copyright 2015, 2016 OpenMarket Ltd Copyright 2017 Vector Creations Ltd +Copyright 2018 New Vector Ltd Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -346,6 +347,14 @@ module.exports.MatrixInMemoryStore.prototype = { return Promise.resolve(null); }, + /** + * @return {Promise} If there is a saved sync, the nextBatch token + * for this sync, otherwise null. + */ + getSavedSyncToken: function() { + return Promise.resolve(null); + }, + /** * Delete all data from this store. * @return {Promise} An immediately resolved promise. diff --git a/src/store/stub.js b/src/store/stub.js index f611be394..f41e0c5e0 100644 --- a/src/store/stub.js +++ b/src/store/stub.js @@ -1,6 +1,7 @@ /* Copyright 2015, 2016 OpenMarket Ltd Copyright 2017 Vector Creations Ltd +Copyright 2018 New Vector Ltd Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -247,6 +248,14 @@ StubStore.prototype = { return Promise.resolve(null); }, + /** + * @return {Promise} If there is a saved sync, the nextBatch token + * for this sync, otherwise null. + */ + getSavedSyncToken: function() { + return Promise.resolve(null); + }, + /** * Delete all data from this store. Does nothing since this store * doesn't store anything. diff --git a/src/sync-accumulator.js b/src/sync-accumulator.js index c1266a9b0..7369c9b8a 100644 --- a/src/sync-accumulator.js +++ b/src/sync-accumulator.js @@ -1,5 +1,6 @@ /* Copyright 2017 Vector Creations Ltd +Copyright 2018 New Vector Ltd Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -520,6 +521,10 @@ class SyncAccumulator { accountData: accData, }; } + + getNextBatchToken() { + return this.nextBatch; + } } function setState(eventMap, event) { diff --git a/src/sync.js b/src/sync.js index a89649010..087f8c803 100644 --- a/src/sync.js +++ b/src/sync.js @@ -396,6 +396,17 @@ SyncApi.prototype.getSyncState = function() { return this._syncState; }; +SyncApi.prototype.recoverFromSyncStartupError = async function(savedSyncPromise, err) { + // Wait for the saved sync to complete - we send the pushrules and filter requests + // before the saved sync has finished so they can run in parallel, but only process + // the results after the saved sync is done. Equivalently, we wait for it to finish + // before reporting failures from these functions. + await savedSyncPromise; + const keepaliveProm = this._startKeepAlives(); + this._updateSyncState("ERROR", { error: err }); + await keepaliveProm; +}; + /** * Main entry point */ @@ -410,28 +421,32 @@ SyncApi.prototype.sync = function() { global.document.addEventListener("online", this._onOnlineBound, false); } + let savedSyncPromise = Promise.resolve(); + let savedSyncToken = null; + // We need to do one-off checks before we can begin the /sync loop. // These are: // 1) We need to get push rules so we can check if events should bing as we get // them from /sync. // 2) We need to get/create a filter which we can use for /sync. - function getPushRules() { - client.getPushRules().done((result) => { + async function getPushRules() { + try { + const result = await client.getPushRules(); debuglog("Got push rules"); client.pushRules = result; - - getFilter(); // Now get the filter and start syncing - }, (err) => { - self._startKeepAlives().done(() => { - getPushRules(); - }); - self._updateSyncState("ERROR", { error: err }); - }); + } catch (err) { + // wait for saved sync to complete before doing anything else, + // otherwise the sync state will end up being incorrect + await self.recoverFromSyncStartupError(savedSyncPromise, err); + getPushRules(); + return; + } + getFilter(); // Now get the filter and start syncing } - function getFilter() { + async function getFilter() { let filter; if (self.opts.filter) { filter = self.opts.filter; @@ -440,40 +455,55 @@ SyncApi.prototype.sync = function() { filter.setTimelineLimit(self.opts.initialSyncLimit); } - client.getOrCreateFilter( - getFilterName(client.credentials.userId), filter, - ).done((filterId) => { - // reset the notifications timeline to prepare it to paginate from - // the current point in time. - // The right solution would be to tie /sync pagination tokens into - // /notifications API somehow. - client.resetNotifTimelineSet(); + let filterId; + try { + filterId = await client.getOrCreateFilter( + getFilterName(client.credentials.userId), filter, + ); + } catch (err) { + // wait for saved sync to complete before doing anything else, + // otherwise the sync state will end up being incorrect + await self.recoverFromSyncStartupError(savedSyncPromise, err); + getFilter(); + return; + } + // reset the notifications timeline to prepare it to paginate from + // the current point in time. + // The right solution would be to tie /sync pagination tokens into + // /notifications API somehow. + client.resetNotifTimelineSet(); - self._sync({ filterId }); - }, (err) => { - self._startKeepAlives().done(function() { - getFilter(); - }); - self._updateSyncState("ERROR", { error: err }); - }); + if (self._currentSyncRequest === null) { + // Send this first sync request here so we can then wait for the saved + // sync data to finish processing before we process the results of this one. + console.log("Sending first sync request..."); + self._currentSyncRequest = self._doSyncRequest({ filterId }, savedSyncToken); + } + + // Now wait for the saved sync to finish... + await savedSyncPromise; + self._sync({ filterId }); } if (client.isGuest()) { // no push rules for guests, no access to POST filter for guests. self._sync({}); } else { - // Before fetching push rules, fetching the filter and syncing, check - // for persisted /sync data and use that if present. - client.store.getSavedSync().then((savedSync) => { + // Pull the saved sync token out first, before the worker starts sending + // all the sync data which could take a while. This will let us send our + // first incremental sync request before we've processed our saved data. + savedSyncPromise = client.store.getSavedSyncToken().then((tok) => { + savedSyncToken = tok; + return client.store.getSavedSync(); + }).then((savedSync) => { if (savedSync) { return self._syncFromCache(savedSync); } - }).then(() => { - // Get push rules and start syncing after getting the saved sync - // to handle the case where we needed the `nextBatch` token to - // start syncing from. - getPushRules(); }); + // Now start the first incremental sync request: this can also + // take a while so if we set it going now, we can wait for it + // to finish while we process our saved sync data. + getPushRules(); } }; @@ -565,70 +595,20 @@ SyncApi.prototype._sync = async function(syncOptions) { return; } - let filterId = syncOptions.filterId; - if (client.isGuest() && !filterId) { - filterId = this._getGuestFilter(); - } - const syncToken = client.store.getSyncToken(); - let pollTimeout = this.opts.pollTimeout; - - if (this.getSyncState() !== 'SYNCING' || this._catchingUp) { - // unless we are happily syncing already, we want the server to return - // as quickly as possible, even if there are no events queued. This - // serves two purposes: - // - // * When the connection dies, we want to know asap when it comes back, - // so that we can hide the error from the user. (We don't want to - // have to wait for an event or a timeout). - // - // * We want to know if the server has any to_device messages queued up - // for us. We do that by calling it with a zero timeout until it - // doesn't give us any more to_device messages. - this._catchingUp = true; - pollTimeout = 0; - } - - // normal timeout= plus buffer time - const clientSideTimeoutMs = pollTimeout + BUFFER_PERIOD_MS; - - const qps = { - filter: filterId, - timeout: pollTimeout, - }; - - if (this.opts.disablePresence) { - qps.set_presence = "offline"; - } - - if (syncToken) { - qps.since = syncToken; - } else { - // use a cachebuster for initialsyncs, to make sure that - // we don't get a stale sync - // (https://github.com/vector-im/vector-web/issues/1354) - qps._cacheBuster = Date.now(); - } - - if (this.getSyncState() == 'ERROR' || this.getSyncState() == 'RECONNECTING') { - // we think the connection is dead. If it comes back up, we won't know - // about it till /sync returns. If the timeout= is high, this could - // be a long time. Set it to 0 when doing retries so we don't have to wait - // for an event or a timeout before emiting the SYNCING event. - qps.timeout = 0; - } - let data; try { //debuglog('Starting sync since=' + syncToken); - this._currentSyncRequest = client._http.authedRequest( - undefined, "GET", "/sync", qps, undefined, clientSideTimeoutMs, - ); + if (this._currentSyncRequest === null) { + this._currentSyncRequest = this._doSyncRequest(syncOptions, syncToken); + } data = await this._currentSyncRequest; } catch (e) { this._onSyncError(e, syncOptions); return; + } finally { + this._currentSyncRequest = null; } //debuglog('Completed sync, next_batch=' + data.next_batch); @@ -699,6 +679,67 @@ SyncApi.prototype._sync = async function(syncOptions) { this._sync(syncOptions); }; +SyncApi.prototype._doSyncRequest = function(syncOptions, syncToken) { + const qps = this._getSyncParams(syncOptions, syncToken); + return this.client._http.authedRequest( + undefined, "GET", "/sync", qps, undefined, + qps.timeout + BUFFER_PERIOD_MS, + ); +}; + +SyncApi.prototype._getSyncParams = function(syncOptions, syncToken) { + let pollTimeout = this.opts.pollTimeout; + + if (this.getSyncState() !== 'SYNCING' || this._catchingUp) { + // unless we are happily syncing already, we want the server to return + // as quickly as possible, even if there are no events queued. This + // serves two purposes: + // + // * When the connection dies, we want to know asap when it comes back, + // so that we can hide the error from the user. (We don't want to + // have to wait for an event or a timeout). + // + // * We want to know if the server has any to_device messages queued up + // for us. We do that by calling it with a zero timeout until it + // doesn't give us any more to_device messages. + this._catchingUp = true; + pollTimeout = 0; + } + + let filterId = syncOptions.filterId; + if (this.client.isGuest() && !filterId) { + filterId = this._getGuestFilter(); + } + + const qps = { + filter: filterId, + timeout: pollTimeout, + }; + + if (this.opts.disablePresence) { + qps.set_presence = "offline"; + } + + if (syncToken) { + qps.since = syncToken; + } else { + // use a cachebuster for initialsyncs, to make sure that + // we don't get a stale sync + // (https://github.com/vector-im/vector-web/issues/1354) + qps._cacheBuster = Date.now(); + } + + if (this.getSyncState() == 'ERROR' || this.getSyncState() == 'RECONNECTING') { + // we think the connection is dead. If it comes back up, we won't know + // about it till /sync returns. If the timeout= is high, this could + // be a long time. Set it to 0 when doing retries so we don't have to wait + // for an event or a timeout before emiting the SYNCING event. + qps.timeout = 0; + } + + return qps; +}; + SyncApi.prototype._onSyncError = function(err, syncOptions) { if (!this._running) { debuglog("Sync no longer running: exiting");