don't call _send_user_message() while holding the channel lock. the call may block waiting for CTS (during rekey), and the feeder thread may be waiting on the channel lock to clear out channel traffic before it gets to the kex-init. instead, the 2 methods that wanted to send from inside the lock now just return messages to send. slightly hacky but functional. :)
This commit is contained in:
parent
72e28736e2
commit
373e65dd97
|
@ -468,7 +468,7 @@ class Channel (object):
|
||||||
try:
|
try:
|
||||||
if not self.active or self.closed:
|
if not self.active or self.closed:
|
||||||
return
|
return
|
||||||
self._close_internal()
|
msgs = self._close_internal()
|
||||||
|
|
||||||
# only close the pipe when the user explicitly closes the channel.
|
# only close the pipe when the user explicitly closes the channel.
|
||||||
# otherwise they will get unpleasant surprises.
|
# otherwise they will get unpleasant surprises.
|
||||||
|
@ -477,6 +477,9 @@ class Channel (object):
|
||||||
self.pipe = None
|
self.pipe = None
|
||||||
finally:
|
finally:
|
||||||
self.lock.release()
|
self.lock.release()
|
||||||
|
for m in msgs:
|
||||||
|
if m is not None:
|
||||||
|
self.transport._send_user_message(m)
|
||||||
|
|
||||||
def recv_ready(self):
|
def recv_ready(self):
|
||||||
"""
|
"""
|
||||||
|
@ -589,7 +592,7 @@ class Channel (object):
|
||||||
@rtype: str
|
@rtype: str
|
||||||
|
|
||||||
@raise socket.timeout: if no data is ready before the timeout set by
|
@raise socket.timeout: if no data is ready before the timeout set by
|
||||||
L{settimeout}.
|
L{settimeout}.
|
||||||
|
|
||||||
@since: 1.1
|
@since: 1.1
|
||||||
"""
|
"""
|
||||||
|
@ -637,7 +640,7 @@ class Channel (object):
|
||||||
@rtype: int
|
@rtype: int
|
||||||
|
|
||||||
@raise socket.timeout: if no data could be sent before the timeout set
|
@raise socket.timeout: if no data could be sent before the timeout set
|
||||||
by L{settimeout}.
|
by L{settimeout}.
|
||||||
"""
|
"""
|
||||||
size = len(s)
|
size = len(s)
|
||||||
self.lock.acquire()
|
self.lock.acquire()
|
||||||
|
@ -670,7 +673,7 @@ class Channel (object):
|
||||||
@rtype: int
|
@rtype: int
|
||||||
|
|
||||||
@raise socket.timeout: if no data could be sent before the timeout set
|
@raise socket.timeout: if no data could be sent before the timeout set
|
||||||
by L{settimeout}.
|
by L{settimeout}.
|
||||||
|
|
||||||
@since: 1.1
|
@since: 1.1
|
||||||
"""
|
"""
|
||||||
|
@ -817,9 +820,11 @@ class Channel (object):
|
||||||
if (how == 1) or (how == 2):
|
if (how == 1) or (how == 2):
|
||||||
self.lock.acquire()
|
self.lock.acquire()
|
||||||
try:
|
try:
|
||||||
self._send_eof()
|
m = self._send_eof()
|
||||||
finally:
|
finally:
|
||||||
self.lock.release()
|
self.lock.release()
|
||||||
|
if m is not None:
|
||||||
|
self.transport._send_user_message(m)
|
||||||
|
|
||||||
def shutdown_read(self):
|
def shutdown_read(self):
|
||||||
"""
|
"""
|
||||||
|
@ -876,9 +881,12 @@ class Channel (object):
|
||||||
def _request_failed(self, m):
|
def _request_failed(self, m):
|
||||||
self.lock.acquire()
|
self.lock.acquire()
|
||||||
try:
|
try:
|
||||||
self._close_internal()
|
msgs = self._close_internal()
|
||||||
finally:
|
finally:
|
||||||
self.lock.release()
|
self.lock.release()
|
||||||
|
for m in msgs:
|
||||||
|
if m is not None:
|
||||||
|
self.transport._send_user_message(m)
|
||||||
|
|
||||||
def _feed(self, m):
|
def _feed(self, m):
|
||||||
if type(m) is str:
|
if type(m) is str:
|
||||||
|
@ -1004,10 +1012,13 @@ class Channel (object):
|
||||||
def _handle_close(self, m):
|
def _handle_close(self, m):
|
||||||
self.lock.acquire()
|
self.lock.acquire()
|
||||||
try:
|
try:
|
||||||
self._close_internal()
|
msgs = self._close_internal()
|
||||||
self.transport._unlink_channel(self.chanid)
|
self.transport._unlink_channel(self.chanid)
|
||||||
finally:
|
finally:
|
||||||
self.lock.release()
|
self.lock.release()
|
||||||
|
for m in msgs:
|
||||||
|
if m is not None:
|
||||||
|
self.transport._send_user_message(m)
|
||||||
|
|
||||||
|
|
||||||
### internals...
|
### internals...
|
||||||
|
@ -1028,30 +1039,26 @@ class Channel (object):
|
||||||
def _send_eof(self):
|
def _send_eof(self):
|
||||||
# you are holding the lock.
|
# you are holding the lock.
|
||||||
if self.eof_sent:
|
if self.eof_sent:
|
||||||
return
|
return None
|
||||||
m = Message()
|
m = Message()
|
||||||
m.add_byte(chr(MSG_CHANNEL_EOF))
|
m.add_byte(chr(MSG_CHANNEL_EOF))
|
||||||
m.add_int(self.remote_chanid)
|
m.add_int(self.remote_chanid)
|
||||||
self.transport._send_user_message(m)
|
|
||||||
self.eof_sent = True
|
self.eof_sent = True
|
||||||
self._log(DEBUG, 'EOF sent')
|
self._log(DEBUG, 'EOF sent')
|
||||||
return
|
return m
|
||||||
|
|
||||||
def _close_internal(self):
|
def _close_internal(self):
|
||||||
# you are holding the lock.
|
# you are holding the lock.
|
||||||
if not self.active or self.closed:
|
if not self.active or self.closed:
|
||||||
return
|
return None, None
|
||||||
try:
|
m1 = self._send_eof()
|
||||||
self._send_eof()
|
m2 = Message()
|
||||||
m = Message()
|
m2.add_byte(chr(MSG_CHANNEL_CLOSE))
|
||||||
m.add_byte(chr(MSG_CHANNEL_CLOSE))
|
m2.add_int(self.remote_chanid)
|
||||||
m.add_int(self.remote_chanid)
|
|
||||||
self.transport._send_user_message(m)
|
|
||||||
except EOFError:
|
|
||||||
pass
|
|
||||||
self._set_closed()
|
self._set_closed()
|
||||||
# can't unlink from the Transport yet -- the remote side may still
|
# can't unlink from the Transport yet -- the remote side may still
|
||||||
# try to send meta-data (exit-status, etc)
|
# try to send meta-data (exit-status, etc)
|
||||||
|
return m1, m2
|
||||||
|
|
||||||
def _unlink(self):
|
def _unlink(self):
|
||||||
# server connection could die before we become active: still signal the close!
|
# server connection could die before we become active: still signal the close!
|
||||||
|
|
Loading…
Reference in New Issue