You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-08-09 10:22:46 +03:00
Fix race condition in device list query
Fix a race where device list queries completing out-of-order could lead to us thinking that we were more in-sync than we actually were.
This commit is contained in:
@@ -914,6 +914,99 @@ describe("megolm", function() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
it("We should not get confused by out-of-order device query responses",
|
||||||
|
() => {
|
||||||
|
// https://github.com/vector-im/riot-web/issues/3126
|
||||||
|
return aliceTestClient.start().then(() => {
|
||||||
|
aliceTestClient.httpBackend.when('GET', '/sync').respond(
|
||||||
|
200, getSyncResponse(['@bob:xyz', '@chris:abc']));
|
||||||
|
return aliceTestClient.httpBackend.flush('/sync', 1);
|
||||||
|
}).then(() => {
|
||||||
|
// to make sure the initial device queries are flushed out, we
|
||||||
|
// attempt to send a message.
|
||||||
|
|
||||||
|
aliceTestClient.httpBackend.when('POST', '/keys/query').respond(
|
||||||
|
200, {
|
||||||
|
device_keys: {
|
||||||
|
'@bob:xyz': {},
|
||||||
|
'@chris:abc': {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
aliceTestClient.httpBackend.when('PUT', '/send/').respond(
|
||||||
|
200, {event_id: '$event1'});
|
||||||
|
|
||||||
|
return q.all([
|
||||||
|
aliceTestClient.client.sendTextMessage(ROOM_ID, 'test'),
|
||||||
|
aliceTestClient.httpBackend.flush('/keys/query', 1).then(
|
||||||
|
() => aliceTestClient.httpBackend.flush('/send/', 1, 20),
|
||||||
|
),
|
||||||
|
]);
|
||||||
|
}).then(() => {
|
||||||
|
expect(aliceTestClient.storage.getEndToEndDeviceSyncToken()).toEqual(1);
|
||||||
|
|
||||||
|
// invalidate bob's and chris's device lists in separate syncs
|
||||||
|
aliceTestClient.httpBackend.when('GET', '/sync').respond(200, {
|
||||||
|
next_batch: '2',
|
||||||
|
device_lists: {
|
||||||
|
changed: ['@bob:xyz'],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
aliceTestClient.httpBackend.when('GET', '/sync').respond(200, {
|
||||||
|
next_batch: '3',
|
||||||
|
device_lists: {
|
||||||
|
changed: ['@chris:abc'],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
return aliceTestClient.httpBackend.flush('/sync', 2);
|
||||||
|
}).then(() => {
|
||||||
|
// check that we don't yet have a request for chris's devices.
|
||||||
|
aliceTestClient.httpBackend.when('POST', '/keys/query', {
|
||||||
|
device_keys: {
|
||||||
|
'@chris:abc': {},
|
||||||
|
},
|
||||||
|
token: '3',
|
||||||
|
}).respond(200, {
|
||||||
|
device_keys: {'@chris:abc': {}},
|
||||||
|
});
|
||||||
|
return aliceTestClient.httpBackend.flush('/keys/query', 1);
|
||||||
|
}).then((flushed) => {
|
||||||
|
expect(flushed).toEqual(0);
|
||||||
|
expect(aliceTestClient.storage.getEndToEndDeviceSyncToken()).toEqual(1);
|
||||||
|
|
||||||
|
// now add an expectation for a query for bob's devices, and let
|
||||||
|
// it complete.
|
||||||
|
aliceTestClient.httpBackend.when('POST', '/keys/query', {
|
||||||
|
device_keys: {
|
||||||
|
'@bob:xyz': {},
|
||||||
|
},
|
||||||
|
token: '2',
|
||||||
|
}).respond(200, {
|
||||||
|
device_keys: {'@bob:xyz': {}},
|
||||||
|
});
|
||||||
|
return aliceTestClient.httpBackend.flush('/keys/query', 1);
|
||||||
|
}).then((flushed) => {
|
||||||
|
expect(flushed).toEqual(1);
|
||||||
|
|
||||||
|
// wait for the client to stop processing the response
|
||||||
|
return aliceTestClient.client.downloadKeys(['@bob:xyz']);
|
||||||
|
}).then(() => {
|
||||||
|
expect(aliceTestClient.storage.getEndToEndDeviceSyncToken()).toEqual(2);
|
||||||
|
|
||||||
|
// now let the query for chris's devices complete.
|
||||||
|
return aliceTestClient.httpBackend.flush('/keys/query', 1);
|
||||||
|
}).then((flushed) => {
|
||||||
|
expect(flushed).toEqual(1);
|
||||||
|
|
||||||
|
// wait for the client to stop processing the response
|
||||||
|
return aliceTestClient.client.downloadKeys(['@chris:abc']);
|
||||||
|
}).then(() => {
|
||||||
|
expect(aliceTestClient.storage.getEndToEndDeviceSyncToken()).toEqual(3);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
it("Alice exports megolm keys and imports them to a new device", function(done) {
|
it("Alice exports megolm keys and imports them to a new device", function(done) {
|
||||||
let messageEncrypted;
|
let messageEncrypted;
|
||||||
|
|
||||||
|
@@ -39,9 +39,22 @@ export default class DeviceList {
|
|||||||
// userId -> true
|
// userId -> true
|
||||||
this._pendingUsersWithNewDevices = {};
|
this._pendingUsersWithNewDevices = {};
|
||||||
|
|
||||||
// userId -> promise
|
// userId -> true
|
||||||
this._keyDownloadsInProgressByUser = {};
|
this._keyDownloadsInProgressByUser = {};
|
||||||
|
|
||||||
|
// deferred which is resolved when the current device query resolves.
|
||||||
|
// (null if there is no current request).
|
||||||
|
this._currentQueryDeferred = null;
|
||||||
|
|
||||||
|
// deferred which is resolved when the *next* device query resolves.
|
||||||
|
//
|
||||||
|
// Normally it is meaningless for this to be non-null when
|
||||||
|
// _currentQueryDeferred is null, but it can happen if the previous
|
||||||
|
// query has finished but the next one has not yet started (because the
|
||||||
|
// previous query failed, in which case we deliberately delay starting
|
||||||
|
// the next query to avoid tight-looping).
|
||||||
|
this._queuedQueryDeferred = null;
|
||||||
|
|
||||||
this.lastKnownSyncToken = null;
|
this.lastKnownSyncToken = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -55,35 +68,43 @@ export default class DeviceList {
|
|||||||
* module:crypto/deviceinfo|DeviceInfo}.
|
* module:crypto/deviceinfo|DeviceInfo}.
|
||||||
*/
|
*/
|
||||||
downloadKeys(userIds, forceDownload) {
|
downloadKeys(userIds, forceDownload) {
|
||||||
// promises we need to wait for while the download happens
|
|
||||||
const promises = [];
|
|
||||||
|
|
||||||
let needsRefresh = false;
|
let needsRefresh = false;
|
||||||
|
let waitForCurrentQuery = false;
|
||||||
|
|
||||||
userIds.forEach((u) => {
|
userIds.forEach((u) => {
|
||||||
if (this._keyDownloadsInProgressByUser[u]) {
|
if (this._pendingUsersWithNewDevices[u]) {
|
||||||
// just wait for the existing download to complete
|
// we already know this user's devices are outdated
|
||||||
promises.push(this._keyDownloadsInProgressByUser[u]);
|
needsRefresh = true;
|
||||||
} else {
|
} else if (this._keyDownloadsInProgressByUser[u]) {
|
||||||
if (forceDownload) {
|
// already a download in progress - just wait for it.
|
||||||
console.log("Invalidating device list for " + u +
|
// (even if forceDownload is true)
|
||||||
" for forceDownload");
|
waitForCurrentQuery = true;
|
||||||
this.invalidateUserDeviceList(u);
|
} else if (forceDownload) {
|
||||||
} else if (!this.getStoredDevicesForUser(u)) {
|
console.log("Invalidating device list for " + u +
|
||||||
console.log("Invalidating device list for " + u +
|
" for forceDownload");
|
||||||
" due to empty cache");
|
this.invalidateUserDeviceList(u);
|
||||||
this.invalidateUserDeviceList(u);
|
needsRefresh = true;
|
||||||
}
|
} else if (!this.getStoredDevicesForUser(u)) {
|
||||||
if (this._pendingUsersWithNewDevices[u]) {
|
console.log("Invalidating device list for " + u +
|
||||||
needsRefresh = true;
|
" due to empty cache");
|
||||||
}
|
this.invalidateUserDeviceList(u);
|
||||||
|
needsRefresh = true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let promise;
|
||||||
if (needsRefresh) {
|
if (needsRefresh) {
|
||||||
promises.push(this.refreshOutdatedDeviceLists(true));
|
console.log("downloadKeys: waiting for next key query");
|
||||||
|
promise = this._startOrQueueDeviceQuery();
|
||||||
|
} else if(waitForCurrentQuery) {
|
||||||
|
console.log("downloadKeys: waiting for in-flight query to complete");
|
||||||
|
promise = this._currentQueryDeferred.promise;
|
||||||
|
} else {
|
||||||
|
// we're all up-to-date.
|
||||||
|
promise = q();
|
||||||
}
|
}
|
||||||
|
|
||||||
return q.all(promises).then(() => {
|
return promise.then(() => {
|
||||||
return this._getDevicesFromStore(userIds);
|
return this._getDevicesFromStore(userIds);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -217,75 +238,97 @@ export default class DeviceList {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start device queries for any users with outdated device lists
|
* If there is not already a device list query in progress, and we have
|
||||||
*
|
* users who have outdated device lists, start a query now.
|
||||||
* We tolerate multiple concurrent device queries, but only one query per
|
*/
|
||||||
* user.
|
refreshOutdatedDeviceLists() {
|
||||||
*
|
if (this._currentQueryDeferred) {
|
||||||
* If any users already have downloads in progress, they are ignored - they
|
// request already in progress - do nothing. (We will automatically
|
||||||
* will be refreshed when the current download completes anyway, so
|
// make another request if there are more users with outdated
|
||||||
* each user with outdated device lists will be updated eventually.
|
// device lists when the current request completes).
|
||||||
*
|
return;
|
||||||
* The returned promise resolves immediately if there are no users with
|
}
|
||||||
* outdated device lists, or if all users with outdated device lists already
|
|
||||||
* have a query in progress.
|
this._startDeviceQuery();
|
||||||
*
|
}
|
||||||
* Otherwise, a new query request is made, and the promise resolves
|
|
||||||
* once that query completes. If the query fails, the promise will reject
|
/**
|
||||||
* if rejectOnFailure was truthy, otherwise it will still resolve.
|
* If there is currently a device list query in progress, returns a promise
|
||||||
*
|
* which will resolve when the *next* query completes. Otherwise, starts
|
||||||
* @param {Boolean?} rejectOnFailure true to make the returned promise
|
* a new query, and returns a promise which resolves when it completes.
|
||||||
* reject if the device list query fails.
|
|
||||||
*
|
*
|
||||||
* @return {Promise}
|
* @return {Promise}
|
||||||
*/
|
*/
|
||||||
refreshOutdatedDeviceLists(rejectOnFailure) {
|
_startOrQueueDeviceQuery() {
|
||||||
const users = Object.keys(this._pendingUsersWithNewDevices).filter(
|
if (!this._currentQueryDeferred) {
|
||||||
(u) => !this._keyDownloadsInProgressByUser[u],
|
this._startDeviceQuery();
|
||||||
);
|
if (!this._currentQueryDeferred) {
|
||||||
|
return q();
|
||||||
|
}
|
||||||
|
|
||||||
if (users.length === 0) {
|
return this._currentQueryDeferred.promise;
|
||||||
return q();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let prom = this._doKeyDownloadForUsers(users).then(() => {
|
if (!this._queuedQueryDeferred) {
|
||||||
|
this._queuedQueryDeferred = q.defer();
|
||||||
|
}
|
||||||
|
|
||||||
|
return this._queuedQueryDeferred.promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* kick off a new device query
|
||||||
|
*
|
||||||
|
* Throws if there is already a query in progress.
|
||||||
|
*/
|
||||||
|
_startDeviceQuery() {
|
||||||
|
if (this._currentQueryDeferred) {
|
||||||
|
throw new Error("DeviceList._startDeviceQuery called with request active");
|
||||||
|
}
|
||||||
|
|
||||||
|
this._currentQueryDeferred = this._queuedQueryDeferred || q.defer();
|
||||||
|
this._queuedQueryDeferred = null;
|
||||||
|
|
||||||
|
const users = Object.keys(this._pendingUsersWithNewDevices);
|
||||||
|
if (users.length === 0) {
|
||||||
|
// nothing to do
|
||||||
|
this._currentQueryDeferred.resolve();
|
||||||
|
this._currentQueryDeferred = null;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this._doKeyDownloadForUsers(users).done(() => {
|
||||||
users.forEach((u) => {
|
users.forEach((u) => {
|
||||||
delete this._keyDownloadsInProgressByUser[u];
|
delete this._keyDownloadsInProgressByUser[u];
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this._currentQueryDeferred.resolve();
|
||||||
|
this._currentQueryDeferred = null;
|
||||||
|
|
||||||
// flush out any more requests that were blocked up while that
|
// flush out any more requests that were blocked up while that
|
||||||
// was going on, but let the initial promise complete now.
|
// was going on.
|
||||||
//
|
this._startDeviceQuery();
|
||||||
this.refreshOutdatedDeviceLists().done();
|
|
||||||
}, (e) => {
|
}, (e) => {
|
||||||
console.error(
|
console.error(
|
||||||
'Error updating device key cache for ' + users + ":", e,
|
'Error updating device key cache for ' + users + ":", e,
|
||||||
);
|
);
|
||||||
|
|
||||||
// reinstate the pending flags on any users which failed; this will
|
// reinstate the pending flags on any users which failed; this will
|
||||||
// mean that we will do another download in the future, but won't
|
// mean that we will do another download in the future (actually on
|
||||||
// tight-loop.
|
// the next /sync).
|
||||||
//
|
|
||||||
users.forEach((u) => {
|
users.forEach((u) => {
|
||||||
delete this._keyDownloadsInProgressByUser[u];
|
delete this._keyDownloadsInProgressByUser[u];
|
||||||
this._pendingUsersWithNewDevices[u] = true;
|
this._pendingUsersWithNewDevices[u] = true;
|
||||||
});
|
});
|
||||||
|
|
||||||
// TODO: schedule a retry.
|
this._currentQueryDeferred.reject(e);
|
||||||
throw e;
|
this._currentQueryDeferred = null;
|
||||||
});
|
});
|
||||||
|
|
||||||
users.forEach((u) => {
|
users.forEach((u) => {
|
||||||
delete this._pendingUsersWithNewDevices[u];
|
delete this._pendingUsersWithNewDevices[u];
|
||||||
this._keyDownloadsInProgressByUser[u] = prom;
|
this._keyDownloadsInProgressByUser[u] = true;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!rejectOnFailure) {
|
|
||||||
// normally we just want to swallow the exception - we've already
|
|
||||||
// logged it futher up.
|
|
||||||
prom = prom.catch((e) => {});
|
|
||||||
}
|
|
||||||
return prom;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@@ -778,7 +778,7 @@ Crypto.prototype._onSyncCompleted = function(syncData) {
|
|||||||
// if that failed, we fall back to invalidating everyone.
|
// if that failed, we fall back to invalidating everyone.
|
||||||
console.warn("Error fetching changed device list", e);
|
console.warn("Error fetching changed device list", e);
|
||||||
this._invalidateDeviceListForAllActiveUsers();
|
this._invalidateDeviceListForAllActiveUsers();
|
||||||
return this._deviceList.refreshOutdatedDeviceLists();
|
this._deviceList.refreshOutdatedDeviceLists();
|
||||||
}).done();
|
}).done();
|
||||||
} else {
|
} else {
|
||||||
// otherwise, we have to invalidate all devices for all users we
|
// otherwise, we have to invalidate all devices for all users we
|
||||||
@@ -790,7 +790,7 @@ Crypto.prototype._onSyncCompleted = function(syncData) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// catch up on any new devices we got told about during the sync.
|
// catch up on any new devices we got told about during the sync.
|
||||||
this._deviceList.refreshOutdatedDeviceLists().done();
|
this._deviceList.refreshOutdatedDeviceLists();
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -869,7 +869,7 @@ Crypto.prototype._invalidateDeviceListsSince = function(oldSyncToken) {
|
|||||||
this._deviceList.invalidateUserDeviceList(u);
|
this._deviceList.invalidateUserDeviceList(u);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return this._deviceList.refreshOutdatedDeviceLists();
|
this._deviceList.refreshOutdatedDeviceLists();
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user