[project @ Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--patch-58]
some Channel fixes for max packet size & blocking on zero window some clean-ups and fixes to channels: * when send() is blocked on a zero-width window, check that the channel is still open. this was causing some lockups. * set a lower bound to the "maximum packet size" we accept from the remote host. if they tell us anything less than 1KB, assume they meant 1KB. (it's not reasonable to fragment below that.) * leave a little padding instead of cutting right up to the maximum packet size: some space will be taken up by protocol overhead. * turn off some of the debug log lines unless "ultra_debug" is on (nobody cares about the feed info)
This commit is contained in:
parent
1144a5d3d9
commit
cba104ce3f
|
@ -15,7 +15,7 @@
|
|||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with Foobar; if not, write to the Free Software Foundation, Inc.,
|
||||
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
|
||||
|
||||
"""
|
||||
|
@ -52,6 +52,9 @@ class Channel (object):
|
|||
is exactly like a normal network socket, so it shouldn't be too surprising.
|
||||
"""
|
||||
|
||||
# lower bound on the max packet size we'll accept from the remote host
|
||||
MIN_PACKET_SIZE = 1024
|
||||
|
||||
def __init__(self, chanid):
|
||||
"""
|
||||
Create a new channel. The channel is not associated with any
|
||||
|
@ -71,6 +74,7 @@ class Channel (object):
|
|||
self.in_buffer = ''
|
||||
self.timeout = None
|
||||
self.closed = False
|
||||
self.ultra_debug = False
|
||||
self.lock = threading.Lock()
|
||||
self.in_buffer_cv = threading.Condition(self.lock)
|
||||
self.out_buffer_cv = threading.Condition(self.lock)
|
||||
|
@ -390,10 +394,10 @@ class Channel (object):
|
|||
by L{settimeout}.
|
||||
"""
|
||||
size = 0
|
||||
if self.closed or self.eof_sent:
|
||||
return size
|
||||
try:
|
||||
self.lock.acquire()
|
||||
if self.closed or self.eof_sent:
|
||||
return 0
|
||||
if self.out_window_size == 0:
|
||||
# should we block?
|
||||
if self.timeout == 0.0:
|
||||
|
@ -401,6 +405,8 @@ class Channel (object):
|
|||
# loop here in case we get woken up but a different thread has filled the buffer
|
||||
timeout = self.timeout
|
||||
while self.out_window_size == 0:
|
||||
if self.closed or self.eof_sent:
|
||||
return 0
|
||||
then = time.time()
|
||||
self.out_buffer_cv.wait(timeout)
|
||||
if timeout != None:
|
||||
|
@ -413,14 +419,16 @@ class Channel (object):
|
|||
size = len(s)
|
||||
if self.out_window_size < size:
|
||||
size = self.out_window_size
|
||||
if self.out_max_packet_size < size:
|
||||
size = self.out_max_packet_size
|
||||
if self.out_max_packet_size - 64 < size:
|
||||
size = self.out_max_packet_size - 64
|
||||
m = Message()
|
||||
m.add_byte(chr(MSG_CHANNEL_DATA))
|
||||
m.add_int(self.remote_chanid)
|
||||
m.add_string(s[:size])
|
||||
self.transport._send_message(m)
|
||||
self.out_window_size -= size
|
||||
if self.ultra_debug:
|
||||
self._log(DEBUG, 'window down to %d' % self.out_window_size)
|
||||
finally:
|
||||
self.lock.release()
|
||||
return size
|
||||
|
@ -612,12 +620,14 @@ class Channel (object):
|
|||
# threshold of bytes we receive before we bother to send a window update
|
||||
self.in_window_threshold = window_size // 10
|
||||
self.in_window_sofar = 0
|
||||
self._log(DEBUG, 'Max packet in: %d bytes' % max_packet_size)
|
||||
|
||||
def _set_remote_channel(self, chanid, window_size, max_packet_size):
|
||||
self.remote_chanid = chanid
|
||||
self.out_window_size = window_size
|
||||
self.out_max_packet_size = max_packet_size
|
||||
self.out_max_packet_size = max(max_packet_size, self.MIN_PACKET_SIZE)
|
||||
self.active = 1
|
||||
self._log(DEBUG, 'Max packet out: %d bytes' % max_packet_size)
|
||||
|
||||
def _request_success(self, m):
|
||||
self._log(DEBUG, 'Sesch channel %d request ok' % self.chanid)
|
||||
|
@ -630,13 +640,15 @@ class Channel (object):
|
|||
s = m.get_string()
|
||||
try:
|
||||
self.lock.acquire()
|
||||
self._log(DEBUG, 'fed %d bytes' % len(s))
|
||||
if self.ultra_debug:
|
||||
self._log(DEBUG, 'fed %d bytes' % len(s))
|
||||
if self.pipe_wfd != None:
|
||||
self._feed_pipe(s)
|
||||
else:
|
||||
self.in_buffer += s
|
||||
self.in_buffer_cv.notifyAll()
|
||||
self._log(DEBUG, '(out from feed)')
|
||||
if self.ultra_debug:
|
||||
self._log(DEBUG, '(out from feed)')
|
||||
finally:
|
||||
self.lock.release()
|
||||
|
||||
|
@ -644,7 +656,8 @@ class Channel (object):
|
|||
nbytes = m.get_int()
|
||||
try:
|
||||
self.lock.acquire()
|
||||
self._log(DEBUG, 'window up %d' % nbytes)
|
||||
if self.ultra_debug or True:
|
||||
self._log(DEBUG, 'window up %d' % nbytes)
|
||||
self.out_window_size += nbytes
|
||||
self.out_buffer_cv.notifyAll()
|
||||
finally:
|
||||
|
@ -832,10 +845,12 @@ class Channel (object):
|
|||
# already holding the lock!
|
||||
if self.closed or self.eof_received or not self.active:
|
||||
return
|
||||
self._log(DEBUG, 'addwindow %d' % n)
|
||||
if self.ultra_debug:
|
||||
self._log(DEBUG, 'addwindow %d' % n)
|
||||
self.in_window_sofar += n
|
||||
if self.in_window_sofar > self.in_window_threshold:
|
||||
self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar)
|
||||
if self.ultra_debug:
|
||||
self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar)
|
||||
m = Message()
|
||||
m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
|
||||
m.add_int(self.remote_chanid)
|
||||
|
|
Loading…
Reference in New Issue