From e7afd833083f886b0a501c27ac07e766a39758c9 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Wed, 12 Apr 2006 01:42:50 -0700 Subject: [PATCH] [project @ robey@lag.net-20060412084250-bd347f3f01aca394] flip the switch: use BufferedPipe inside Channel --- paramiko/channel.py | 162 +++++++++++++------------------------------- 1 file changed, 47 insertions(+), 115 deletions(-) diff --git a/paramiko/channel.py b/paramiko/channel.py index 0b25448..7eb5b08 100644 --- a/paramiko/channel.py +++ b/paramiko/channel.py @@ -31,6 +31,7 @@ from paramiko import util from paramiko.message import Message from paramiko.ssh_exception import SSHException from paramiko.file import BufferedFile +from paramiko.buffered_pipe import BufferedPipe, PipeTimeout from paramiko import pipe @@ -69,14 +70,12 @@ class Channel (object): self.active = False self.eof_received = 0 self.eof_sent = 0 - self.in_buffer = '' - self.in_stderr_buffer = '' + self.in_buffer = BufferedPipe() + self.in_stderr_buffer = BufferedPipe() self.timeout = None self.closed = False self.ultra_debug = False self.lock = threading.Lock() - self.in_buffer_cv = threading.Condition(self.lock) - self.in_stderr_buffer_cv = threading.Condition(self.lock) self.out_buffer_cv = threading.Condition(self.lock) self.in_window_size = 0 self.out_window_size = 0 @@ -368,8 +367,7 @@ class Channel (object): self.combine_stderr = combine if combine and not old: # copy old stderr buffer into primary buffer - data = self.in_stderr_buffer - self.in_stderr_buffer = '' + data = self.in_stderr_buffer.empty() finally: self.lock.release() if len(data) > 0: @@ -393,7 +391,7 @@ class Channel (object): C{chan.settimeout(None)} is equivalent to C{chan.setblocking(1)}. @param timeout: seconds to wait for a pending read/write operation - before raising C{socket.timeout}, or C{None} for no timeout. + before raising C{socket.timeout}, or C{None} for no timeout. @type timeout: float """ self.timeout = timeout @@ -477,13 +475,7 @@ class Channel (object): return at least one byte; C{False} otherwise. @rtype: boolean """ - self.lock.acquire() - try: - if len(self.in_buffer) == 0: - return False - return True - finally: - self.lock.release() + return self.in_buffer.read_ready() def recv(self, nbytes): """ @@ -500,38 +492,12 @@ class Channel (object): @raise socket.timeout: if no data is ready before the timeout set by L{settimeout}. """ - out = '' - self.lock.acquire() try: - if len(self.in_buffer) == 0: - if self.closed or self.eof_received: - return out - # should we block? - if self.timeout == 0.0: - raise socket.timeout() - # loop here in case we get woken up but a different thread has grabbed everything in the buffer - timeout = self.timeout - while (len(self.in_buffer) == 0) and not self.closed and not self.eof_received: - then = time.time() - self.in_buffer_cv.wait(timeout) - if timeout != None: - timeout -= time.time() - then - if timeout <= 0.0: - raise socket.timeout() - # something in the buffer and we have the lock - if len(self.in_buffer) <= nbytes: - out = self.in_buffer - self.in_buffer = '' - if (self.pipe is not None) and not (self.closed or self.eof_received): - # clear the pipe, since no more data is buffered - self.pipe.clear() - else: - out = self.in_buffer[:nbytes] - self.in_buffer = self.in_buffer[nbytes:] - ack = self._check_add_window(len(out)) - finally: - self.lock.release() + out = self.in_buffer.read(nbytes, self.timeout) + except PipeTimeout, e: + raise socket.timeout() + ack = self._check_add_window(len(out)) # no need to hold the channel lock when sending this if ack > 0: m = Message() @@ -555,13 +521,7 @@ class Channel (object): @since: 1.1 """ - self.lock.acquire() - try: - if len(self.in_stderr_buffer) == 0: - return False - return True - finally: - self.lock.release() + return self.in_stderr_buffer.read_ready() def recv_stderr(self, nbytes): """ @@ -582,34 +542,20 @@ class Channel (object): @since: 1.1 """ - out = '' - self.lock.acquire() try: - if len(self.in_stderr_buffer) == 0: - if self.closed or self.eof_received: - return out - # should we block? - if self.timeout == 0.0: - raise socket.timeout() - # loop here in case we get woken up but a different thread has grabbed everything in the buffer - timeout = self.timeout - while (len(self.in_stderr_buffer) == 0) and not self.closed and not self.eof_received: - then = time.time() - self.in_stderr_buffer_cv.wait(timeout) - if timeout != None: - timeout -= time.time() - then - if timeout <= 0.0: - raise socket.timeout() - # something in the buffer and we have the lock - if len(self.in_stderr_buffer) <= nbytes: - out = self.in_stderr_buffer - self.in_stderr_buffer = '' - else: - out = self.in_stderr_buffer[:nbytes] - self.in_stderr_buffer = self.in_stderr_buffer[nbytes:] - self._check_add_window(len(out)) - finally: - self.lock.release() + out = self.in_stderr_buffer.read(nbytes, self.timeout) + except PipeTimeout, e: + raise socket.timeout() + + ack = self._check_add_window(len(out)) + # no need to hold the channel lock when sending this + if ack > 0: + m = Message() + m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST)) + m.add_int(self.remote_chanid) + m.add_int(ack) + self.transport._send_user_message(m) + return out def send(self, s): @@ -783,8 +729,7 @@ class Channel (object): return self.pipe.fileno() # create the pipe and feed in any existing data self.pipe = pipe.make_pipe() - if len(self.in_buffer) > 0: - self.pipe.set() + self.in_buffer.set_event(self.pipe) return self.pipe.fileno() finally: self.lock.release() @@ -880,16 +825,7 @@ class Channel (object): s = m else: s = m.get_string() - self.lock.acquire() - try: - if self.ultra_debug: - self._log(DEBUG, 'fed %d bytes' % len(s)) - if self.pipe is not None: - self.pipe.set() - self.in_buffer += s - self.in_buffer_cv.notifyAll() - finally: - self.lock.release() + self.in_buffer.feed(s) def _feed_extended(self, m): code = m.get_int() @@ -899,14 +835,7 @@ class Channel (object): return if self.combine_stderr: return self._feed(s) - self.lock.acquire() - try: - if self.ultra_debug: - self._log(DEBUG, 'fed %d stderr bytes' % len(s)) - self.in_stderr_buffer += s - self.in_stderr_buffer_cv.notifyAll() - finally: - self.lock.release() + self.in_stderr_buffer.feed(s) def _window_adjust(self, m): nbytes = m.get_int() @@ -987,8 +916,8 @@ class Channel (object): try: if not self.eof_received: self.eof_received = True - self.in_buffer_cv.notifyAll() - self.in_stderr_buffer_cv.notifyAll() + self.in_buffer.close() + self.in_stderr_buffer.close() if self.pipe is not None: self.pipe.set_forever() finally: @@ -1025,8 +954,8 @@ class Channel (object): def _set_closed(self): # you are holding the lock. self.closed = True - self.in_buffer_cv.notifyAll() - self.in_stderr_buffer_cv.notifyAll() + self.in_buffer.close() + self.in_stderr_buffer.close() self.out_buffer_cv.notifyAll() if self.pipe is not None: self.pipe.set_forever() @@ -1067,19 +996,22 @@ class Channel (object): self.lock.release() def _check_add_window(self, n): - # already holding the lock! - if self.closed or self.eof_received or not self.active: - return 0 - if self.ultra_debug: - self._log(DEBUG, 'addwindow %d' % n) - self.in_window_sofar += n - if self.in_window_sofar <= self.in_window_threshold: - return 0 - if self.ultra_debug: - self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar) - out = self.in_window_sofar - self.in_window_sofar = 0 - return out + self.lock.acquire() + try: + if self.closed or self.eof_received or not self.active: + return 0 + if self.ultra_debug: + self._log(DEBUG, 'addwindow %d' % n) + self.in_window_sofar += n + if self.in_window_sofar <= self.in_window_threshold: + return 0 + if self.ultra_debug: + self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar) + out = self.in_window_sofar + self.in_window_sofar = 0 + return out + finally: + self.lock.release() def _wait_for_send_window(self, size): """