mirror of
https://github.com/certbot/certbot.git
synced 2026-01-26 07:41:33 +03:00
use M2Crypto in CSR verify/sign/encrypt
This commit is contained in:
@@ -4,6 +4,7 @@
|
||||
|
||||
import subprocess, tempfile, re
|
||||
import M2Crypto
|
||||
import hashlib
|
||||
# we can use tempfile.NamedTemporaryFile() to get tempfiles
|
||||
# to pass to OpenSSL subprocesses.
|
||||
|
||||
@@ -128,38 +129,62 @@ def can_sign(name):
|
||||
if name in ["google.com", "www.google.com"]: return False
|
||||
return True
|
||||
|
||||
def verify(key, data):
|
||||
"""What string was validly signed by this public key? (or None)"""
|
||||
# Note: Only relatively short strings will work, so we normally
|
||||
# sign a hash of the signed data rather than signing the signed
|
||||
# data directly.
|
||||
with tempfile.NamedTemporaryFile() as tmp:
|
||||
tmp.write(key)
|
||||
tmp.flush()
|
||||
out, err = subprocess.Popen(["openssl", "rsautl", "-pubin", "-inkey", tmp.name, "-verify"],shell=False,stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate(data)
|
||||
if out and not err:
|
||||
return out
|
||||
return None
|
||||
def verify(key, data, signature):
|
||||
"""
|
||||
Given a public key, some data, and its signature,
|
||||
verify the signature.
|
||||
|
||||
@type key: str
|
||||
@param key: PEM-encoded string of the public key.
|
||||
|
||||
@type data: str
|
||||
@param data: The data (before being hashed; we will use sha256 here)
|
||||
|
||||
@type signature: str
|
||||
@param signature: binary string of the signature
|
||||
|
||||
@return: True if the signature checks out, False otherwise.
|
||||
"""
|
||||
bio = M2Crypto.BIO.MemoryBuffer(key)
|
||||
pubkey = M2Crypto.RSA.load_pub_key_bio(bio)
|
||||
try:
|
||||
res = pubkey.verify(hashlib.sha256(data).digest(), signature, 'sha256')
|
||||
except M2Crypto.RSA.RSAError:
|
||||
return False
|
||||
return (res == 1)
|
||||
|
||||
def sign(key, data):
|
||||
"""Sign this data with this private key. For client-side use."""
|
||||
with tempfile.NamedTemporaryFile() as tmp:
|
||||
tmp.write(key)
|
||||
tmp.flush()
|
||||
out, err = subprocess.Popen(["openssl", "rsautl", "-inkey", tmp.name, "-sign"],shell=False,stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate(data)
|
||||
if out and not err:
|
||||
return out
|
||||
return None
|
||||
"""
|
||||
Sign this data with this private key. For client-side use.
|
||||
|
||||
@type key: str
|
||||
@param key: PEM-encoded string of the private key.
|
||||
|
||||
@type data: str
|
||||
@param data: The data to be signed. Will be hashed (sha256) prior to
|
||||
signing.
|
||||
|
||||
@return: binary string of the signature
|
||||
"""
|
||||
privkey = M2Crypto.RSA.load_key_string(key)
|
||||
return privkey.sign(hashlib.sha256(data).digest(), 'sha256')
|
||||
|
||||
def encrypt(key, data):
|
||||
"""Encrypt this data with this public key."""
|
||||
with tempfile.NamedTemporaryFile() as tmp:
|
||||
tmp.write(key)
|
||||
tmp.flush()
|
||||
out, err = subprocess.Popen(["openssl", "rsautl", "-pubin", "-inkey", tmp.name, "-encrypt"],shell=False,stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate(data)
|
||||
if out and not err:
|
||||
return out
|
||||
return None
|
||||
"""
|
||||
Encrypt this data with this public key.
|
||||
|
||||
@type key: str
|
||||
@param key: PEM-encoded string of the public key
|
||||
|
||||
@type data: str
|
||||
@param data: The data to be encrypted.
|
||||
|
||||
@return: binary string of the encrypted value, using PKCS1_OAEP_PADDING
|
||||
"""
|
||||
bio = M2Crypto.BIO.MemoryBuffer(key)
|
||||
pubkey = M2Crypto.RSA.load_pub_key_bio(bio)
|
||||
return pubkey.public_encrypt(data, M2Crypto.RSA.pkcs1_oaep_padding)
|
||||
|
||||
|
||||
def issue(csr):
|
||||
"""Issue the certificate requested by this CSR and return it!"""
|
||||
|
||||
@@ -223,7 +223,8 @@ class session(object):
|
||||
if not CSR.parse(csr):
|
||||
self.die(r, r.BadCSR)
|
||||
return
|
||||
if CSR.verify(CSR.pubkey(csr), sig) != sha256("(%d) (%s) (%s)" % (timestamp, recipient, csr)):
|
||||
digest_data = "(%d) (%s) (%s)" % (timestamp, recipient, csr)
|
||||
if CSR.verify(CSR.pubkey(csr), digest_data, sig) == False:
|
||||
self.die(r, r.BadSignature)
|
||||
return
|
||||
if not CSR.csr_goodkey(csr):
|
||||
|
||||
Reference in New Issue
Block a user