Merge branch '1.11' into 1.12

This commit is contained in:
Jeff Forcier 2014-02-14 09:16:43 -08:00
commit 758543f238
3 changed files with 38 additions and 11 deletions

View File

@ -20,10 +20,13 @@
L{ProxyCommand}.
"""
from datetime import datetime
import os
from shlex import split as shlsplit
import signal
from subprocess import Popen, PIPE
from select import select
import socket
from paramiko.ssh_exception import ProxyCommandFailure
@ -48,6 +51,8 @@ class ProxyCommand(object):
"""
self.cmd = shlsplit(command_line)
self.process = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
self.timeout = None
self.buffer = []
def send(self, content):
"""
@ -64,7 +69,7 @@ class ProxyCommand(object):
# died and we can't proceed. The best option here is to
# raise an exception informing the user that the informed
# ProxyCommand is not working.
raise BadProxyCommand(' '.join(self.cmd), e.strerror)
raise ProxyCommandFailure(' '.join(self.cmd), e.strerror)
return len(content)
def recv(self, size):
@ -78,14 +83,30 @@ class ProxyCommand(object):
@rtype: int
"""
try:
return os.read(self.process.stdout.fileno(), size)
start = datetime.now()
while len(self.buffer) < size:
if self.timeout is not None:
elapsed = (datetime.now() - start).microseconds
timeout = self.timeout * 1000 * 1000 # to microseconds
if elapsed >= timeout:
raise socket.timeout()
r, w, x = select([self.process.stdout], [], [], 0.0)
if r and r[0] == self.process.stdout:
b = os.read(self.process.stdout.fileno(), 1)
# Store in class-level buffer for persistence across
# timeouts; this makes us act more like a real socket
# (where timeouts don't actually drop data.)
self.buffer.append(b)
result = ''.join(self.buffer)
self.buffer = []
return result
except socket.timeout:
raise # socket.timeout is a subclass of IOError
except IOError, e:
raise BadProxyCommand(' '.join(self.cmd), e.strerror)
raise ProxyCommandFailure(' '.join(self.cmd), e.strerror)
def close(self):
os.kill(self.process.pid, signal.SIGTERM)
def settimeout(self, timeout):
# Timeouts are meaningless for this implementation, but are part of the
# spec, so must be present.
pass
self.timeout = timeout

View File

@ -1541,10 +1541,6 @@ class Transport (threading.Thread):
# containers.
Random.atfork()
# Hold reference to 'sys' so we can test sys.modules to detect
# interpreter shutdown.
self.sys = sys
# active=True occurs before the thread is launched, to avoid a race
_active_threads.append(self)
if self.server_mode:
@ -1614,7 +1610,10 @@ class Transport (threading.Thread):
self.saved_exception = e
except socket.error, e:
if type(e.args) is tuple:
emsg = '%s (%d)' % (e.args[1], e.args[0])
if e.args:
emsg = '%s (%d)' % (e.args[1], e.args[0])
else: # empty tuple, e.g. socket.timeout
emsg = str(e) or repr(e)
else:
emsg = e.args
self._log(ERROR, 'Socket exception: ' + emsg)

View File

@ -2,6 +2,13 @@
Changelog
=========
* :bug:`252` (`Fabric #1020 <https://github.com/fabric/fabric/issues/1020>`_)
Enhanced the implementation of ``ProxyCommand`` to avoid a deadlock/hang
condition that frequently occurs at ``Transport`` shutdown time. Thanks to
Mateusz Kobos, Matthijs van der Vleuten and Guillaume Zitta for the original
reports and to Marius Gedminas for helping test nontrivial use cases.
* :bug:`268` Fix some missed renames of ``ProxyCommand`` related error classes.
Thanks to Marius Gedminas for catch & patch.
* :bug:`34` (PR :issue:`35`) Fix SFTP prefetching incompatibility with some
SFTP servers regarding request/response ordering. Thanks to Richard
Kettlewell for catch & patch.