1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-26 07:41:33 +03:00

Move abbreviated DVSNI code into a separate branch

This commit is contained in:
Seth Schoen
2015-02-09 09:35:37 -08:00
parent b324f9d912
commit 1c6865c329
2 changed files with 0 additions and 337 deletions

View File

@@ -17,217 +17,11 @@ import sys
import signal
import time
import socket
import struct
import Crypto.Random
import M2Crypto.X509
import OpenSSL.crypto
import OpenSSL.SSL
def unpack_2bytes(two_bytes):
"""Interpret a two-byte string as an integer. E.g. 't_' -> 29791."""
assert len(two_bytes) == 2
return struct.unpack(">H", two_bytes)[0]
def unpack_3bytes(three_bytes):
"""Interpret a three-byte string as an integer. E.g. '0M~' -> 3165566."""
assert len(three_bytes) == 3
return struct.unpack(">I", chr(0) + three_bytes)[0]
def pack_2bytes(value):
"""Interpret an integer less than 65536 as a two-byte string. E.g.
29791 -> 't_'."""
assert value < 65536
return struct.pack(">H", value)
def pack_3bytes(value):
"""Interpret an integer less than 16777216 as a three-byte string.
E.g. '0M~' -> 3165566."""
assert value < 16777216
return struct.pack(">I", value)[1:]
# Exclude this function from coverage testing because it is currently
# not used.
def tls_parse_client_hello(tls_record): # pragma: no cover
# pylint: disable=too-many-return-statements,too-many-locals,bad-builtin
# pylint: disable=too-many-branches
"""If possible, parse the specified TLS record as a ClientHello and
return the first host_name indicated in a Server Name Indication
extension within that ClientHello. If the TLS record could not
be parsed or there is no such extension or host_name present,
return None.
:param str tls_record: The TLS record to be parsed (which is assumed
to contain a single ClientHello handshake message)."""
# TLS handshake?
if tls_record[0] != chr(0x16):
return None
# TLS version
tls_version = tls_record[1:3]
if map(ord, tls_version) not in [[0x03, 0x01], [0x03, 0x02], [0x03, 0x03]]:
return None
# TLS record length
tls_record_len = unpack_2bytes(tls_record[3:5])
if len(tls_record) < tls_record_len:
return None
# Handshake type, length, and version
handshake_type = tls_record[5]
if handshake_type != chr(0x01):
return None
handshake_len = unpack_3bytes(tls_record[6:9])
handshake_version = tls_record[9:11]
handshake = tls_record[11:]
# Handshake length includes handshake_version (2 bytes)
if len(handshake) + 2 < handshake_len:
return None
if map(ord, handshake_version) not in [[0x03, 0x01], [0x03, 0x02],
[0x03, 0x03]]:
return None
# Random
unused_random = handshake[0:32]
# Session ID
session_id_length = ord(handshake[32])
i = 33
i += session_id_length
# Ciphersuites
ciphersuites_length = unpack_2bytes(handshake[i:i+2])
if ciphersuites_length >= 2:
best_ciphersuite = handshake[i+2:i+4]
else:
best_ciphersuite = chr(0) + chr(0)
i += 2
i += ciphersuites_length
# Compression methods
compression_length = ord(handshake[i])
i += 1
i += compression_length
# ClientHello extensions
extensions_length = unpack_2bytes(handshake[i:i+2])
i += 2
if extensions_length < 10:
# Minimum size of a 1-byte SNI hostname extension
return None
while i < len(handshake):
# XXX If stated extension lengths are wrong or inconsistent or
# XXX if the packet has been truncated in the middle of an
# XXX extension, this may crash or hang! This needs to be updated
# XXX to fail cleanly when confronted with inconsistent extension
# XXX fields.
extension_type = handshake[i:i+2]
if extension_type == "\0\0":
# SNI
extension_length = unpack_2bytes(handshake[i+2:i+4])
i += 4
unused_server_name_list_length = unpack_2bytes(handshake[i:i+2])
first_sn_type = handshake[i+2]
if first_sn_type != "\0":
# SNI extension referenced something other than a
# hostname
return None
first_sn_length = unpack_2bytes(handshake[i+3:i+5])
first_sn = handshake[i+5:i+5+first_sn_length]
return best_ciphersuite, first_sn
else:
# Other than SNI
extension_length = unpack_2bytes(handshake[i+2:i+4])
i += 4
i += extension_length
continue
return None
def tls_generate_server_hello(ciphersuite):
"""Generate a TLS 1.2 ServerHello message.
:param ciphersuite str: The ciphersuite that the ServerHello will
claim to have selected (two bytes)."""
# Handshake type: ServerHello (0x02)
server_hello = chr(0x02)
# ServerHello length (38 bytes based on below)
server_hello += chr(0x0) + chr(0x0) + chr(38)
# TLS version (0x0303)
server_hello += chr(0x03) + chr(0x03)
# Server Random
server_hello += Crypto.Random.new().read(32)
# Session ID length (0)
server_hello += chr(0x0)
# Ciphersuite
server_hello += ciphersuite
# Compression method (null)
server_hello += chr(0x0)
# Extension length (2 bytes) + extensions go here if any extensions
# are required, BUT if no extensions are present then the extensions
# and extension length field are both omitted entirely (rather than
# declaring extension length 0x0000) - see RFC 5246 p. 42.
# TLS handshake
tls_record = chr(0x16)
# TLS version
tls_record += chr(0x03) + chr(0x03)
# TLS record length
assert len(server_hello) < 256
tls_record += chr(0) + chr(len(server_hello))
# Append server hello handshake
tls_record += server_hello
return tls_record
def tls_generate_cert_msg(cert_pem):
"""Generate a TLS 1.2 Certificate handshake message containing a
single certificate.
:param str cert_pem: The certificate to be include in the message (in
PEM format)."""
cert_as_der = M2Crypto.X509.load_cert_string(cert_pem).as_der()
# Handshake type: Certificate (0x0b)
cert_msg = chr(0x0b)
cert_msg_length = len(cert_as_der) + 6
cert_msg += pack_3bytes(cert_msg_length)
certs_length = len(cert_as_der) + 3
cert_msg += pack_3bytes(certs_length)
cert_length = len(cert_as_der)
cert_msg += pack_3bytes(cert_length)
cert_msg += cert_as_der
# TLS handshake
tls_record = chr(0x16)
# TLS version
tls_record += chr(0x03) + chr(0x03)
# TLS record length
assert len(cert_msg) < 65536
tls_record += pack_2bytes(len(cert_msg))
# Append certificate handshake
tls_record += cert_msg
return tls_record
def tls_generate_server_hello_done():
"""Generate a TLS 1.2 ServerHelloDone message."""
return "16030300040e000000".decode("hex")
class StandaloneAuthenticator(object):
# pylint: disable=too-many-instance-attributes
"""The StandaloneAuthenticator class itself, which can be invoked
@@ -391,31 +185,6 @@ class StandaloneAuthenticator(object):
self.ssl_conn.shutdown()
self.ssl_conn.close()
# The code below uses the minimal pure Python implementation
# of TLS ClientHello, ServerHello, and Certificate messages
# (as an alternative to a full TLS implementation). It will
# not reach Finished state with a compliant TLS implementation.
#
# client_hello = self.connection.recv(65536)
# result = tls_parse_client_hello(client_hello)
# if result is None:
# print "No SNI found in ClientHello, dropping connection"
# self.connection.close()
# continue
# ciphersuite, sni = result
# if sni in self.tasks:
# pem_cert = self.tasks[sni]
# else:
# # We don't know which cert to send!
# print "Unexpected SNI value", sni
# # Choose the "first" cert and send it (but maybe we
# # should just disconnect instead?)
# pem_cert = self.tasks.values()[0]
# self.connection.send(tls_generate_server_hello(ciphersuite))
# self.connection.send(tls_generate_cert_msg(pem_cert))
# self.connection.send(tls_generate_server_hello_done())
# self.connection.close()
def start_listener(self, port, key):
"""Create a child process which will start a TCP listener on the
specified port to perform the specified DVSNI challenges.

View File

@@ -38,112 +38,6 @@ class CallableExhausted(Exception):
pass
class PackAndUnpackTests(unittest.TestCase):
"""Tests for byte packing and unpacking routines used for TLS
parsing."""
def test_pack_and_unpack_bytes(self):
from letsencrypt.client.standalone_authenticator import \
unpack_2bytes, unpack_3bytes, pack_2bytes, pack_3bytes
self.assertEqual(unpack_2bytes("JZ"), 19034)
self.assertEqual(unpack_2bytes(chr(0)*2), 0)
self.assertEqual(unpack_2bytes(chr(255)*2), 65535)
self.assertEqual(unpack_3bytes("abc"), 6382179)
self.assertEqual(unpack_3bytes(chr(0)*3), 0)
self.assertEqual(unpack_3bytes(chr(255)*3), 16777215)
self.assertEqual(pack_2bytes(12), chr(0) + chr(12))
self.assertEqual(pack_2bytes(1729), chr(6) + chr(193))
self.assertEqual(pack_3bytes(0), chr(0)*3)
self.assertEqual(pack_3bytes(12345678), chr(0xbc) + "aN")
def test_invalid_pack_and_unpack(self):
from letsencrypt.client.standalone_authenticator import \
unpack_2bytes, unpack_3bytes, pack_2bytes, pack_3bytes
self.assertRaises(AssertionError, pack_2bytes, 65537)
self.assertRaises(AssertionError, pack_3bytes, 500000000)
self.assertRaises(AssertionError, unpack_2bytes, "foo")
self.assertRaises(AssertionError, unpack_3bytes, "food")
class TLSParseClientHelloTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
"""Test for tls_parse_client_hello() function."""
def test_tls_parse_client_hello(self):
from letsencrypt.client.standalone_authenticator import \
tls_parse_client_hello
client_hello = "16030100c4010000c003030cfef9971eda442c60cbb6c397" \
"7957a81a8ada317e800b7867a8c61f71c40cab000020c02b" \
"c02fc00ac009c013c014c007c011003300320039002f0035" \
"000a000500040100007700000010000e00000b7777772e65" \
"66662e6f7267ff01000100000a0008000600170018001900" \
"0b00020100002300003374000000100021001f0568322d31" \
"3408737064792f332e3106737064792f3308687474702f31" \
"2e31000500050100000000000d0012001004010501020104" \
"030503020304020202".decode("hex")
return_value = tls_parse_client_hello(client_hello)
self.assertEqual(return_value, (chr(0xc0) + chr(0x2b), "www.eff.org"))
# TODO: The failure cases are extremely numerous and require
# constructing TLS ClientHello messages that are individually
# defective or surprising in distinct ways. (Each invalid TLS
# record is invalid in its own way.)
class TLSGenerateServerHelloTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
"""Tests for tls_generate_server_hello() function."""
def test_tls_generate_server_hello(self):
from letsencrypt.client.standalone_authenticator import \
tls_generate_server_hello
server_hello = tls_generate_server_hello("Q!")
self.assertEqual(server_hello[:11].encode("hex"),
'160303002a020000260303')
self.assertEqual(server_hello[43:], chr(0) + 'Q!' + chr(0))
class TLSGenerateCertMsgTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
"""Tests for tls_generate_cert_msg() function."""
def test_tls_generate_cert_msg(self):
from letsencrypt.client.standalone_authenticator import \
tls_generate_cert_msg
cert = pkg_resources.resource_string(__name__,
'testdata/cert.pem')
cert_msg = tls_generate_cert_msg(cert)
self.assertEqual(cert_msg.encode("hex"),
"16030301ec0b0001e80001e50001e2308201de30820188a003"
"02010202020539300d06092a864886f70d01010b0500307731"
"0b30090603550406130255533111300f06035504080c084d69"
"63686967616e3112301006035504070c09416e6e204172626f"
"72312b3029060355040a0c22556e6976657273697479206f66"
"204d6963686967616e20616e64207468652045464631143012"
"06035504030c0b6578616d706c652e636f6d301e170d313431"
"3231313232333434355a170d3134313231383232333434355a"
"3077310b30090603550406130255533111300f06035504080c"
"084d6963686967616e3112301006035504070c09416e6e2041"
"72626f72312b3029060355040a0c22556e6976657273697479"
"206f66204d6963686967616e20616e64207468652045464631"
"14301206035504030c0b6578616d706c652e636f6d305c300d"
"06092a864886f70d0101010500034b003048024100ac7573b4"
"51ed1fddae705243fcdfc75bd02c751b14b875010410e51f03"
"6545dddfa79f34aefdbee90584df471681d9894bce8e6d1cfa"
"9544e8af84744fedc2e50203010001300d06092a864886f70d"
"01010b05000341002db8cf421dc0854a4a59ed92c965bebeb3"
"25ea411f97cc9dd7e4dd7269d748d3e9513ed7828db63874d9"
"ae7a1a8ada02f2404f9fc7ebb13c1af27fa1c36707fa")
class TLSServerHelloDoneTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
"""Tests for tls_generate_server_hello_done() function."""
def test_tls_generate_server_hello_done(self):
from letsencrypt.client.standalone_authenticator import \
tls_generate_server_hello_done
self.assertEqual(tls_generate_server_hello_done().encode("hex"), \
"16030300040e000000")
class ChallPrefTest(unittest.TestCase):
"""Tests for chall_pref() method."""
def setUp(self):