diff --git a/spec/integ/megolm.spec.js b/spec/integ/megolm.spec.js index 30c965b9f..da69a64a7 100644 --- a/spec/integ/megolm.spec.js +++ b/spec/integ/megolm.spec.js @@ -914,6 +914,99 @@ describe("megolm", function() { }); + it("We should not get confused by out-of-order device query responses", + () => { + // https://github.com/vector-im/riot-web/issues/3126 + return aliceTestClient.start().then(() => { + aliceTestClient.httpBackend.when('GET', '/sync').respond( + 200, getSyncResponse(['@bob:xyz', '@chris:abc'])); + return aliceTestClient.httpBackend.flush('/sync', 1); + }).then(() => { + // to make sure the initial device queries are flushed out, we + // attempt to send a message. + + aliceTestClient.httpBackend.when('POST', '/keys/query').respond( + 200, { + device_keys: { + '@bob:xyz': {}, + '@chris:abc': {}, + }, + }, + ); + + aliceTestClient.httpBackend.when('PUT', '/send/').respond( + 200, {event_id: '$event1'}); + + return q.all([ + aliceTestClient.client.sendTextMessage(ROOM_ID, 'test'), + aliceTestClient.httpBackend.flush('/keys/query', 1).then( + () => aliceTestClient.httpBackend.flush('/send/', 1, 20), + ), + ]); + }).then(() => { + expect(aliceTestClient.storage.getEndToEndDeviceSyncToken()).toEqual(1); + + // invalidate bob's and chris's device lists in separate syncs + aliceTestClient.httpBackend.when('GET', '/sync').respond(200, { + next_batch: '2', + device_lists: { + changed: ['@bob:xyz'], + }, + }); + aliceTestClient.httpBackend.when('GET', '/sync').respond(200, { + next_batch: '3', + device_lists: { + changed: ['@chris:abc'], + }, + }); + return aliceTestClient.httpBackend.flush('/sync', 2); + }).then(() => { + // check that we don't yet have a request for chris's devices. + aliceTestClient.httpBackend.when('POST', '/keys/query', { + device_keys: { + '@chris:abc': {}, + }, + token: '3', + }).respond(200, { + device_keys: {'@chris:abc': {}}, + }); + return aliceTestClient.httpBackend.flush('/keys/query', 1); + }).then((flushed) => { + expect(flushed).toEqual(0); + expect(aliceTestClient.storage.getEndToEndDeviceSyncToken()).toEqual(1); + + // now add an expectation for a query for bob's devices, and let + // it complete. + aliceTestClient.httpBackend.when('POST', '/keys/query', { + device_keys: { + '@bob:xyz': {}, + }, + token: '2', + }).respond(200, { + device_keys: {'@bob:xyz': {}}, + }); + return aliceTestClient.httpBackend.flush('/keys/query', 1); + }).then((flushed) => { + expect(flushed).toEqual(1); + + // wait for the client to stop processing the response + return aliceTestClient.client.downloadKeys(['@bob:xyz']); + }).then(() => { + expect(aliceTestClient.storage.getEndToEndDeviceSyncToken()).toEqual(2); + + // now let the query for chris's devices complete. + return aliceTestClient.httpBackend.flush('/keys/query', 1); + }).then((flushed) => { + expect(flushed).toEqual(1); + + // wait for the client to stop processing the response + return aliceTestClient.client.downloadKeys(['@chris:abc']); + }).then(() => { + expect(aliceTestClient.storage.getEndToEndDeviceSyncToken()).toEqual(3); + }); + }); + + it("Alice exports megolm keys and imports them to a new device", function(done) { let messageEncrypted; diff --git a/src/crypto/DeviceList.js b/src/crypto/DeviceList.js index 6cb7735e6..fe042d744 100644 --- a/src/crypto/DeviceList.js +++ b/src/crypto/DeviceList.js @@ -39,9 +39,22 @@ export default class DeviceList { // userId -> true this._pendingUsersWithNewDevices = {}; - // userId -> promise + // userId -> true this._keyDownloadsInProgressByUser = {}; + // deferred which is resolved when the current device query resolves. + // (null if there is no current request). + this._currentQueryDeferred = null; + + // deferred which is resolved when the *next* device query resolves. + // + // Normally it is meaningless for this to be non-null when + // _currentQueryDeferred is null, but it can happen if the previous + // query has finished but the next one has not yet started (because the + // previous query failed, in which case we deliberately delay starting + // the next query to avoid tight-looping). + this._queuedQueryDeferred = null; + this.lastKnownSyncToken = null; } @@ -55,35 +68,43 @@ export default class DeviceList { * module:crypto/deviceinfo|DeviceInfo}. */ downloadKeys(userIds, forceDownload) { - // promises we need to wait for while the download happens - const promises = []; - let needsRefresh = false; + let waitForCurrentQuery = false; + userIds.forEach((u) => { - if (this._keyDownloadsInProgressByUser[u]) { - // just wait for the existing download to complete - promises.push(this._keyDownloadsInProgressByUser[u]); - } else { - if (forceDownload) { - console.log("Invalidating device list for " + u + - " for forceDownload"); - this.invalidateUserDeviceList(u); - } else if (!this.getStoredDevicesForUser(u)) { - console.log("Invalidating device list for " + u + - " due to empty cache"); - this.invalidateUserDeviceList(u); - } - if (this._pendingUsersWithNewDevices[u]) { - needsRefresh = true; - } + if (this._pendingUsersWithNewDevices[u]) { + // we already know this user's devices are outdated + needsRefresh = true; + } else if (this._keyDownloadsInProgressByUser[u]) { + // already a download in progress - just wait for it. + // (even if forceDownload is true) + waitForCurrentQuery = true; + } else if (forceDownload) { + console.log("Invalidating device list for " + u + + " for forceDownload"); + this.invalidateUserDeviceList(u); + needsRefresh = true; + } else if (!this.getStoredDevicesForUser(u)) { + console.log("Invalidating device list for " + u + + " due to empty cache"); + this.invalidateUserDeviceList(u); + needsRefresh = true; } }); + let promise; if (needsRefresh) { - promises.push(this.refreshOutdatedDeviceLists(true)); + console.log("downloadKeys: waiting for next key query"); + promise = this._startOrQueueDeviceQuery(); + } else if(waitForCurrentQuery) { + console.log("downloadKeys: waiting for in-flight query to complete"); + promise = this._currentQueryDeferred.promise; + } else { + // we're all up-to-date. + promise = q(); } - return q.all(promises).then(() => { + return promise.then(() => { return this._getDevicesFromStore(userIds); }); } @@ -217,75 +238,97 @@ export default class DeviceList { } /** - * Start device queries for any users with outdated device lists - * - * We tolerate multiple concurrent device queries, but only one query per - * user. - * - * If any users already have downloads in progress, they are ignored - they - * will be refreshed when the current download completes anyway, so - * each user with outdated device lists will be updated eventually. - * - * The returned promise resolves immediately if there are no users with - * outdated device lists, or if all users with outdated device lists already - * have a query in progress. - * - * Otherwise, a new query request is made, and the promise resolves - * once that query completes. If the query fails, the promise will reject - * if rejectOnFailure was truthy, otherwise it will still resolve. - * - * @param {Boolean?} rejectOnFailure true to make the returned promise - * reject if the device list query fails. + * If there is not already a device list query in progress, and we have + * users who have outdated device lists, start a query now. + */ + refreshOutdatedDeviceLists() { + if (this._currentQueryDeferred) { + // request already in progress - do nothing. (We will automatically + // make another request if there are more users with outdated + // device lists when the current request completes). + return; + } + + this._startDeviceQuery(); + } + + /** + * If there is currently a device list query in progress, returns a promise + * which will resolve when the *next* query completes. Otherwise, starts + * a new query, and returns a promise which resolves when it completes. * * @return {Promise} */ - refreshOutdatedDeviceLists(rejectOnFailure) { - const users = Object.keys(this._pendingUsersWithNewDevices).filter( - (u) => !this._keyDownloadsInProgressByUser[u], - ); + _startOrQueueDeviceQuery() { + if (!this._currentQueryDeferred) { + this._startDeviceQuery(); + if (!this._currentQueryDeferred) { + return q(); + } - if (users.length === 0) { - return q(); + return this._currentQueryDeferred.promise; } - let prom = this._doKeyDownloadForUsers(users).then(() => { + if (!this._queuedQueryDeferred) { + this._queuedQueryDeferred = q.defer(); + } + + return this._queuedQueryDeferred.promise; + } + + /** + * kick off a new device query + * + * Throws if there is already a query in progress. + */ + _startDeviceQuery() { + if (this._currentQueryDeferred) { + throw new Error("DeviceList._startDeviceQuery called with request active"); + } + + this._currentQueryDeferred = this._queuedQueryDeferred || q.defer(); + this._queuedQueryDeferred = null; + + const users = Object.keys(this._pendingUsersWithNewDevices); + if (users.length === 0) { + // nothing to do + this._currentQueryDeferred.resolve(); + this._currentQueryDeferred = null; + return; + } + + this._doKeyDownloadForUsers(users).done(() => { users.forEach((u) => { delete this._keyDownloadsInProgressByUser[u]; }); + this._currentQueryDeferred.resolve(); + this._currentQueryDeferred = null; + // flush out any more requests that were blocked up while that - // was going on, but let the initial promise complete now. - // - this.refreshOutdatedDeviceLists().done(); + // was going on. + this._startDeviceQuery(); }, (e) => { console.error( 'Error updating device key cache for ' + users + ":", e, ); // reinstate the pending flags on any users which failed; this will - // mean that we will do another download in the future, but won't - // tight-loop. - // + // mean that we will do another download in the future (actually on + // the next /sync). users.forEach((u) => { delete this._keyDownloadsInProgressByUser[u]; this._pendingUsersWithNewDevices[u] = true; }); - // TODO: schedule a retry. - throw e; + this._currentQueryDeferred.reject(e); + this._currentQueryDeferred = null; }); users.forEach((u) => { delete this._pendingUsersWithNewDevices[u]; - this._keyDownloadsInProgressByUser[u] = prom; + this._keyDownloadsInProgressByUser[u] = true; }); - - if (!rejectOnFailure) { - // normally we just want to swallow the exception - we've already - // logged it futher up. - prom = prom.catch((e) => {}); - } - return prom; } /** diff --git a/src/crypto/index.js b/src/crypto/index.js index 4f43eafb2..5d5104997 100644 --- a/src/crypto/index.js +++ b/src/crypto/index.js @@ -778,7 +778,7 @@ Crypto.prototype._onSyncCompleted = function(syncData) { // if that failed, we fall back to invalidating everyone. console.warn("Error fetching changed device list", e); this._invalidateDeviceListForAllActiveUsers(); - return this._deviceList.refreshOutdatedDeviceLists(); + this._deviceList.refreshOutdatedDeviceLists(); }).done(); } else { // otherwise, we have to invalidate all devices for all users we @@ -790,7 +790,7 @@ Crypto.prototype._onSyncCompleted = function(syncData) { } // catch up on any new devices we got told about during the sync. - this._deviceList.refreshOutdatedDeviceLists().done(); + this._deviceList.refreshOutdatedDeviceLists(); }; /** @@ -869,7 +869,7 @@ Crypto.prototype._invalidateDeviceListsSince = function(oldSyncToken) { this._deviceList.invalidateUserDeviceList(u); } }); - return this._deviceList.refreshOutdatedDeviceLists(); + this._deviceList.refreshOutdatedDeviceLists(); }); };