1
0
mirror of https://github.com/matrix-org/matrix-js-sdk.git synced 2025-08-06 12:02:40 +03:00

Refactor key backup recovery to prepare for rust (#3708)

* Refactor key backup recovery to prepare for rust

* code review

* quick doc format

* code review fix
This commit is contained in:
Valere
2023-09-12 13:19:35 +02:00
committed by GitHub
parent f963ca5562
commit c7827d971c
8 changed files with 398 additions and 61 deletions

View File

@@ -26,10 +26,15 @@ python -m venv env
import base64
import json
import base58
from canonicaljson import encode_canonical_json
from cryptography.hazmat.primitives.asymmetric import ed25519, x25519
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.hazmat.primitives import hashes, padding, hmac
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from random import randbytes, seed
ALICE_DATA = {
@@ -77,6 +82,7 @@ def main() -> None:
import {{ IDeviceKeys, IMegolmSessionData }} from "../../../src/@types/crypto";
import {{ IDownloadKeyResult }} from "../../../src";
import {{ KeyBackupInfo }} from "../../../src/crypto-api";
import {{ IKeyBackupSession }} from "../../../src/crypto/keybackup";
/* eslint-disable comma-dangle */
@@ -186,6 +192,10 @@ def build_test_data(user_data, prefix = "") -> str:
}
}
backed_up_room_key = encrypt_megolm_key_for_backup(additional_exported_room_key, backup_decryption_key.public_key())
backup_recovery_key = export_recovery_key(user_data["B64_BACKUP_DECRYPTION_KEY"])
return f"""\
export const {prefix}TEST_USER_ID = "{user_data['TEST_USER_ID']}";
export const {prefix}TEST_DEVICE_ID = "{user_data['TEST_DEVICE_ID']}";
@@ -220,9 +230,15 @@ export const {prefix}SIGNED_CROSS_SIGNING_KEYS_DATA: Partial<IDownloadKeyResult>
json.dumps(build_cross_signing_keys_data(user_data), indent=4)
};
/** Signed OTKs, returned by `POST /keys/claim` */
export const {prefix}ONE_TIME_KEYS = { json.dumps(otks, indent=4) };
/** base64-encoded backup decryption (private) key */
export const {prefix}BACKUP_DECRYPTION_KEY_BASE64 = "{ user_data['B64_BACKUP_DECRYPTION_KEY'] }";
/** Backup decryption key in export format */
export const {prefix}BACKUP_DECRYPTION_KEY_BASE58 = "{ backup_recovery_key }";
/** Signed backup data, suitable for return from `GET /_matrix/client/v3/room_keys/keys/{{roomId}}/{{sessionId}}` */
export const {prefix}SIGNED_BACKUP_DATA: KeyBackupInfo = { json.dumps(backup_data, indent=4) };
@@ -236,8 +252,8 @@ export const {prefix}MEGOLM_SESSION_DATA: IMegolmSessionData = {
json.dumps(additional_exported_room_key, indent=4)
};
/** Signed OTKs, returned by `POST /keys/claim` */
export const {prefix}ONE_TIME_KEYS = { json.dumps(otks, indent=4) };
/** The key from {prefix}MEGOLM_SESSION_DATA, encrypted for backup using `m.megolm_backup.v1.curve25519-aes-sha2` algorithm*/
export const {prefix}CURVE25519_KEY_BACKUP_DATA: IKeyBackupSession = {json.dumps(backed_up_room_key, indent=4)};
"""
@@ -367,6 +383,97 @@ def build_exported_megolm_key() -> dict:
return megolm_export
def encrypt_megolm_key_for_backup(session_data: dict, backup_public_key: x25519.X25519PublicKey) -> dict:
"""
Encrypts an exported megolm key for key backup, using the m.megolm_backup.v1.curve25519-aes-sha2 algorithm.
"""
data = encode_canonical_json(session_data)
# Generate an ephemeral curve25519 key, and perform an ECDH with the ephemeral key
# and the backups public key to generate a shared secret.
# The public half of the ephemeral key, encoded using unpadded base64,
# becomes the ephemeral property of the session_data.
ephemeral_keypair = x25519.X25519PrivateKey.from_private_bytes(randbytes(32))
shared_secret = ephemeral_keypair.exchange(backup_public_key)
ephemeral = encode_base64(ephemeral_keypair.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw))
# Using the shared secret, generate 80 bytes by performing an HKDF using SHA-256 as the hash,
# with a salt of 32 bytes of 0, and with the empty string as the info.
# The first 32 bytes are used as the AES key, the next 32 bytes are used as the MAC key,
# and the last 16 bytes are used as the AES initialization vector.
salt = bytes(32)
info = b""
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=80,
salt=salt,
info=info,
)
raw_key = hkdf.derive(shared_secret)
aes_key = raw_key[:32]
mac = raw_key[32:64]
iv = raw_key[64:80]
# Stringify the JSON object, and encrypt it using AES-CBC-256 with PKCS#7 padding.
# This encrypted data, encoded using unpadded base64, becomes the ciphertext property of the session_data.
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv))
encryptor = cipher.encryptor()
padder = padding.PKCS7(128).padder()
padded_data = padder.update(data) + padder.finalize()
ct = encryptor.update(padded_data) + encryptor.finalize()
cipher_text = encode_base64(ct)
# Pass the raw encrypted data (prior to base64 encoding) through HMAC-SHA-256 using the MAC key generated above.
# The first 8 bytes of the resulting MAC are base64-encoded, and become the mac property of the session_data.
h = hmac.HMAC(mac, hashes.SHA256())
# h.update(ct)
signature = h.finalize()
mac = encode_base64(signature[:8])
encrypted_key = {
"first_message_index": 1,
"forwarded_count": 0,
"is_verified": False,
"session_data": {
"ciphertext": cipher_text,
"ephemeral": ephemeral,
"mac": mac
}
}
return encrypted_key
def export_recovery_key(key_b64: str) -> str:
"""
Export a private recovery key as a recovery key that can be presented to users.
As per spec https://spec.matrix.org/v1.8/client-server-api/#recovery-key
"""
private_key_bytes = base64.b64decode(key_b64)
# The 256-bit curve25519 private key is prepended by the bytes 0x8B and 0x01
export_bytes = bytearray()
export_bytes += b'\x8b'
export_bytes += b'\x01'
export_bytes += private_key_bytes
# All the bytes in the string above, including the two header bytes,
# are XORed together to form a parity byte. This parity byte is appended to the byte string.
parity_byte = 0 #b'\x8b' ^ b'\x01'
[parity_byte := parity_byte ^ x for x in export_bytes]
export_bytes += parity_byte.to_bytes(1, 'big')
# The byte string is encoded using base58
recovery_key = base58.b58encode(export_bytes).decode('utf-8')
split = [recovery_key[i:i + 4] for i in range(0, len(recovery_key), 4)]
return ' '.join(split)
if __name__ == "__main__":
main()