1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-11-28 05:03:59 +03:00

refactor megolm encryption to improve perceived speed

- allow applications to pre-send decryption keys before the message is sent
- establish new olm sessions with a shorter timeout first, and then re-try in
  the background with a longer timeout without blocking message sending
This commit is contained in:
Hubert Chathi
2020-03-09 18:27:43 -04:00
parent acba31bd6d
commit 98d955ef1f
6 changed files with 305 additions and 177 deletions

View File

@@ -60,6 +60,15 @@ export class EncryptionAlgorithm {
this._roomId = params.roomId;
}
/**
* Perform any background tasks that can be done before a message is ready to
* send, in order to speed up sending of the message.
*
* @param {module:models/room} room the room the event is in
*/
prepareToEncrypt(room) {
}
/**
* Encrypt a message event
*

View File

@@ -192,8 +192,6 @@ utils.inherits(MegolmEncryption, EncryptionAlgorithm);
MegolmEncryption.prototype._ensureOutboundSession = async function(
devicesInRoom, blocked,
) {
const self = this;
let session;
// takes the previous OutboundSessionInfo, and considers whether to create
@@ -201,12 +199,12 @@ MegolmEncryption.prototype._ensureOutboundSession = async function(
// Updates `session` to hold the final OutboundSessionInfo.
//
// returns a promise which resolves once the keyshare is successful.
async function prepareSession(oldSession) {
const prepareSession = async (oldSession) => {
session = oldSession;
// need to make a brand new session?
if (session && session.needsRotation(self._sessionRotationPeriodMsgs,
self._sessionRotationPeriodMs)
if (session && session.needsRotation(this._sessionRotationPeriodMsgs,
this._sessionRotationPeriodMs)
) {
logger.log("Starting new megolm session because we need to rotate.");
session = null;
@@ -218,32 +216,20 @@ MegolmEncryption.prototype._ensureOutboundSession = async function(
}
if (!session) {
logger.log(`Starting new megolm session for room ${self._roomId}`);
session = await self._prepareNewSession();
logger.log(`Starting new megolm session for room ${this._roomId}`);
session = await this._prepareNewSession();
logger.log(`Started new megolm session ${session.sessionId} ` +
`for room ${self._roomId}`);
self._outboundSessions[session.sessionId] = session;
`for room ${this._roomId}`);
this._outboundSessions[session.sessionId] = session;
}
// now check if we need to share with any devices
const shareMap = {};
for (const userId in devicesInRoom) {
if (!devicesInRoom.hasOwnProperty(userId)) {
continue;
}
const userDevices = devicesInRoom[userId];
for (const deviceId in userDevices) {
if (!userDevices.hasOwnProperty(deviceId)) {
continue;
}
const deviceInfo = userDevices[deviceId];
for (const [userId, userDevices] of Object.entries(devicesInRoom)) {
for (const [deviceId, deviceInfo] of Object.entries(userDevices)) {
const key = deviceInfo.getIdentityKey();
if (key == self._olmDevice.deviceCurve25519Key) {
if (key == this._olmDevice.deviceCurve25519Key) {
// don't bother sending to ourself
continue;
}
@@ -260,49 +246,100 @@ MegolmEncryption.prototype._ensureOutboundSession = async function(
const errorDevices = [];
await self._shareKeyWithDevices(
session, shareMap, errorDevices,
const key = this._olmDevice.getOutboundGroupSessionKey(session.sessionId);
const payload = {
type: "m.room_key",
content: {
algorithm: olmlib.MEGOLM_ALGORITHM,
room_id: this._roomId,
session_id: session.sessionId,
session_key: key.key,
chain_index: key.chain_index,
},
};
const [devicesWithoutSession, olmSessions] = await olmlib.getExistingOlmSessions(
this._olmDevice, this._baseApis, shareMap,
);
// are there any new blocked devices that we need to notify?
const blockedMap = {};
for (const userId in blocked) {
if (!blocked.hasOwnProperty(userId)) {
continue;
}
await Promise.all([
(async () => {
// share keys with devices that we already have a session for
await this._shareKeyWithOlmSessions(
session, key, payload, olmSessions,
);
})(),
(async () => {
// meanwhile, establish olm sessions for devices that we don't
// already have a session for, and share keys with them. Use a
// shorter timeout when fetching one-time keys.
await this._shareKeyWithDevices(
session, key, payload, devicesWithoutSession, errorDevices, 2000,
);
const userBlockedDevices = blocked[userId];
(async () => {
// Retry sending keys to devices that we were unable to establish
// an olm session for. This time, we use a longer timeout, but we
// do this in the background and don't block anything else while we
// do this.
const retryDevices = {};
for (const {userId, deviceInfo} of errorDevices) {
retryDevices[userId] = retryDevices[userId] || [];
retryDevices[userId].push(deviceInfo);
}
for (const deviceId in userBlockedDevices) {
if (!userBlockedDevices.hasOwnProperty(deviceId)) {
continue;
const failedDevices = [];
await this._shareKeyWithDevices(
session, key, payload, retryDevices, failedDevices,
);
const blockedMap = {};
const filteredFailedDevices =
await this._olmDevice.filterOutNotifiedErrorDevices(
failedDevices,
);
for (const {userId, deviceInfo} of filteredFailedDevices) {
blockedMap[userId] = blockedMap[userId] || {};
// we use a similar format to what
// olmlib.ensureOlmSessionsForDevices returns, so that
// we can use the same function to split
blockedMap[userId][deviceInfo.deviceId] = {
device: {
code: "m.no_olm",
reason: WITHHELD_MESSAGES["m.no_olm"],
deviceInfo,
},
};
const deviceId = deviceInfo.deviceId;
// mark this device as "handled" because we don't want to try
// to claim a one-time-key for dead devices on every message.
session.markSharedWithDevice(userId, deviceId, key.chain_index);
}
// notify devices that we couldn't get an olm session
await this._notifyBlockedDevices(session, blockedMap);
})();
})(),
(async () => {
// also, notify blocked devices that they're blocked
const blockedMap = {};
for (const [userId, userBlockedDevices] of Object.entries(blocked)) {
for (const [deviceId, device] of Object.entries(userBlockedDevices)) {
if (
!session.blockedDevicesNotified[userId] ||
session.blockedDevicesNotified[userId][deviceId] === undefined
) {
blockedMap[userId] = blockedMap[userId] || {};
blockedMap[userId][deviceId] = {device};
}
}
}
if (
!session.blockedDevicesNotified[userId] ||
session.blockedDevicesNotified[userId][deviceId] === undefined
) {
blockedMap[userId] = blockedMap[userId] || [];
blockedMap[userId].push(userBlockedDevices[deviceId]);
}
}
}
const filteredErrorDevices =
await self._olmDevice.filterOutNotifiedErrorDevices(errorDevices);
for (const {userId, deviceInfo} of filteredErrorDevices) {
blockedMap[userId] = blockedMap[userId] || [];
blockedMap[userId].push({
code: "m.no_olm",
reason: WITHHELD_MESSAGES["m.no_olm"],
deviceInfo,
});
}
// notify blocked devices that they're blocked
await self._notifyBlockedDevices(session, blockedMap);
}
await this._notifyBlockedDevices(session, blockedMap);
})(),
]);
};
// helper which returns the session prepared by prepareSession
function returnSession() {
@@ -349,44 +386,29 @@ MegolmEncryption.prototype._prepareNewSession = async function() {
};
/**
* Splits the user device map into multiple chunks to reduce the number of
* devices we encrypt to per API call. Also filters out devices we don't have
* a session with.
* Determines what devices in devicesByUser don't have an olm session as given
* in devicemap.
*
* @private
*
* @param {module:crypto/algorithms/megolm.OutboundSessionInfo} session
* @param {object} devicemap the devices that have olm sessions, as returned by
* olmlib.ensureOlmSessionsForDevices.
* @param {object} devicesByUser a map of user IDs to array of deviceInfo
* @param {array} [noOlmDevices] an array to fill with devices that don't have
* olm sessions
*
* @param {number} chainIndex current chain index
*
* @param {object<userId, deviceId>} devicemap
* mapping from userId to deviceId to {@link module:crypto~OlmSessionResult}
*
* @param {object<string, module:crypto/deviceinfo[]>} devicesByUser
* map from userid to list of devices
*
* @param {array<object>} errorDevices
* array that will be populated with the devices that can't get an
* olm session for
*
* @return {array<object<userid, deviceInfo>>}
* @return {array} an array of devices that don't have olm sessions. If
* noOlmDevices is specified, then noOlmDevices will be returned.
*/
MegolmEncryption.prototype._splitUserDeviceMap = function(
session, chainIndex, devicemap, devicesByUser, errorDevices,
MegolmEncryption.prototype._getDevicesWithoutSessions = function(
devicemap, devicesByUser, noOlmDevices,
) {
const maxUsersPerRequest = 20;
noOlmDevices = noOlmDevices || [];
// use an array where the slices of a content map gets stored
const mapSlices = [];
let currentSliceId = 0; // start inserting in the first slice
let entriesInCurrentSlice = 0;
for (const userId of Object.keys(devicesByUser)) {
const devicesToShareWith = devicesByUser[userId];
for (const [userId, devicesToShareWith] of Object.entries(devicesByUser)) {
const sessionResults = devicemap[userId];
for (let i = 0; i < devicesToShareWith.length; i++) {
const deviceInfo = devicesToShareWith[i];
for (const deviceInfo of devicesToShareWith) {
const deviceId = deviceInfo.deviceId;
const sessionResult = sessionResults[deviceId];
@@ -394,45 +416,17 @@ MegolmEncryption.prototype._splitUserDeviceMap = function(
// no session with this device, probably because there
// were no one-time keys.
// mark this device as "handled" because we don't want to try
// to claim a one-time-key for dead devices on every message.
session.markSharedWithDevice(userId, deviceId, chainIndex);
errorDevices.push({userId, deviceInfo});
noOlmDevices.push({userId, deviceInfo});
delete sessionResults[deviceId];
// ensureOlmSessionsForUsers has already done the logging,
// so just skip it.
continue;
}
logger.log(
"share keys with device " + userId + ":" + deviceId,
);
if (!mapSlices[currentSliceId]) {
mapSlices[currentSliceId] = [];
}
mapSlices[currentSliceId].push({
userId: userId,
deviceInfo: deviceInfo,
});
entriesInCurrentSlice++;
}
// We do this in the per-user loop as we prefer that all messages to the
// same user end up in the same API call to make it easier for the
// server (e.g. only have to send one EDU if a remote user, etc). This
// does mean that if a user has many devices we may go over the desired
// limit, but its not a hard limit so that is fine.
if (entriesInCurrentSlice > maxUsersPerRequest) {
// the current slice is filled up. Start inserting into the next slice
entriesInCurrentSlice = 0;
currentSliceId++;
}
}
return mapSlices;
return noOlmDevices;
};
/**
@@ -445,20 +439,18 @@ MegolmEncryption.prototype._splitUserDeviceMap = function(
*
* @return {array<array<object>>} the blocked devices, split into chunks
*/
MegolmEncryption.prototype._splitBlockedDevices = function(devicesByUser) {
const maxUsersPerRequest = 20;
MegolmEncryption.prototype._splitDevices = function(devicesByUser) {
const maxDevicesPerRequest = 20;
// use an array where the slices of a content map gets stored
let currentSlice = [];
const mapSlices = [currentSlice];
for (const userId of Object.keys(devicesByUser)) {
const userBlockedDevicesToShareWith = devicesByUser[userId];
for (const blockedInfo of userBlockedDevicesToShareWith) {
for (const [userId, userDevices] of Object.entries(devicesByUser)) {
for (const deviceInfo of Object.values(userDevices)) {
currentSlice.push({
userId: userId,
blockedInfo: blockedInfo,
deviceInfo: deviceInfo.device,
});
}
@@ -467,7 +459,7 @@ MegolmEncryption.prototype._splitBlockedDevices = function(devicesByUser) {
// server (e.g. only have to send one EDU if a remote user, etc). This
// does mean that if a user has many devices we may go over the desired
// limit, but its not a hard limit so that is fine.
if (currentSlice.length > maxUsersPerRequest) {
if (currentSlice.length > maxDevicesPerRequest) {
// the current slice is filled up. Start inserting into the next slice
currentSlice = [];
mapSlices.push(currentSlice);
@@ -562,7 +554,7 @@ MegolmEncryption.prototype._sendBlockedNotificationsToDevices = async function(
for (const val of userDeviceMap) {
const userId = val.userId;
const blockedInfo = val.blockedInfo;
const blockedInfo = val.deviceInfo;
const deviceInfo = blockedInfo.deviceInfo;
const deviceId = deviceInfo.deviceId;
@@ -682,37 +674,41 @@ MegolmEncryption.prototype.reshareKeyWithDevice = async function(
};
/**
* @private
*
* @param {module:crypto/algorithms/megolm.OutboundSessionInfo} session
*
* @param {object} key the session key as returned by
* OlmDevice.getOutboundGroupSessionKey
*
* @param {object} payload the base to-device message payload for sharing keys
*
* @param {object<string, module:crypto/deviceinfo[]>} devicesByUser
* map from userid to list of devices
*
* @param {array<object>} errorDevices
* array that will be populated with the devices that we can't get an
* olm session for
*
* @param {Number} [otkTimeout] The timeout in milliseconds when requesting
* one-time keys for establishing new olm sessions.
*/
MegolmEncryption.prototype._shareKeyWithDevices = async function(
session, devicesByUser, errorDevices,
session, key, payload, devicesByUser, errorDevices, otkTimeout,
) {
const key = this._olmDevice.getOutboundGroupSessionKey(session.sessionId);
const payload = {
type: "m.room_key",
content: {
algorithm: olmlib.MEGOLM_ALGORITHM,
room_id: this._roomId,
session_id: session.sessionId,
session_key: key.key,
chain_index: key.chain_index,
},
};
const devicemap = await olmlib.ensureOlmSessionsForDevices(
this._olmDevice, this._baseApis, devicesByUser,
this._olmDevice, this._baseApis, devicesByUser, otkTimeout,
);
const userDeviceMaps = this._splitUserDeviceMap(
session, key.chain_index, devicemap, devicesByUser, errorDevices,
);
this._getDevicesWithoutSessions(devicemap, devicesByUser, errorDevices);
await this._shareKeyWithOlmSessions(session, key, payload, devicemap);
};
MegolmEncryption.prototype._shareKeyWithOlmSessions = async function(
session, key, payload, devicemap,
) {
const userDeviceMaps = this._splitDevices(devicemap);
for (let i = 0; i < userDeviceMaps.length; i++) {
try {
@@ -748,7 +744,7 @@ MegolmEncryption.prototype._notifyBlockedDevices = async function(
sender_key: this._olmDevice.deviceCurve25519Key,
};
const userDeviceMaps = this._splitBlockedDevices(devicesByUser);
const userDeviceMaps = this._splitDevices(devicesByUser);
for (let i = 0; i < userDeviceMaps.length; i++) {
try {
@@ -766,6 +762,37 @@ MegolmEncryption.prototype._notifyBlockedDevices = async function(
}
};
/**
* Perform any background tasks that can be done before a message is ready to
* send, in order to speed up sending of the message.
*
* @param {module:models/room} room the room the event is in
*/
MegolmEncryption.prototype.prepareToEncrypt = function(room) {
logger.log(`Preparing to encrypt events for ${this._roomId}`);
if (this.encryptionPreparation) {
// We're already preparing something, so don't do anything else.
// FIXME: check if we need to restart
return;
}
this.encryptionPreparation = (async () => {
const [devicesInRoom, blocked] = await this._getDevicesInRoom(room);
if (this._crypto.getGlobalErrorOnUnknownDevices()) {
// Drop unknown devices for now. When the message gets sent, we'll
// throw an error, but we'll still be prepared to send to the known
// devices.
this._removeUnknownDevices(devicesInRoom);
}
await this._ensureOutboundSession(devicesInRoom, blocked);
delete this.encryptionPreparation;
})();
};
/**
* @inheritdoc
*
@@ -776,38 +803,47 @@ MegolmEncryption.prototype._notifyBlockedDevices = async function(
* @return {Promise} Promise which resolves to the new event body
*/
MegolmEncryption.prototype.encryptMessage = async function(room, eventType, content) {
const self = this;
logger.log(`Starting to encrypt event for ${this._roomId}`);
if (this.encryptionPreparation) {
// If we started sending keys, wait for it to be done.
// FIXME: check if we need to restart
try {
await this.encryptionPreparation;
} catch (e) {
// ignore any errors -- if the preparation failed, we'll just
// restart everything here
}
}
const [devicesInRoom, blocked] = await this._getDevicesInRoom(room);
// check if any of these devices are not yet known to the user.
// if so, warn the user so they can verify or ignore.
if (this._crypto.getGlobalErrorOnUnknownDevices()) {
self._checkForUnknownDevices(devicesInRoom);
this._checkForUnknownDevices(devicesInRoom);
}
const session = await self._ensureOutboundSession(devicesInRoom, blocked);
const session = await this._ensureOutboundSession(devicesInRoom, blocked);
const payloadJson = {
room_id: self._roomId,
room_id: this._roomId,
type: eventType,
content: content,
};
const ciphertext = self._olmDevice.encryptGroupMessage(
const ciphertext = this._olmDevice.encryptGroupMessage(
session.sessionId, JSON.stringify(payloadJson),
);
const encryptedContent = {
algorithm: olmlib.MEGOLM_ALGORITHM,
sender_key: self._olmDevice.deviceCurve25519Key,
sender_key: this._olmDevice.deviceCurve25519Key,
ciphertext: ciphertext,
session_id: session.sessionId,
// Include our device ID so that recipients can send us a
// m.new_device message if they don't have our session key.
// XXX: Do we still need this now that m.new_device messages
// no longer exist since #483?
device_id: self._deviceId,
device_id: this._deviceId,
};
session.useCount++;
@@ -855,6 +891,27 @@ MegolmEncryption.prototype._checkForUnknownDevices = function(devicesInRoom) {
}
};
/**
* Remove unknown devices from a set of devices. The devicesInRoom parameter
* will be modified.
*
* @param {Object} devicesInRoom userId -> {deviceId -> object}
* devices we should shared the session with.
*/
MegolmEncryption.prototype._removeUnknownDevices = function(devicesInRoom) {
for (const [userId, userDevices] of Object.entries(devicesInRoom)) {
for (const [deviceId, device] of Object.entries(userDevices)) {
if (device.isUnverified() && !device.isKnown()) {
delete userDevices[deviceId];
}
}
if (Object.keys(userDevices).length === 0) {
delete devicesInRoom[userId];
}
}
};
/**
* Get the list of unblocked devices for all users in the room
*