You've already forked matrix-js-sdk
mirror of
https://github.com/matrix-org/matrix-js-sdk.git
synced 2025-11-25 05:23:13 +03:00
Avoid deadlocks when ensuring Olm sessions for devices
This reworks tracking the Olm sessions a particular task is updating to avoid deadlocks. By ensuring we synchronously mark all sessions a task cares about as in progress from the start, we know that no other tasks will own updating a session in common, which avoids deadlocks across multiple tasks that might be working on a shared set of devices. Fixes https://github.com/vector-im/element-web/issues/16194
This commit is contained in:
@@ -190,5 +190,91 @@ describe("OlmDevice", function() {
|
|||||||
// new session and will have called claimOneTimeKeys
|
// new session and will have called claimOneTimeKeys
|
||||||
expect(count).toBe(2);
|
expect(count).toBe(2);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("avoids deadlocks when two tasks are ensuring the same devices", async function() {
|
||||||
|
// This test checks whether `ensureOlmSessionsForDevices` properly
|
||||||
|
// handles multiple tasks in flight ensuring some set of devices in
|
||||||
|
// common without deadlocks.
|
||||||
|
|
||||||
|
let claimRequestCount = 0;
|
||||||
|
const baseApis = {
|
||||||
|
claimOneTimeKeys: () => {
|
||||||
|
// simulate a very slow server (.5 seconds to respond)
|
||||||
|
claimRequestCount++;
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
setTimeout(reject, 500);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const deviceBobA = DeviceInfo.fromStorage({
|
||||||
|
keys: {
|
||||||
|
"curve25519:BOB-A": "akey",
|
||||||
|
},
|
||||||
|
}, "BOB-A");
|
||||||
|
const deviceBobB = DeviceInfo.fromStorage({
|
||||||
|
keys: {
|
||||||
|
"curve25519:BOB-B": "bkey",
|
||||||
|
},
|
||||||
|
}, "BOB-B");
|
||||||
|
|
||||||
|
// There's no required ordering of devices per user, so here we
|
||||||
|
// create two different orderings so that each task reserves a
|
||||||
|
// device the other task needs before continuing.
|
||||||
|
const devicesByUserAB = {
|
||||||
|
"@bob:example.com": [
|
||||||
|
deviceBobA,
|
||||||
|
deviceBobB,
|
||||||
|
],
|
||||||
|
};
|
||||||
|
const devicesByUserBA = {
|
||||||
|
"@bob:example.com": [
|
||||||
|
deviceBobB,
|
||||||
|
deviceBobA,
|
||||||
|
],
|
||||||
|
};
|
||||||
|
|
||||||
|
function alwaysSucceed(promise) {
|
||||||
|
// swallow any exception thrown by a promise, so that
|
||||||
|
// Promise.all doesn't abort
|
||||||
|
return promise.catch(() => {});
|
||||||
|
}
|
||||||
|
|
||||||
|
const task1 = alwaysSucceed(olmlib.ensureOlmSessionsForDevices(
|
||||||
|
aliceOlmDevice, baseApis, devicesByUserAB,
|
||||||
|
));
|
||||||
|
|
||||||
|
// After a single tick through the first task, it should have
|
||||||
|
// claimed ownership of all devices to avoid deadlocking others.
|
||||||
|
expect(Object.keys(aliceOlmDevice._sessionsInProgress).length).toBe(2);
|
||||||
|
|
||||||
|
const task2 = alwaysSucceed(olmlib.ensureOlmSessionsForDevices(
|
||||||
|
aliceOlmDevice, baseApis, devicesByUserBA,
|
||||||
|
));
|
||||||
|
|
||||||
|
// The second task should not have changed the ownership count, as
|
||||||
|
// it's waiting on the first task.
|
||||||
|
expect(Object.keys(aliceOlmDevice._sessionsInProgress).length).toBe(2);
|
||||||
|
|
||||||
|
// Track the tasks, but don't await them yet.
|
||||||
|
const promises = Promise.all([
|
||||||
|
task1,
|
||||||
|
task2,
|
||||||
|
]);
|
||||||
|
|
||||||
|
await new Promise((resolve) => {
|
||||||
|
setTimeout(resolve, 200);
|
||||||
|
});
|
||||||
|
|
||||||
|
// After .2s, the first task should have made an initial claim request.
|
||||||
|
expect(claimRequestCount).toBe(1);
|
||||||
|
|
||||||
|
await promises;
|
||||||
|
|
||||||
|
// After waiting for both tasks to complete, the first task should
|
||||||
|
// have failed, so the second task should have tried to create a
|
||||||
|
// new session and will have called claimOneTimeKeys
|
||||||
|
expect(claimRequestCount).toBe(2);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -208,6 +208,48 @@ export async function ensureOlmSessionsForDevices(
|
|||||||
const result = {};
|
const result = {};
|
||||||
const resolveSession = {};
|
const resolveSession = {};
|
||||||
|
|
||||||
|
// Mark all sessions this task intends to update as in progress. It is
|
||||||
|
// important to do this for all devices this task cares about in a single
|
||||||
|
// synchronous operation, as otherwise it is possible to have deadlocks
|
||||||
|
// where multiple tasks wait indefinitely on another task to update some set
|
||||||
|
// of common devices.
|
||||||
|
for (const [userId, devices] of Object.entries(devicesByUser)) {
|
||||||
|
for (const deviceInfo of devices) {
|
||||||
|
const deviceId = deviceInfo.deviceId;
|
||||||
|
const key = deviceInfo.getIdentityKey();
|
||||||
|
|
||||||
|
if (key === olmDevice.deviceCurve25519Key) {
|
||||||
|
// We don't start sessions with ourself, so there's no need to
|
||||||
|
// mark it in progress.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const forWhom = `for ${key} (${userId}:${deviceId})`;
|
||||||
|
if (!olmDevice._sessionsInProgress[key]) {
|
||||||
|
// pre-emptively mark the session as in-progress to avoid race
|
||||||
|
// conditions. If we find that we already have a session, then
|
||||||
|
// we'll resolve
|
||||||
|
log.debug(`Marking Olm session in progress ${forWhom}`);
|
||||||
|
olmDevice._sessionsInProgress[key] = new Promise(
|
||||||
|
(resolve, reject) => {
|
||||||
|
resolveSession[key] = {
|
||||||
|
resolve: (...args) => {
|
||||||
|
log.debug(`Resolved Olm session in progress ${forWhom}`);
|
||||||
|
delete olmDevice._sessionsInProgress[key];
|
||||||
|
resolve(...args);
|
||||||
|
},
|
||||||
|
reject: (...args) => {
|
||||||
|
log.debug(`Rejected Olm session in progress ${forWhom}`);
|
||||||
|
delete olmDevice._sessionsInProgress[key];
|
||||||
|
reject(...args);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for (const [userId, devices] of Object.entries(devicesByUser)) {
|
for (const [userId, devices] of Object.entries(devicesByUser)) {
|
||||||
result[userId] = {};
|
result[userId] = {};
|
||||||
for (const deviceInfo of devices) {
|
for (const deviceInfo of devices) {
|
||||||
@@ -234,28 +276,6 @@ export async function ensureOlmSessionsForDevices(
|
|||||||
|
|
||||||
const forWhom = `for ${key} (${userId}:${deviceId})`;
|
const forWhom = `for ${key} (${userId}:${deviceId})`;
|
||||||
log.debug(`Ensuring Olm session ${forWhom}`);
|
log.debug(`Ensuring Olm session ${forWhom}`);
|
||||||
if (!olmDevice._sessionsInProgress[key]) {
|
|
||||||
// pre-emptively mark the session as in-progress to avoid race
|
|
||||||
// conditions. If we find that we already have a session, then
|
|
||||||
// we'll resolve
|
|
||||||
log.debug(`Marking Olm session in progress ${forWhom}`);
|
|
||||||
olmDevice._sessionsInProgress[key] = new Promise(
|
|
||||||
(resolve, reject) => {
|
|
||||||
resolveSession[key] = {
|
|
||||||
resolve: (...args) => {
|
|
||||||
log.debug(`Resolved Olm session in progress ${forWhom}`);
|
|
||||||
delete olmDevice._sessionsInProgress[key];
|
|
||||||
resolve(...args);
|
|
||||||
},
|
|
||||||
reject: (...args) => {
|
|
||||||
log.debug(`Rejected Olm session in progress ${forWhom}`);
|
|
||||||
delete olmDevice._sessionsInProgress[key];
|
|
||||||
reject(...args);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
const sessionId = await olmDevice.getSessionIdForDevice(
|
const sessionId = await olmDevice.getSessionIdForDevice(
|
||||||
key, resolveSession[key], log,
|
key, resolveSession[key], log,
|
||||||
);
|
);
|
||||||
|
|||||||
Reference in New Issue
Block a user