You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-12-01 04:43:29 +03:00
Merge pull request #347 from matrix-org/rav/rewrite_device_query_logic
Rewrite the device key query logic
This commit is contained in:
@@ -347,7 +347,11 @@ describe("MatrixClient", function() {
|
||||
*/
|
||||
|
||||
httpBackend.when("POST", "/keys/query").check(function(req) {
|
||||
expect(req.data).toEqual({device_keys: {boris: {}, chaz: {}}});
|
||||
expect(req.data).toEqual({device_keys: {
|
||||
'@alice:localhost': {},
|
||||
'boris': {},
|
||||
'chaz': {},
|
||||
}});
|
||||
}).respond(200, {
|
||||
device_keys: {
|
||||
boris: borisKeys,
|
||||
|
||||
@@ -25,7 +25,6 @@ import q from 'q';
|
||||
|
||||
import DeviceInfo from './deviceinfo';
|
||||
import olmlib from './olmlib';
|
||||
import utils from '../utils';
|
||||
|
||||
/**
|
||||
* @alias module:crypto/DeviceList
|
||||
@@ -36,10 +35,11 @@ export default class DeviceList {
|
||||
this._sessionStore = sessionStore;
|
||||
this._olmDevice = olmDevice;
|
||||
|
||||
// users with outdated device lists
|
||||
// userId -> true
|
||||
this._pendingUsersWithNewDevices = {};
|
||||
|
||||
// userId -> [promise, ...]
|
||||
// userId -> promise
|
||||
this._keyDownloadsInProgressByUser = {};
|
||||
}
|
||||
|
||||
@@ -53,45 +53,30 @@ export default class DeviceList {
|
||||
* module:crypto/deviceinfo|DeviceInfo}.
|
||||
*/
|
||||
downloadKeys(userIds, forceDownload) {
|
||||
const self = this;
|
||||
|
||||
// promises we need to wait for while the download happens
|
||||
const promises = [];
|
||||
|
||||
// list of userids we need to download keys for
|
||||
let downloadUsers = [];
|
||||
|
||||
function perUserCatch(u) {
|
||||
return function(e) {
|
||||
console.warn('Error downloading keys for user ' + u + ':', e);
|
||||
};
|
||||
}
|
||||
|
||||
if (forceDownload) {
|
||||
downloadUsers = userIds;
|
||||
let needsRefresh = false;
|
||||
userIds.forEach((u) => {
|
||||
if (this._keyDownloadsInProgressByUser[u]) {
|
||||
// just wait for the existing download to complete
|
||||
promises.push(this._keyDownloadsInProgressByUser[u]);
|
||||
} else {
|
||||
for (let i = 0; i < userIds.length; ++i) {
|
||||
const u = userIds[i];
|
||||
|
||||
const inprogress = this._keyDownloadsInProgressByUser[u];
|
||||
if (inprogress) {
|
||||
// wait for the download to complete
|
||||
promises.push(q.any(inprogress).catch(perUserCatch(u)));
|
||||
} else if (!this.getStoredDevicesForUser(u)) {
|
||||
downloadUsers.push(u);
|
||||
if (forceDownload || !this.getStoredDevicesForUser(u)) {
|
||||
this.invalidateUserDeviceList(u);
|
||||
}
|
||||
if (this._pendingUsersWithNewDevices[u]) {
|
||||
needsRefresh = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (downloadUsers.length > 0) {
|
||||
const r = this._doKeyDownloadForUsers(downloadUsers);
|
||||
downloadUsers.map(function(u) {
|
||||
promises.push(r[u].catch(perUserCatch(u)));
|
||||
});
|
||||
|
||||
if (needsRefresh) {
|
||||
promises.push(this.refreshOutdatedDeviceLists(true));
|
||||
}
|
||||
|
||||
return q.all(promises).then(function() {
|
||||
return self._getDevicesFromStore(userIds);
|
||||
return q.all(promises).then(() => {
|
||||
return this._getDevicesFromStore(userIds);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -205,139 +190,137 @@ export default class DeviceList {
|
||||
* Mark the cached device list for the given user outdated.
|
||||
*
|
||||
* This doesn't set off an update, so that several users can be batched
|
||||
* together. Call flushDeviceListRequests() for that.
|
||||
* together. Call refreshOutdatedDeviceLists() for that.
|
||||
*
|
||||
* @param {String} userId
|
||||
*/
|
||||
invalidateUserDeviceList(userId) {
|
||||
// sanity-check the userId. This is mostly paranoia, but if synapse
|
||||
// can't parse the userId we give it as an mxid, it 500s the whole
|
||||
// request and we can never update the device lists again (because
|
||||
// the broken userId is always 'invalid' and always included in any
|
||||
// refresh request).
|
||||
// By checking it is at least a string, we can eliminate a class of
|
||||
// silly errors.
|
||||
if (typeof userId !== 'string') {
|
||||
throw new Error('userId must be a string; was '+userId);
|
||||
}
|
||||
this._pendingUsersWithNewDevices[userId] = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start device queries for any users who sent us an m.new_device recently
|
||||
* Start device queries for any users with outdated device lists
|
||||
*
|
||||
* We tolerate multiple concurrent device queries, but only one query per
|
||||
* user.
|
||||
*
|
||||
* If any users already have downloads in progress, they are ignored - they
|
||||
* will be refreshed when the current download completes anyway, so
|
||||
* each user with outdated device lists will be updated eventually.
|
||||
*
|
||||
* The returned promise resolves immediately if there are no users with
|
||||
* outdated device lists, or if all users with outdated device lists already
|
||||
* have a query in progress.
|
||||
*
|
||||
* Otherwise, a new query request is made, and the promise resolves
|
||||
* once that query completes. If the query fails, the promise will reject
|
||||
* if rejectOnFailure was truthy, otherwise it will still resolve.
|
||||
*
|
||||
* @param {Boolean?} rejectOnFailure true to make the returned promise
|
||||
* reject if the device list query fails.
|
||||
*
|
||||
* @return {Promise}
|
||||
*/
|
||||
flushNewDeviceRequests() {
|
||||
const users = Object.keys(this._pendingUsersWithNewDevices);
|
||||
refreshOutdatedDeviceLists(rejectOnFailure) {
|
||||
const users = Object.keys(this._pendingUsersWithNewDevices).filter(
|
||||
(u) => !this._keyDownloadsInProgressByUser[u],
|
||||
);
|
||||
|
||||
if (users.length === 0) {
|
||||
return;
|
||||
return q();
|
||||
}
|
||||
|
||||
const r = this._doKeyDownloadForUsers(users);
|
||||
let prom = this._doKeyDownloadForUsers(users).then(() => {
|
||||
users.forEach((u) => {
|
||||
delete this._keyDownloadsInProgressByUser[u];
|
||||
});
|
||||
|
||||
// we've kicked off requests to these users: remove their
|
||||
// pending flag for now.
|
||||
this._pendingUsersWithNewDevices = {};
|
||||
|
||||
users.map((u) => {
|
||||
r[u] = r[u].catch((e) => {
|
||||
// flush out any more requests that were blocked up while that
|
||||
// was going on, but let the initial promise complete now.
|
||||
//
|
||||
this.refreshOutdatedDeviceLists().done();
|
||||
}, (e) => {
|
||||
console.error(
|
||||
'Error updating device keys for user ' + u + ':', e,
|
||||
'Error updating device key cache for ' + users + ":", e,
|
||||
);
|
||||
|
||||
// reinstate the pending flags on any users which failed; this will
|
||||
// mean that we will do another download in the future, but won't
|
||||
// tight-loop.
|
||||
//
|
||||
users.forEach((u) => {
|
||||
delete this._keyDownloadsInProgressByUser[u];
|
||||
this._pendingUsersWithNewDevices[u] = true;
|
||||
});
|
||||
|
||||
// TODO: schedule a retry.
|
||||
throw e;
|
||||
});
|
||||
|
||||
q.all(Object.values(r)).done();
|
||||
users.forEach((u) => {
|
||||
delete this._pendingUsersWithNewDevices[u];
|
||||
this._keyDownloadsInProgressByUser[u] = prom;
|
||||
});
|
||||
|
||||
if (!rejectOnFailure) {
|
||||
// normally we just want to swallow the exception - we've already
|
||||
// logged it futher up.
|
||||
prom = prom.catch((e) => {});
|
||||
}
|
||||
return prom;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string[]} downloadUsers list of userIds
|
||||
*
|
||||
* @return {Object} a map from userId to a promise for a result for that user
|
||||
* @return {Promise}
|
||||
*/
|
||||
_doKeyDownloadForUsers(downloadUsers) {
|
||||
const self = this;
|
||||
|
||||
console.log('Starting key download for ' + downloadUsers);
|
||||
|
||||
const deferMap = {};
|
||||
const promiseMap = {};
|
||||
|
||||
downloadUsers.map(function(u) {
|
||||
const deferred = q.defer();
|
||||
const promise = deferred.promise.finally(function() {
|
||||
const inProgress = self._keyDownloadsInProgressByUser[u];
|
||||
utils.removeElement(inProgress, function(e) {
|
||||
return e === promise;
|
||||
});
|
||||
if (inProgress.length === 0) {
|
||||
// no more downloads for this user; remove the element
|
||||
delete self._keyDownloadsInProgressByUser[u];
|
||||
}
|
||||
});
|
||||
|
||||
if (!self._keyDownloadsInProgressByUser[u]) {
|
||||
self._keyDownloadsInProgressByUser[u] = [];
|
||||
}
|
||||
self._keyDownloadsInProgressByUser[u].push(promise);
|
||||
|
||||
deferMap[u] = deferred;
|
||||
promiseMap[u] = promise;
|
||||
});
|
||||
|
||||
this._baseApis.downloadKeysForUsers(
|
||||
return this._baseApis.downloadKeysForUsers(
|
||||
downloadUsers,
|
||||
).done(function(res) {
|
||||
).then((res) => {
|
||||
const dk = res.device_keys || {};
|
||||
|
||||
for (let i = 0; i < downloadUsers.length; ++i) {
|
||||
const userId = downloadUsers[i];
|
||||
var deviceId;
|
||||
|
||||
for (const userId of downloadUsers) {
|
||||
console.log('got keys for ' + userId + ':', dk[userId]);
|
||||
|
||||
if (!dk[userId]) {
|
||||
// no result for this user
|
||||
const err = 'Unknown';
|
||||
// TODO: do something with res.failures
|
||||
deferMap[userId].reject(err);
|
||||
continue;
|
||||
}
|
||||
|
||||
// map from deviceid -> deviceinfo for this user
|
||||
const userStore = {};
|
||||
const devs = self._sessionStore.getEndToEndDevicesForUser(userId);
|
||||
const devs = this._sessionStore.getEndToEndDevicesForUser(userId);
|
||||
if (devs) {
|
||||
for (deviceId in devs) {
|
||||
if (devs.hasOwnProperty(deviceId)) {
|
||||
Object.keys(devs).forEach((deviceId) => {
|
||||
const d = DeviceInfo.fromStorage(devs[deviceId], deviceId);
|
||||
userStore[deviceId] = d;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
_updateStoredDeviceKeysForUser(
|
||||
self._olmDevice, userId, userStore, dk[userId],
|
||||
this._olmDevice, userId, userStore, dk[userId] || {},
|
||||
);
|
||||
|
||||
// update the session store
|
||||
const storage = {};
|
||||
for (deviceId in userStore) {
|
||||
if (!userStore.hasOwnProperty(deviceId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Object.keys(userStore).forEach((deviceId) => {
|
||||
storage[deviceId] = userStore[deviceId].toStorage();
|
||||
}
|
||||
self._sessionStore.storeEndToEndDevicesForUser(
|
||||
});
|
||||
|
||||
this._sessionStore.storeEndToEndDevicesForUser(
|
||||
userId, storage,
|
||||
);
|
||||
|
||||
deferMap[userId].resolve();
|
||||
}
|
||||
}, function(err) {
|
||||
downloadUsers.map(function(u) {
|
||||
deferMap[u].reject(err);
|
||||
});
|
||||
});
|
||||
|
||||
return promiseMap;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -740,7 +740,7 @@ Crypto.prototype._onInitialSyncCompleted = function(rooms) {
|
||||
this._initialSyncCompleted = true;
|
||||
|
||||
// catch up on any m.new_device events which arrived during the initial sync.
|
||||
this._deviceList.flushNewDeviceRequests();
|
||||
this._deviceList.refreshOutdatedDeviceLists().done();
|
||||
|
||||
if (this._sessionStore.getDeviceAnnounced()) {
|
||||
return;
|
||||
@@ -877,7 +877,7 @@ Crypto.prototype._onNewDeviceEvent = function(event) {
|
||||
// we delay handling these until the intialsync has completed, so that we
|
||||
// can do all of them together.
|
||||
if (this._initialSyncCompleted) {
|
||||
this._deviceList.flushNewDeviceRequests();
|
||||
this._deviceList.refreshOutdatedDeviceLists().done();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user