From 1c6865c329da178285997e4b56cd9f9ed9ebfa34 Mon Sep 17 00:00:00 2001 From: Seth Schoen Date: Mon, 9 Feb 2015 09:35:37 -0800 Subject: [PATCH] Move abbreviated DVSNI code into a separate branch --- .../client/standalone_authenticator.py | 231 ------------------ .../tests/standalone_authenticator_test.py | 106 -------- 2 files changed, 337 deletions(-) diff --git a/letsencrypt/client/standalone_authenticator.py b/letsencrypt/client/standalone_authenticator.py index 0a64b7fe5..83ffd22c4 100644 --- a/letsencrypt/client/standalone_authenticator.py +++ b/letsencrypt/client/standalone_authenticator.py @@ -17,217 +17,11 @@ import sys import signal import time import socket -import struct import Crypto.Random -import M2Crypto.X509 import OpenSSL.crypto import OpenSSL.SSL -def unpack_2bytes(two_bytes): - """Interpret a two-byte string as an integer. E.g. 't_' -> 29791.""" - assert len(two_bytes) == 2 - return struct.unpack(">H", two_bytes)[0] - - -def unpack_3bytes(three_bytes): - """Interpret a three-byte string as an integer. E.g. '0M~' -> 3165566.""" - assert len(three_bytes) == 3 - return struct.unpack(">I", chr(0) + three_bytes)[0] - - -def pack_2bytes(value): - """Interpret an integer less than 65536 as a two-byte string. E.g. - 29791 -> 't_'.""" - assert value < 65536 - return struct.pack(">H", value) - - -def pack_3bytes(value): - """Interpret an integer less than 16777216 as a three-byte string. - E.g. '0M~' -> 3165566.""" - assert value < 16777216 - return struct.pack(">I", value)[1:] - - -# Exclude this function from coverage testing because it is currently -# not used. -def tls_parse_client_hello(tls_record): # pragma: no cover - # pylint: disable=too-many-return-statements,too-many-locals,bad-builtin - # pylint: disable=too-many-branches - """If possible, parse the specified TLS record as a ClientHello and - return the first host_name indicated in a Server Name Indication - extension within that ClientHello. If the TLS record could not - be parsed or there is no such extension or host_name present, - return None. - - :param str tls_record: The TLS record to be parsed (which is assumed - to contain a single ClientHello handshake message).""" - - # TLS handshake? - if tls_record[0] != chr(0x16): - return None - - # TLS version - tls_version = tls_record[1:3] - if map(ord, tls_version) not in [[0x03, 0x01], [0x03, 0x02], [0x03, 0x03]]: - return None - - # TLS record length - tls_record_len = unpack_2bytes(tls_record[3:5]) - if len(tls_record) < tls_record_len: - return None - - # Handshake type, length, and version - handshake_type = tls_record[5] - if handshake_type != chr(0x01): - return None - handshake_len = unpack_3bytes(tls_record[6:9]) - handshake_version = tls_record[9:11] - handshake = tls_record[11:] - - # Handshake length includes handshake_version (2 bytes) - if len(handshake) + 2 < handshake_len: - return None - if map(ord, handshake_version) not in [[0x03, 0x01], [0x03, 0x02], - [0x03, 0x03]]: - return None - - # Random - unused_random = handshake[0:32] - - # Session ID - session_id_length = ord(handshake[32]) - i = 33 - i += session_id_length - - # Ciphersuites - ciphersuites_length = unpack_2bytes(handshake[i:i+2]) - if ciphersuites_length >= 2: - best_ciphersuite = handshake[i+2:i+4] - else: - best_ciphersuite = chr(0) + chr(0) - i += 2 - i += ciphersuites_length - - # Compression methods - compression_length = ord(handshake[i]) - i += 1 - i += compression_length - - # ClientHello extensions - extensions_length = unpack_2bytes(handshake[i:i+2]) - i += 2 - if extensions_length < 10: - # Minimum size of a 1-byte SNI hostname extension - return None - - while i < len(handshake): - # XXX If stated extension lengths are wrong or inconsistent or - # XXX if the packet has been truncated in the middle of an - # XXX extension, this may crash or hang! This needs to be updated - # XXX to fail cleanly when confronted with inconsistent extension - # XXX fields. - extension_type = handshake[i:i+2] - if extension_type == "\0\0": - # SNI - extension_length = unpack_2bytes(handshake[i+2:i+4]) - i += 4 - unused_server_name_list_length = unpack_2bytes(handshake[i:i+2]) - first_sn_type = handshake[i+2] - if first_sn_type != "\0": - # SNI extension referenced something other than a - # hostname - return None - first_sn_length = unpack_2bytes(handshake[i+3:i+5]) - first_sn = handshake[i+5:i+5+first_sn_length] - return best_ciphersuite, first_sn - else: - # Other than SNI - extension_length = unpack_2bytes(handshake[i+2:i+4]) - i += 4 - i += extension_length - continue - return None - - -def tls_generate_server_hello(ciphersuite): - """Generate a TLS 1.2 ServerHello message. - - :param ciphersuite str: The ciphersuite that the ServerHello will - claim to have selected (two bytes).""" - - # Handshake type: ServerHello (0x02) - server_hello = chr(0x02) - # ServerHello length (38 bytes based on below) - server_hello += chr(0x0) + chr(0x0) + chr(38) - # TLS version (0x0303) - server_hello += chr(0x03) + chr(0x03) - # Server Random - server_hello += Crypto.Random.new().read(32) - # Session ID length (0) - server_hello += chr(0x0) - # Ciphersuite - server_hello += ciphersuite - # Compression method (null) - server_hello += chr(0x0) - # Extension length (2 bytes) + extensions go here if any extensions - # are required, BUT if no extensions are present then the extensions - # and extension length field are both omitted entirely (rather than - # declaring extension length 0x0000) - see RFC 5246 p. 42. - - # TLS handshake - tls_record = chr(0x16) - # TLS version - tls_record += chr(0x03) + chr(0x03) - # TLS record length - assert len(server_hello) < 256 - tls_record += chr(0) + chr(len(server_hello)) - # Append server hello handshake - tls_record += server_hello - return tls_record - - -def tls_generate_cert_msg(cert_pem): - """Generate a TLS 1.2 Certificate handshake message containing a - single certificate. - - :param str cert_pem: The certificate to be include in the message (in - PEM format).""" - - cert_as_der = M2Crypto.X509.load_cert_string(cert_pem).as_der() - # Handshake type: Certificate (0x0b) - cert_msg = chr(0x0b) - - cert_msg_length = len(cert_as_der) + 6 - cert_msg += pack_3bytes(cert_msg_length) - - certs_length = len(cert_as_der) + 3 - cert_msg += pack_3bytes(certs_length) - - cert_length = len(cert_as_der) - cert_msg += pack_3bytes(cert_length) - - cert_msg += cert_as_der - - # TLS handshake - tls_record = chr(0x16) - # TLS version - tls_record += chr(0x03) + chr(0x03) - # TLS record length - assert len(cert_msg) < 65536 - tls_record += pack_2bytes(len(cert_msg)) - # Append certificate handshake - tls_record += cert_msg - return tls_record - - -def tls_generate_server_hello_done(): - """Generate a TLS 1.2 ServerHelloDone message.""" - - return "16030300040e000000".decode("hex") - - class StandaloneAuthenticator(object): # pylint: disable=too-many-instance-attributes """The StandaloneAuthenticator class itself, which can be invoked @@ -391,31 +185,6 @@ class StandaloneAuthenticator(object): self.ssl_conn.shutdown() self.ssl_conn.close() - # The code below uses the minimal pure Python implementation - # of TLS ClientHello, ServerHello, and Certificate messages - # (as an alternative to a full TLS implementation). It will - # not reach Finished state with a compliant TLS implementation. - # - # client_hello = self.connection.recv(65536) - # result = tls_parse_client_hello(client_hello) - # if result is None: - # print "No SNI found in ClientHello, dropping connection" - # self.connection.close() - # continue - # ciphersuite, sni = result - # if sni in self.tasks: - # pem_cert = self.tasks[sni] - # else: - # # We don't know which cert to send! - # print "Unexpected SNI value", sni - # # Choose the "first" cert and send it (but maybe we - # # should just disconnect instead?) - # pem_cert = self.tasks.values()[0] - # self.connection.send(tls_generate_server_hello(ciphersuite)) - # self.connection.send(tls_generate_cert_msg(pem_cert)) - # self.connection.send(tls_generate_server_hello_done()) - # self.connection.close() - def start_listener(self, port, key): """Create a child process which will start a TCP listener on the specified port to perform the specified DVSNI challenges. diff --git a/letsencrypt/client/tests/standalone_authenticator_test.py b/letsencrypt/client/tests/standalone_authenticator_test.py index 21a738363..d855cced9 100644 --- a/letsencrypt/client/tests/standalone_authenticator_test.py +++ b/letsencrypt/client/tests/standalone_authenticator_test.py @@ -38,112 +38,6 @@ class CallableExhausted(Exception): pass -class PackAndUnpackTests(unittest.TestCase): - """Tests for byte packing and unpacking routines used for TLS - parsing.""" - def test_pack_and_unpack_bytes(self): - from letsencrypt.client.standalone_authenticator import \ - unpack_2bytes, unpack_3bytes, pack_2bytes, pack_3bytes - self.assertEqual(unpack_2bytes("JZ"), 19034) - self.assertEqual(unpack_2bytes(chr(0)*2), 0) - self.assertEqual(unpack_2bytes(chr(255)*2), 65535) - - self.assertEqual(unpack_3bytes("abc"), 6382179) - self.assertEqual(unpack_3bytes(chr(0)*3), 0) - self.assertEqual(unpack_3bytes(chr(255)*3), 16777215) - - self.assertEqual(pack_2bytes(12), chr(0) + chr(12)) - self.assertEqual(pack_2bytes(1729), chr(6) + chr(193)) - - self.assertEqual(pack_3bytes(0), chr(0)*3) - self.assertEqual(pack_3bytes(12345678), chr(0xbc) + "aN") - - def test_invalid_pack_and_unpack(self): - from letsencrypt.client.standalone_authenticator import \ - unpack_2bytes, unpack_3bytes, pack_2bytes, pack_3bytes - self.assertRaises(AssertionError, pack_2bytes, 65537) - self.assertRaises(AssertionError, pack_3bytes, 500000000) - self.assertRaises(AssertionError, unpack_2bytes, "foo") - self.assertRaises(AssertionError, unpack_3bytes, "food") - - -class TLSParseClientHelloTest(unittest.TestCase): - # pylint: disable=too-few-public-methods - """Test for tls_parse_client_hello() function.""" - def test_tls_parse_client_hello(self): - from letsencrypt.client.standalone_authenticator import \ - tls_parse_client_hello - client_hello = "16030100c4010000c003030cfef9971eda442c60cbb6c397" \ - "7957a81a8ada317e800b7867a8c61f71c40cab000020c02b" \ - "c02fc00ac009c013c014c007c011003300320039002f0035" \ - "000a000500040100007700000010000e00000b7777772e65" \ - "66662e6f7267ff01000100000a0008000600170018001900" \ - "0b00020100002300003374000000100021001f0568322d31" \ - "3408737064792f332e3106737064792f3308687474702f31" \ - "2e31000500050100000000000d0012001004010501020104" \ - "030503020304020202".decode("hex") - return_value = tls_parse_client_hello(client_hello) - self.assertEqual(return_value, (chr(0xc0) + chr(0x2b), "www.eff.org")) - # TODO: The failure cases are extremely numerous and require - # constructing TLS ClientHello messages that are individually - # defective or surprising in distinct ways. (Each invalid TLS - # record is invalid in its own way.) - - -class TLSGenerateServerHelloTest(unittest.TestCase): - # pylint: disable=too-few-public-methods - """Tests for tls_generate_server_hello() function.""" - def test_tls_generate_server_hello(self): - from letsencrypt.client.standalone_authenticator import \ - tls_generate_server_hello - server_hello = tls_generate_server_hello("Q!") - self.assertEqual(server_hello[:11].encode("hex"), - '160303002a020000260303') - self.assertEqual(server_hello[43:], chr(0) + 'Q!' + chr(0)) - - -class TLSGenerateCertMsgTest(unittest.TestCase): - # pylint: disable=too-few-public-methods - """Tests for tls_generate_cert_msg() function.""" - def test_tls_generate_cert_msg(self): - from letsencrypt.client.standalone_authenticator import \ - tls_generate_cert_msg - cert = pkg_resources.resource_string(__name__, - 'testdata/cert.pem') - cert_msg = tls_generate_cert_msg(cert) - self.assertEqual(cert_msg.encode("hex"), - "16030301ec0b0001e80001e50001e2308201de30820188a003" - "02010202020539300d06092a864886f70d01010b0500307731" - "0b30090603550406130255533111300f06035504080c084d69" - "63686967616e3112301006035504070c09416e6e204172626f" - "72312b3029060355040a0c22556e6976657273697479206f66" - "204d6963686967616e20616e64207468652045464631143012" - "06035504030c0b6578616d706c652e636f6d301e170d313431" - "3231313232333434355a170d3134313231383232333434355a" - "3077310b30090603550406130255533111300f06035504080c" - "084d6963686967616e3112301006035504070c09416e6e2041" - "72626f72312b3029060355040a0c22556e6976657273697479" - "206f66204d6963686967616e20616e64207468652045464631" - "14301206035504030c0b6578616d706c652e636f6d305c300d" - "06092a864886f70d0101010500034b003048024100ac7573b4" - "51ed1fddae705243fcdfc75bd02c751b14b875010410e51f03" - "6545dddfa79f34aefdbee90584df471681d9894bce8e6d1cfa" - "9544e8af84744fedc2e50203010001300d06092a864886f70d" - "01010b05000341002db8cf421dc0854a4a59ed92c965bebeb3" - "25ea411f97cc9dd7e4dd7269d748d3e9513ed7828db63874d9" - "ae7a1a8ada02f2404f9fc7ebb13c1af27fa1c36707fa") - - -class TLSServerHelloDoneTest(unittest.TestCase): - # pylint: disable=too-few-public-methods - """Tests for tls_generate_server_hello_done() function.""" - def test_tls_generate_server_hello_done(self): - from letsencrypt.client.standalone_authenticator import \ - tls_generate_server_hello_done - self.assertEqual(tls_generate_server_hello_done().encode("hex"), \ - "16030300040e000000") - - class ChallPrefTest(unittest.TestCase): """Tests for chall_pref() method.""" def setUp(self):