From 94addb6315ab43c1f0506c1571ad52103bb33784 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 30 Jan 2017 18:58:37 +0000 Subject: [PATCH 1/2] Rewrite the device key query logic Only permit one query per user at a time. --- spec/integ/matrix-client-methods.spec.js | 6 +- src/crypto/DeviceList.js | 215 ++++++++++------------- src/crypto/index.js | 4 +- 3 files changed, 102 insertions(+), 123 deletions(-) diff --git a/spec/integ/matrix-client-methods.spec.js b/spec/integ/matrix-client-methods.spec.js index 4acec1aa9..b87d255d2 100644 --- a/spec/integ/matrix-client-methods.spec.js +++ b/spec/integ/matrix-client-methods.spec.js @@ -347,7 +347,11 @@ describe("MatrixClient", function() { */ httpBackend.when("POST", "/keys/query").check(function(req) { - expect(req.data).toEqual({device_keys: {boris: {}, chaz: {}}}); + expect(req.data).toEqual({device_keys: { + '@alice:localhost': {}, + 'boris': {}, + 'chaz': {}, + }}); }).respond(200, { device_keys: { boris: borisKeys, diff --git a/src/crypto/DeviceList.js b/src/crypto/DeviceList.js index 175286b34..df493b124 100644 --- a/src/crypto/DeviceList.js +++ b/src/crypto/DeviceList.js @@ -25,7 +25,6 @@ import q from 'q'; import DeviceInfo from './deviceinfo'; import olmlib from './olmlib'; -import utils from '../utils'; /** * @alias module:crypto/DeviceList @@ -36,10 +35,11 @@ export default class DeviceList { this._sessionStore = sessionStore; this._olmDevice = olmDevice; + // users with outdated device lists // userId -> true this._pendingUsersWithNewDevices = {}; - // userId -> [promise, ...] + // userId -> promise this._keyDownloadsInProgressByUser = {}; } @@ -53,45 +53,30 @@ export default class DeviceList { * module:crypto/deviceinfo|DeviceInfo}. */ downloadKeys(userIds, forceDownload) { - const self = this; - // promises we need to wait for while the download happens const promises = []; - // list of userids we need to download keys for - let downloadUsers = []; - - function perUserCatch(u) { - return function(e) { - console.warn('Error downloading keys for user ' + u + ':', e); - }; - } - - if (forceDownload) { - downloadUsers = userIds; - } else { - for (let i = 0; i < userIds.length; ++i) { - const u = userIds[i]; - - const inprogress = this._keyDownloadsInProgressByUser[u]; - if (inprogress) { - // wait for the download to complete - promises.push(q.any(inprogress).catch(perUserCatch(u))); - } else if (!this.getStoredDevicesForUser(u)) { - downloadUsers.push(u); + let needsFlush = false; + userIds.forEach((u) => { + if (this._keyDownloadsInProgressByUser[u]) { + // just wait for the existing download to complete + promises.push(this._keyDownloadsInProgressByUser[u]); + } else { + if (forceDownload || !this.getStoredDevicesForUser(u)) { + this.invalidateUserDeviceList(u); + } + if (this._pendingUsersWithNewDevices[u]) { + needsFlush = true; } } + }); + + if (needsFlush) { + promises.push(this.flushNewDeviceRequests(true)); } - if (downloadUsers.length > 0) { - const r = this._doKeyDownloadForUsers(downloadUsers); - downloadUsers.map(function(u) { - promises.push(r[u].catch(perUserCatch(u))); - }); - } - - return q.all(promises).then(function() { - return self._getDevicesFromStore(userIds); + return q.all(promises).then(() => { + return this._getDevicesFromStore(userIds); }); } @@ -210,134 +195,124 @@ export default class DeviceList { * @param {String} userId */ invalidateUserDeviceList(userId) { + if (typeof userId !== 'string') { + throw new Error('userId must be a string; was '+userId); + } this._pendingUsersWithNewDevices[userId] = true; } /** - * Start device queries for any users who sent us an m.new_device recently + * Start device queries for any users with outdated device lists + * + * We tolerate multiple concurrent device queries, but only one query per + * user. + * + * If any users already have downloads in progress, they are ignored - + * they will be refreshed when the current download completes anyway. + * + * The returned promise resolves immediately if there are no users with + * outdated device lists, or if all users with outdated device lists already + * have a query in progress. + * + * Otherwise, a new query request is made, and the promise resolves or + * once that query completes. If the query fails, the promise will reject + * if rejectOnFailure was truthy, otherwise it will still resolve. + * + * @param {Boolean?} rejectOnFailure true to make the returned promise + * reject if the device list query fails. + * + * @return {Promise} */ - flushNewDeviceRequests() { - const users = Object.keys(this._pendingUsersWithNewDevices); + flushNewDeviceRequests(rejectOnFailure) { + const users = Object.keys(this._pendingUsersWithNewDevices).filter( + (u) => !this._keyDownloadsInProgressByUser[u], + ); if (users.length === 0) { - return; + return q(); } - const r = this._doKeyDownloadForUsers(users); + let prom = this._doKeyDownloadForUsers(users).then(() => { + users.forEach((u) => { + delete this._keyDownloadsInProgressByUser[u]; + }); - // we've kicked off requests to these users: remove their - // pending flag for now. - this._pendingUsersWithNewDevices = {}; + // flush out any more requests that were blocked up while that + // was going on, but let the initial promise complete now. + // + this.flushNewDeviceRequests().done(); + }, (e) => { + console.error( + 'Error updating device key cache for ' + users + ":", e, + ); - users.map((u) => { - r[u] = r[u].catch((e) => { - console.error( - 'Error updating device keys for user ' + u + ':', e, - ); - - // reinstate the pending flags on any users which failed; this will - // mean that we will do another download in the future, but won't - // tight-loop. - // + // reinstate the pending flags on any users which failed; this will + // mean that we will do another download in the future, but won't + // tight-loop. + // + users.forEach((u) => { + delete this._keyDownloadsInProgressByUser[u]; this._pendingUsersWithNewDevices[u] = true; }); + + // TODO: schedule a retry. + throw e; }); - q.all(Object.values(r)).done(); + users.forEach((u) => { + delete this._pendingUsersWithNewDevices[u]; + this._keyDownloadsInProgressByUser[u] = prom; + }); + + if (!rejectOnFailure) { + // normally we just want to swallow the exception - we've already + // logged it futher up. + prom = prom.catch((e) => {}); + } + return prom; } /** * @param {string[]} downloadUsers list of userIds * - * @return {Object} a map from userId to a promise for a result for that user + * @return {Promise} */ _doKeyDownloadForUsers(downloadUsers) { - const self = this; - console.log('Starting key download for ' + downloadUsers); - const deferMap = {}; - const promiseMap = {}; - - downloadUsers.map(function(u) { - const deferred = q.defer(); - const promise = deferred.promise.finally(function() { - const inProgress = self._keyDownloadsInProgressByUser[u]; - utils.removeElement(inProgress, function(e) { - return e === promise; - }); - if (inProgress.length === 0) { - // no more downloads for this user; remove the element - delete self._keyDownloadsInProgressByUser[u]; - } - }); - - if (!self._keyDownloadsInProgressByUser[u]) { - self._keyDownloadsInProgressByUser[u] = []; - } - self._keyDownloadsInProgressByUser[u].push(promise); - - deferMap[u] = deferred; - promiseMap[u] = promise; - }); - - this._baseApis.downloadKeysForUsers( + return this._baseApis.downloadKeysForUsers( downloadUsers, - ).done(function(res) { + ).then((res) => { const dk = res.device_keys || {}; - for (let i = 0; i < downloadUsers.length; ++i) { - const userId = downloadUsers[i]; - var deviceId; - + for (const userId of downloadUsers) { console.log('got keys for ' + userId + ':', dk[userId]); - if (!dk[userId]) { - // no result for this user - const err = 'Unknown'; - // TODO: do something with res.failures - deferMap[userId].reject(err); - continue; - } - // map from deviceid -> deviceinfo for this user const userStore = {}; - const devs = self._sessionStore.getEndToEndDevicesForUser(userId); + const devs = this._sessionStore.getEndToEndDevicesForUser(userId); if (devs) { - for (deviceId in devs) { - if (devs.hasOwnProperty(deviceId)) { - const d = DeviceInfo.fromStorage(devs[deviceId], deviceId); - userStore[deviceId] = d; - } - } + Object.keys(devs).forEach((deviceId) => { + const d = DeviceInfo.fromStorage(devs[deviceId], deviceId); + userStore[deviceId] = d; + }); } _updateStoredDeviceKeysForUser( - self._olmDevice, userId, userStore, dk[userId], - ); + this._olmDevice, userId, userStore, dk[userId] || {}, + ); // update the session store const storage = {}; - for (deviceId in userStore) { - if (!userStore.hasOwnProperty(deviceId)) { - continue; - } - + Object.keys(userStore).forEach((deviceId) => { storage[deviceId] = userStore[deviceId].toStorage(); - } - self._sessionStore.storeEndToEndDevicesForUser( + }); + + this._sessionStore.storeEndToEndDevicesForUser( userId, storage, - ); - - deferMap[userId].resolve(); + ); } - }, function(err) { - downloadUsers.map(function(u) { - deferMap[u].reject(err); - }); }); - - return promiseMap; } } diff --git a/src/crypto/index.js b/src/crypto/index.js index 527fd9f80..2abd6a1a3 100644 --- a/src/crypto/index.js +++ b/src/crypto/index.js @@ -708,7 +708,7 @@ Crypto.prototype._onInitialSyncCompleted = function(rooms) { this._initialSyncCompleted = true; // catch up on any m.new_device events which arrived during the initial sync. - this._deviceList.flushNewDeviceRequests(); + this._deviceList.flushNewDeviceRequests().done(); if (this._sessionStore.getDeviceAnnounced()) { return; @@ -845,7 +845,7 @@ Crypto.prototype._onNewDeviceEvent = function(event) { // we delay handling these until the intialsync has completed, so that we // can do all of them together. if (this._initialSyncCompleted) { - this._deviceList.flushNewDeviceRequests(); + this._deviceList.flushNewDeviceRequests().done(); } }; From c3440c506cf98b4b29d158c9d8fa6da0e93d41b0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 3 Feb 2017 00:10:13 +0000 Subject: [PATCH 2/2] Address review comments Update some comments, and s/flushNewDeviceRequests/refreshOutdatedDeviceLists/. --- src/crypto/DeviceList.js | 28 ++++++++++++++++++---------- src/crypto/index.js | 4 ++-- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/crypto/DeviceList.js b/src/crypto/DeviceList.js index df493b124..f90e117e1 100644 --- a/src/crypto/DeviceList.js +++ b/src/crypto/DeviceList.js @@ -56,7 +56,7 @@ export default class DeviceList { // promises we need to wait for while the download happens const promises = []; - let needsFlush = false; + let needsRefresh = false; userIds.forEach((u) => { if (this._keyDownloadsInProgressByUser[u]) { // just wait for the existing download to complete @@ -66,13 +66,13 @@ export default class DeviceList { this.invalidateUserDeviceList(u); } if (this._pendingUsersWithNewDevices[u]) { - needsFlush = true; + needsRefresh = true; } } }); - if (needsFlush) { - promises.push(this.flushNewDeviceRequests(true)); + if (needsRefresh) { + promises.push(this.refreshOutdatedDeviceLists(true)); } return q.all(promises).then(() => { @@ -190,11 +190,18 @@ export default class DeviceList { * Mark the cached device list for the given user outdated. * * This doesn't set off an update, so that several users can be batched - * together. Call flushDeviceListRequests() for that. + * together. Call refreshOutdatedDeviceLists() for that. * * @param {String} userId */ invalidateUserDeviceList(userId) { + // sanity-check the userId. This is mostly paranoia, but if synapse + // can't parse the userId we give it as an mxid, it 500s the whole + // request and we can never update the device lists again (because + // the broken userId is always 'invalid' and always included in any + // refresh request). + // By checking it is at least a string, we can eliminate a class of + // silly errors. if (typeof userId !== 'string') { throw new Error('userId must be a string; was '+userId); } @@ -207,14 +214,15 @@ export default class DeviceList { * We tolerate multiple concurrent device queries, but only one query per * user. * - * If any users already have downloads in progress, they are ignored - - * they will be refreshed when the current download completes anyway. + * If any users already have downloads in progress, they are ignored - they + * will be refreshed when the current download completes anyway, so + * each user with outdated device lists will be updated eventually. * * The returned promise resolves immediately if there are no users with * outdated device lists, or if all users with outdated device lists already * have a query in progress. * - * Otherwise, a new query request is made, and the promise resolves or + * Otherwise, a new query request is made, and the promise resolves * once that query completes. If the query fails, the promise will reject * if rejectOnFailure was truthy, otherwise it will still resolve. * @@ -223,7 +231,7 @@ export default class DeviceList { * * @return {Promise} */ - flushNewDeviceRequests(rejectOnFailure) { + refreshOutdatedDeviceLists(rejectOnFailure) { const users = Object.keys(this._pendingUsersWithNewDevices).filter( (u) => !this._keyDownloadsInProgressByUser[u], ); @@ -240,7 +248,7 @@ export default class DeviceList { // flush out any more requests that were blocked up while that // was going on, but let the initial promise complete now. // - this.flushNewDeviceRequests().done(); + this.refreshOutdatedDeviceLists().done(); }, (e) => { console.error( 'Error updating device key cache for ' + users + ":", e, diff --git a/src/crypto/index.js b/src/crypto/index.js index 2abd6a1a3..f007ae401 100644 --- a/src/crypto/index.js +++ b/src/crypto/index.js @@ -708,7 +708,7 @@ Crypto.prototype._onInitialSyncCompleted = function(rooms) { this._initialSyncCompleted = true; // catch up on any m.new_device events which arrived during the initial sync. - this._deviceList.flushNewDeviceRequests().done(); + this._deviceList.refreshOutdatedDeviceLists().done(); if (this._sessionStore.getDeviceAnnounced()) { return; @@ -845,7 +845,7 @@ Crypto.prototype._onNewDeviceEvent = function(event) { // we delay handling these until the intialsync has completed, so that we // can do all of them together. if (this._initialSyncCompleted) { - this._deviceList.flushNewDeviceRequests().done(); + this._deviceList.refreshOutdatedDeviceLists().done(); } };