1
0
mirror of https://github.com/certbot/certbot.git synced 2026-01-23 07:20:55 +03:00

Merge pull request #244 from letsencrypt/already_listening

Already listening
This commit is contained in:
schoen
2015-02-13 15:24:33 -08:00
3 changed files with 144 additions and 2 deletions

View File

@@ -66,3 +66,7 @@ IConfig.work_dir. Used for easy revocation."""
REC_TOKEN_DIR = "recovery_tokens"
"""Directory where all recovery tokens are saved (relative to
IConfig.work_dir)."""
NETSTAT = "/bin/netstat"
"""Location of netstat binary for checking whether a listener is already
running on the specified port (Linux-specific)."""

View File

@@ -2,6 +2,7 @@
import os
import signal
import socket
import subprocess
import sys
import time
@@ -150,8 +151,9 @@ class StandaloneAuthenticator(object):
elif self.subproc_state == "inuse":
display.generic_notification(
"Could not bind TCP port {0} because it is already in "
"use it is already in use by another process on this "
"system (such as a web server).".format(port))
"use by another process on this system (such as a web "
"server). Please stop the program in question and then "
"try again.".format(port))
return False
elif self.subproc_state == "cantbind":
display.generic_notification(
@@ -258,6 +260,51 @@ class StandaloneAuthenticator(object):
# should terminate via sys.exit().
return self.do_child_process(port, key)
def already_listening(self, port): # pylint: disable=no-self-use
"""Check if a process is already listening on the port.
If so, also tell the user via a display notification.
.. warning::
The current implementation is Linux-specific. (On other
operating systems, it will simply not detect bound ports.)
This function can only usefully be run as root.
:param int port: The TCP port in question.
:returns: True or False."""
try:
proc = subprocess.Popen(
[constants.NETSTAT, "-nta", "--program"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, _ = proc.communicate()
if proc.wait() != 0:
raise OSError("netstat subprocess failed")
lines = [x.split() for x in stdout.split("\n")[2:] if x]
listeners = [L[6] for L in lines if
# IPv4 socket case
(L[0] == 'tcp' and L[5] == 'LISTEN' \
and L[3] == '0.0.0.0:{0}'.format(port)) or \
# IPv6 socket case
(L[0] == 'tcp6' and L[5] == 'LISTEN' \
and L[3] == ':::{0}'.format(port))]
if listeners:
pid, name = listeners[0].split("/")
display = zope.component.getUtility(interfaces.IDisplay)
display.generic_notification(
"The program {0} (process ID {1}) is already listening "
"on TCP port {2}. This will prevent us from binding to "
"that port. Please stop the {0} program temporarily "
"and then try again.".format(name, pid, port))
return True
except (OSError, ValueError, IndexError):
# A sign that this command isn't available or usable this
# way on this operating system, or there was something
# unexpected about the format of the netstat output; we will
# not be able to recover from this condition.
pass
return False
# IAuthenticator method implementations follow
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
@@ -317,6 +364,12 @@ class StandaloneAuthenticator(object):
results_if_failure.append(False)
if not self.tasks:
raise ValueError("nothing for .perform() to do")
if self.already_listening(constants.DVSNI_CHALLENGE_PORT):
# If we know a process is already listening on this port,
# tell the user, and don't even attempt to bind it. (This
# test is Linux-specific and won't indicate that the port
# is bound if invoked on a different operating system.)
return results_if_failure
# Try to do the authentication; note that this creates
# the listener subprocess via os.fork()
if self.start_listener(constants.DVSNI_CHALLENGE_PORT, key):

View File

@@ -96,6 +96,7 @@ class SNICallbackTest(unittest.TestCase):
called_ctx = connection.set_context.call_args[0][0]
self.assertTrue(isinstance(called_ctx, OpenSSL.SSL.Context))
class ClientSignalHandlerTest(unittest.TestCase):
"""Tests for client_signal_handler() method."""
def setUp(self):
@@ -177,6 +178,79 @@ class SubprocSignalHandlerTest(unittest.TestCase):
mock_exit.assert_called_once_with(0)
class AlreadyListeningTest(unittest.TestCase):
"""Tests for already_listening() method."""
def setUp(self):
from letsencrypt.client.standalone_authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
@mock.patch("letsencrypt.client.standalone_authenticator.subprocess.Popen")
def test_subprocess_fails(self, mock_popen):
subprocess_object = mock.MagicMock()
subprocess_object.communicate.return_value = ("foo", "bar")
subprocess_object.wait.return_value = 1
mock_popen.return_value = subprocess_object
result = self.authenticator.already_listening(17)
self.assertFalse(result)
subprocess_object.wait.assert_called_once_with()
@mock.patch("letsencrypt.client.standalone_authenticator.subprocess.Popen")
def test_no_relevant_line(self, mock_popen):
# pylint: disable=line-too-long,trailing-whitespace
subprocess_object = mock.MagicMock()
subprocess_object.communicate.return_value = (
"""Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.1.1:53 0.0.0.0:* LISTEN 1234/foo
tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN 2345/bar
tcp 0 0 0.0.0.0:180 0.0.0.0:* LISTEN 11111/hello """,
"I am the standard error")
subprocess_object.wait.return_value = 0
mock_popen.return_value = subprocess_object
result = self.authenticator.already_listening(17)
self.assertFalse(result)
@mock.patch("letsencrypt.client.standalone_authenticator.subprocess.Popen")
@mock.patch("letsencrypt.client.standalone_authenticator."
"zope.component.getUtility")
def test_has_relevant_line(self, mock_get_utility, mock_popen):
# pylint: disable=line-too-long,trailing-whitespace
subprocess_object = mock.MagicMock()
subprocess_object.communicate.return_value = (
"""Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.1.1:53 0.0.0.0:* LISTEN 1234/foo
tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN 2345/bar
tcp 0 0 0.0.0.0:17 0.0.0.0:* LISTEN 11111/hello
tcp 0 0 0.0.0.0:1728 0.0.0.0:* LISTEN 2345/bar """,
"I am the standard error")
subprocess_object.wait.return_value = 0
mock_popen.return_value = subprocess_object
result = self.authenticator.already_listening(17)
self.assertTrue(result)
self.assertEqual(mock_get_utility.call_count, 1)
@mock.patch("letsencrypt.client.standalone_authenticator.subprocess.Popen")
@mock.patch("letsencrypt.client.standalone_authenticator."
"zope.component.getUtility")
def test_has_relevant_ipv6_line(self, mock_get_utility, mock_popen):
# pylint: disable=line-too-long,trailing-whitespace
subprocess_object = mock.MagicMock()
subprocess_object.communicate.return_value = (
"""Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.1.1:53 0.0.0.0:* LISTEN 1234/foo
tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN 2345/bar
tcp6 0 0 :::17 :::* LISTEN 11111/hello
tcp 0 0 0.0.0.0:1728 0.0.0.0:* LISTEN 2345/bar """,
"I am the standard error")
subprocess_object.wait.return_value = 0
mock_popen.return_value = subprocess_object
result = self.authenticator.already_listening(17)
self.assertTrue(result)
self.assertEqual(mock_get_utility.call_count, 1)
class PerformTest(unittest.TestCase):
"""Tests for perform() method."""
def setUp(self):
@@ -184,6 +258,17 @@ class PerformTest(unittest.TestCase):
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
def test_perform_when_already_listening(self):
test_key = pkg_resources.resource_string(
__name__, "testdata/rsa256_key.pem")
key = le_util.Key("something", test_key)
chall1 = challenge_util.DvsniChall(
"foo.example.com", "whee", "foononce", key)
self.authenticator.already_listening = mock.Mock()
self.authenticator.already_listening.return_value = True
result = self.authenticator.perform([chall1])
self.assertEqual(result, [None])
def test_can_perform(self):
"""What happens if start_listener() returns True."""
test_key = pkg_resources.resource_string(