You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-11-26 17:03:12 +03:00
Initial attempt at device tracking -> indexeddb
* Message sending works again, but * Marking multiple devices known (ie. 'send anyway') is very slow because it writes all device info out each time * Support for non-indexedb stores not written yet * No migration
This commit is contained in:
@@ -25,6 +25,7 @@ import Promise from 'bluebird';
|
||||
|
||||
import DeviceInfo from './deviceinfo';
|
||||
import olmlib from './olmlib';
|
||||
import IndexedDBCryptoStore from './store/indexeddb-crypto-store';
|
||||
|
||||
|
||||
/* State transition diagram for DeviceList._deviceTrackingStatus
|
||||
@@ -58,26 +59,95 @@ const TRACKING_STATUS_UP_TO_DATE = 3;
|
||||
* @alias module:crypto/DeviceList
|
||||
*/
|
||||
export default class DeviceList {
|
||||
constructor(baseApis, sessionStore, olmDevice) {
|
||||
this._sessionStore = sessionStore;
|
||||
this._serialiser = new DeviceListUpdateSerialiser(
|
||||
baseApis, sessionStore, olmDevice,
|
||||
);
|
||||
constructor(baseApis, cryptoStore, olmDevice) {
|
||||
this._cryptoStore = cryptoStore;
|
||||
|
||||
// userId -> {
|
||||
// deviceId -> {
|
||||
// [device info]
|
||||
// }
|
||||
// }
|
||||
this._devices = null;
|
||||
|
||||
// which users we are tracking device status for.
|
||||
// userId -> TRACKING_STATUS_*
|
||||
this._deviceTrackingStatus = sessionStore.getEndToEndDeviceTrackingStatus() || {};
|
||||
this._deviceTrackingStatus = null; // loaded from storage in load()
|
||||
|
||||
// The 'next_batch' sync token at the point the data was writen,
|
||||
// ie. a token represtenting the point immediately after the
|
||||
// moment represented by the snapshot in the db.
|
||||
this._syncToken = null;
|
||||
|
||||
this._serialiser = new DeviceListUpdateSerialiser(
|
||||
baseApis, olmDevice,
|
||||
);
|
||||
|
||||
// userId -> promise
|
||||
this._keyDownloadsInProgressByUser = {};
|
||||
|
||||
// Set whenever changes are made other than setting the sync token
|
||||
this._dirty = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the device tracking state from storage
|
||||
*/
|
||||
async load() {
|
||||
await this._cryptoStore.doTxn(
|
||||
'readonly', [IndexedDBCryptoStore.STORE_DEVICE_DATA], (txn) => {
|
||||
this._cryptoStore.getEndToEndDeviceData(txn, (deviceData) => {
|
||||
this._devices = deviceData ? deviceData.devices : {},
|
||||
this._deviceTrackingStatus = deviceData ? deviceData.trackingStatus : {};
|
||||
this._syncToken = deviceData ? deviceData.syncToken : null;
|
||||
});
|
||||
},
|
||||
);
|
||||
for (const u of Object.keys(this._deviceTrackingStatus)) {
|
||||
// if a download was in progress when we got shut down, it isn't any more.
|
||||
if (this._deviceTrackingStatus[u] == TRACKING_STATUS_DOWNLOAD_IN_PROGRESS) {
|
||||
this._deviceTrackingStatus[u] = TRACKING_STATUS_PENDING_DOWNLOAD;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// userId -> promise
|
||||
this._keyDownloadsInProgressByUser = {};
|
||||
/**
|
||||
* Save the device tracking state to storage, if any changes are
|
||||
* pending other than updating the sync token
|
||||
* Before calling this, the caller must ensure that the state it
|
||||
* has set this object to is consistent, ie. the appropriate sync
|
||||
* token has been set with setSyncToken for any device updates that
|
||||
* have occurred.
|
||||
*/
|
||||
async saveIfDirty() {
|
||||
if (!this._dirty) return;
|
||||
await this._cryptoStore.doTxn(
|
||||
'readwrite', [IndexedDBCryptoStore.STORE_DEVICE_DATA], (txn) => {
|
||||
this._cryptoStore.storeEndToEndDeviceData({
|
||||
devices: this._devices,
|
||||
trackingStatus: this._deviceTrackingStatus,
|
||||
syncToken: this._syncToken,
|
||||
}, txn);
|
||||
},
|
||||
);
|
||||
this._dirty = false;
|
||||
}
|
||||
|
||||
this.lastKnownSyncToken = null;
|
||||
/**
|
||||
* Gets the current sync token
|
||||
*
|
||||
* @return {string} The sync token
|
||||
*/
|
||||
getSyncToken() {
|
||||
return this._syncToken;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the current sync token
|
||||
*
|
||||
* @param {string} st The sync token
|
||||
*/
|
||||
setSyncToken(st) {
|
||||
this._syncToken = st;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -152,7 +222,7 @@ export default class DeviceList {
|
||||
* managed to get a list of devices for this user yet.
|
||||
*/
|
||||
getStoredDevicesForUser(userId) {
|
||||
const devs = this._sessionStore.getEndToEndDevicesForUser(userId);
|
||||
const devs = this._devices[userId];
|
||||
if (!devs) {
|
||||
return null;
|
||||
}
|
||||
@@ -165,6 +235,22 @@ export default class DeviceList {
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the stored device data for a user, in raw object form
|
||||
*
|
||||
* @param {string} userId the user to get data for
|
||||
*
|
||||
* @return {Object} userId->deviceId->{object} devices, or null if
|
||||
* there is no data for this user.
|
||||
*/
|
||||
getRawStoredDevicesForUser(userId) {
|
||||
const devs = this._devices[userId];
|
||||
if (!devs) {
|
||||
return null;
|
||||
}
|
||||
return devs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the stored keys for a single device
|
||||
*
|
||||
@@ -175,7 +261,7 @@ export default class DeviceList {
|
||||
* if we don't know about this device
|
||||
*/
|
||||
getStoredDevice(userId, deviceId) {
|
||||
const devs = this._sessionStore.getEndToEndDevicesForUser(userId);
|
||||
const devs = this._devices[userId];
|
||||
if (!devs || !devs[deviceId]) {
|
||||
return undefined;
|
||||
}
|
||||
@@ -200,7 +286,7 @@ export default class DeviceList {
|
||||
return null;
|
||||
}
|
||||
|
||||
const devices = this._sessionStore.getEndToEndDevicesForUser(userId);
|
||||
const devices = this._devices[userId];
|
||||
if (!devices) {
|
||||
return null;
|
||||
}
|
||||
@@ -229,6 +315,14 @@ export default class DeviceList {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces the list of devices for a user with the given device list
|
||||
*/
|
||||
storeDevicesForUser(u, devs) {
|
||||
this._devices[u] = devs;
|
||||
this._dirty = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* flag the given user for device-list tracking, if they are not already.
|
||||
*
|
||||
@@ -254,8 +348,8 @@ export default class DeviceList {
|
||||
this._deviceTrackingStatus[userId] = TRACKING_STATUS_PENDING_DOWNLOAD;
|
||||
}
|
||||
// we don't yet persist the tracking status, since there may be a lot
|
||||
// of calls; instead we wait for the forthcoming
|
||||
// refreshOutdatedDeviceLists.
|
||||
// of calls; we save all data together once the sync is done
|
||||
this._dirty = true;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -273,8 +367,9 @@ export default class DeviceList {
|
||||
this._deviceTrackingStatus[userId] = TRACKING_STATUS_NOT_TRACKED;
|
||||
}
|
||||
// we don't yet persist the tracking status, since there may be a lot
|
||||
// of calls; instead we wait for the forthcoming
|
||||
// refreshOutdatedDeviceLists.
|
||||
// of calls; we save all data together once the sync is done
|
||||
|
||||
this._dirty = true;
|
||||
}
|
||||
|
||||
|
||||
@@ -295,8 +390,9 @@ export default class DeviceList {
|
||||
this._deviceTrackingStatus[userId] = TRACKING_STATUS_PENDING_DOWNLOAD;
|
||||
}
|
||||
// we don't yet persist the tracking status, since there may be a lot
|
||||
// of calls; instead we wait for the forthcoming
|
||||
// refreshOutdatedDeviceLists.
|
||||
// of calls; we save all data together once the sync is done
|
||||
|
||||
this._dirty = true;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -318,6 +414,8 @@ export default class DeviceList {
|
||||
* is no need to wait for this (it's mostly for the unit tests).
|
||||
*/
|
||||
refreshOutdatedDeviceLists() {
|
||||
this.saveIfDirty();
|
||||
|
||||
const usersToDownload = [];
|
||||
for (const userId of Object.keys(this._deviceTrackingStatus)) {
|
||||
const stat = this._deviceTrackingStatus[userId];
|
||||
@@ -326,10 +424,6 @@ export default class DeviceList {
|
||||
}
|
||||
}
|
||||
|
||||
// we didn't persist the tracking status during
|
||||
// invalidateUserDeviceList, so do it now.
|
||||
this._persistDeviceTrackingStatus();
|
||||
|
||||
return this._doKeyDownload(usersToDownload);
|
||||
}
|
||||
|
||||
@@ -352,14 +446,14 @@ export default class DeviceList {
|
||||
}
|
||||
|
||||
const prom = this._serialiser.updateDevicesForUsers(
|
||||
users, this.lastKnownSyncToken,
|
||||
).then(() => {
|
||||
finished(true);
|
||||
this._devices, users, this._syncToken,
|
||||
).then((newDevices) => {
|
||||
finished(newDevices);
|
||||
}, (e) => {
|
||||
console.error(
|
||||
'Error downloading keys for ' + users + ":", e,
|
||||
);
|
||||
finished(false);
|
||||
finished(null);
|
||||
throw e;
|
||||
});
|
||||
|
||||
@@ -371,7 +465,7 @@ export default class DeviceList {
|
||||
}
|
||||
});
|
||||
|
||||
const finished = (success) => {
|
||||
const finished = (newDevices) => {
|
||||
users.forEach((u) => {
|
||||
// we may have queued up another download request for this user
|
||||
// since we started this request. If that happens, we should
|
||||
@@ -384,25 +478,23 @@ export default class DeviceList {
|
||||
delete this._keyDownloadsInProgressByUser[u];
|
||||
const stat = this._deviceTrackingStatus[u];
|
||||
if (stat == TRACKING_STATUS_DOWNLOAD_IN_PROGRESS) {
|
||||
if (success) {
|
||||
if (newDevices) {
|
||||
// we didn't get any new invalidations since this download started:
|
||||
// this user's device list is now up to date.
|
||||
this._deviceTrackingStatus[u] = TRACKING_STATUS_UP_TO_DATE;
|
||||
this._devices[u] = newDevices[u];
|
||||
this._dirty = true;
|
||||
console.log("Device list for", u, "now up to date");
|
||||
} else {
|
||||
this._deviceTrackingStatus[u] = TRACKING_STATUS_PENDING_DOWNLOAD;
|
||||
}
|
||||
}
|
||||
});
|
||||
this._persistDeviceTrackingStatus();
|
||||
this.saveIfDirty();
|
||||
};
|
||||
|
||||
return prom;
|
||||
}
|
||||
|
||||
_persistDeviceTrackingStatus() {
|
||||
this._sessionStore.storeEndToEndDeviceTrackingStatus(this._deviceTrackingStatus);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -415,9 +507,8 @@ export default class DeviceList {
|
||||
* time (and queuing other requests up).
|
||||
*/
|
||||
class DeviceListUpdateSerialiser {
|
||||
constructor(baseApis, sessionStore, olmDevice) {
|
||||
constructor(baseApis, olmDevice) {
|
||||
this._baseApis = baseApis;
|
||||
this._sessionStore = sessionStore;
|
||||
this._olmDevice = olmDevice;
|
||||
|
||||
this._downloadInProgress = false;
|
||||
@@ -431,14 +522,15 @@ class DeviceListUpdateSerialiser {
|
||||
// non-null indicates that we have users queued for download.
|
||||
this._queuedQueryDeferred = null;
|
||||
|
||||
// sync token to be used for the next query: essentially the
|
||||
// most recent one we know about
|
||||
this._nextSyncToken = null;
|
||||
this._devices = null; // the complete device list
|
||||
this._updatedDevices = null; // device list updates we've fetched
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a key query request for the given users
|
||||
*
|
||||
* @param {object} devices The current device list
|
||||
*
|
||||
* @param {String[]} users list of user ids
|
||||
*
|
||||
* @param {String} syncToken sync token to pass in the query request, to
|
||||
@@ -446,13 +538,12 @@ class DeviceListUpdateSerialiser {
|
||||
*
|
||||
* @return {module:client.Promise} resolves when all the users listed have
|
||||
* been updated. rejects if there was a problem updating any of the
|
||||
* users.
|
||||
* users. Returns a fresh device list object for the users queried.
|
||||
*/
|
||||
updateDevicesForUsers(users, syncToken) {
|
||||
updateDevicesForUsers(devices, users, syncToken) {
|
||||
users.forEach((u) => {
|
||||
this._keyDownloadsQueuedByUser[u] = true;
|
||||
});
|
||||
this._nextSyncToken = syncToken;
|
||||
|
||||
if (!this._queuedQueryDeferred) {
|
||||
this._queuedQueryDeferred = Promise.defer();
|
||||
@@ -464,11 +555,13 @@ class DeviceListUpdateSerialiser {
|
||||
return this._queuedQueryDeferred.promise;
|
||||
}
|
||||
|
||||
this._devices = devices;
|
||||
this._updatedDevices = {};
|
||||
// start a new download.
|
||||
return this._doQueuedQueries();
|
||||
return this._doQueuedQueries(syncToken);
|
||||
}
|
||||
|
||||
_doQueuedQueries() {
|
||||
_doQueuedQueries(syncToken) {
|
||||
if (this._downloadInProgress) {
|
||||
throw new Error(
|
||||
"DeviceListUpdateSerialiser._doQueuedQueries called with request active",
|
||||
@@ -484,8 +577,8 @@ class DeviceListUpdateSerialiser {
|
||||
this._downloadInProgress = true;
|
||||
|
||||
const opts = {};
|
||||
if (this._nextSyncToken) {
|
||||
opts.token = this._nextSyncToken;
|
||||
if (syncToken) {
|
||||
opts.token = syncToken;
|
||||
}
|
||||
|
||||
this._baseApis.downloadKeysForUsers(
|
||||
@@ -510,11 +603,11 @@ class DeviceListUpdateSerialiser {
|
||||
console.log('Completed key download for ' + downloadUsers);
|
||||
|
||||
this._downloadInProgress = false;
|
||||
deferred.resolve();
|
||||
deferred.resolve(this._updatedDevices);
|
||||
|
||||
// if we have queued users, fire off another request.
|
||||
if (this._queuedQueryDeferred) {
|
||||
this._doQueuedQueries();
|
||||
this._doQueuedQueries(syncToken);
|
||||
}
|
||||
}, (e) => {
|
||||
console.warn('Error downloading keys for ' + downloadUsers + ':', e);
|
||||
@@ -530,7 +623,7 @@ class DeviceListUpdateSerialiser {
|
||||
|
||||
// map from deviceid -> deviceinfo for this user
|
||||
const userStore = {};
|
||||
const devs = this._sessionStore.getEndToEndDevicesForUser(userId);
|
||||
const devs = this._devices[userId];
|
||||
if (devs) {
|
||||
Object.keys(devs).forEach((deviceId) => {
|
||||
const d = DeviceInfo.fromStorage(devs[deviceId], deviceId);
|
||||
@@ -548,9 +641,7 @@ class DeviceListUpdateSerialiser {
|
||||
storage[deviceId] = userStore[deviceId].toStorage();
|
||||
});
|
||||
|
||||
this._sessionStore.storeEndToEndDevicesForUser(
|
||||
userId, storage,
|
||||
);
|
||||
this._updatedDevices[userId] = storage;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user