flip the switch: use BufferedPipe inside Channel
This commit is contained in:
Robey Pointer 2006-04-12 01:42:50 -07:00
parent 22db02c82f
commit e7afd83308
1 changed files with 47 additions and 115 deletions

View File

@ -31,6 +31,7 @@ from paramiko import util
from paramiko.message import Message from paramiko.message import Message
from paramiko.ssh_exception import SSHException from paramiko.ssh_exception import SSHException
from paramiko.file import BufferedFile from paramiko.file import BufferedFile
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
from paramiko import pipe from paramiko import pipe
@ -69,14 +70,12 @@ class Channel (object):
self.active = False self.active = False
self.eof_received = 0 self.eof_received = 0
self.eof_sent = 0 self.eof_sent = 0
self.in_buffer = '' self.in_buffer = BufferedPipe()
self.in_stderr_buffer = '' self.in_stderr_buffer = BufferedPipe()
self.timeout = None self.timeout = None
self.closed = False self.closed = False
self.ultra_debug = False self.ultra_debug = False
self.lock = threading.Lock() self.lock = threading.Lock()
self.in_buffer_cv = threading.Condition(self.lock)
self.in_stderr_buffer_cv = threading.Condition(self.lock)
self.out_buffer_cv = threading.Condition(self.lock) self.out_buffer_cv = threading.Condition(self.lock)
self.in_window_size = 0 self.in_window_size = 0
self.out_window_size = 0 self.out_window_size = 0
@ -368,8 +367,7 @@ class Channel (object):
self.combine_stderr = combine self.combine_stderr = combine
if combine and not old: if combine and not old:
# copy old stderr buffer into primary buffer # copy old stderr buffer into primary buffer
data = self.in_stderr_buffer data = self.in_stderr_buffer.empty()
self.in_stderr_buffer = ''
finally: finally:
self.lock.release() self.lock.release()
if len(data) > 0: if len(data) > 0:
@ -393,7 +391,7 @@ class Channel (object):
C{chan.settimeout(None)} is equivalent to C{chan.setblocking(1)}. C{chan.settimeout(None)} is equivalent to C{chan.setblocking(1)}.
@param timeout: seconds to wait for a pending read/write operation @param timeout: seconds to wait for a pending read/write operation
before raising C{socket.timeout}, or C{None} for no timeout. before raising C{socket.timeout}, or C{None} for no timeout.
@type timeout: float @type timeout: float
""" """
self.timeout = timeout self.timeout = timeout
@ -477,13 +475,7 @@ class Channel (object):
return at least one byte; C{False} otherwise. return at least one byte; C{False} otherwise.
@rtype: boolean @rtype: boolean
""" """
self.lock.acquire() return self.in_buffer.read_ready()
try:
if len(self.in_buffer) == 0:
return False
return True
finally:
self.lock.release()
def recv(self, nbytes): def recv(self, nbytes):
""" """
@ -500,38 +492,12 @@ class Channel (object):
@raise socket.timeout: if no data is ready before the timeout set by @raise socket.timeout: if no data is ready before the timeout set by
L{settimeout}. L{settimeout}.
""" """
out = ''
self.lock.acquire()
try: try:
if len(self.in_buffer) == 0: out = self.in_buffer.read(nbytes, self.timeout)
if self.closed or self.eof_received: except PipeTimeout, e:
return out raise socket.timeout()
# should we block?
if self.timeout == 0.0:
raise socket.timeout()
# loop here in case we get woken up but a different thread has grabbed everything in the buffer
timeout = self.timeout
while (len(self.in_buffer) == 0) and not self.closed and not self.eof_received:
then = time.time()
self.in_buffer_cv.wait(timeout)
if timeout != None:
timeout -= time.time() - then
if timeout <= 0.0:
raise socket.timeout()
# something in the buffer and we have the lock
if len(self.in_buffer) <= nbytes:
out = self.in_buffer
self.in_buffer = ''
if (self.pipe is not None) and not (self.closed or self.eof_received):
# clear the pipe, since no more data is buffered
self.pipe.clear()
else:
out = self.in_buffer[:nbytes]
self.in_buffer = self.in_buffer[nbytes:]
ack = self._check_add_window(len(out))
finally:
self.lock.release()
ack = self._check_add_window(len(out))
# no need to hold the channel lock when sending this # no need to hold the channel lock when sending this
if ack > 0: if ack > 0:
m = Message() m = Message()
@ -555,13 +521,7 @@ class Channel (object):
@since: 1.1 @since: 1.1
""" """
self.lock.acquire() return self.in_stderr_buffer.read_ready()
try:
if len(self.in_stderr_buffer) == 0:
return False
return True
finally:
self.lock.release()
def recv_stderr(self, nbytes): def recv_stderr(self, nbytes):
""" """
@ -582,34 +542,20 @@ class Channel (object):
@since: 1.1 @since: 1.1
""" """
out = ''
self.lock.acquire()
try: try:
if len(self.in_stderr_buffer) == 0: out = self.in_stderr_buffer.read(nbytes, self.timeout)
if self.closed or self.eof_received: except PipeTimeout, e:
return out raise socket.timeout()
# should we block?
if self.timeout == 0.0: ack = self._check_add_window(len(out))
raise socket.timeout() # no need to hold the channel lock when sending this
# loop here in case we get woken up but a different thread has grabbed everything in the buffer if ack > 0:
timeout = self.timeout m = Message()
while (len(self.in_stderr_buffer) == 0) and not self.closed and not self.eof_received: m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
then = time.time() m.add_int(self.remote_chanid)
self.in_stderr_buffer_cv.wait(timeout) m.add_int(ack)
if timeout != None: self.transport._send_user_message(m)
timeout -= time.time() - then
if timeout <= 0.0:
raise socket.timeout()
# something in the buffer and we have the lock
if len(self.in_stderr_buffer) <= nbytes:
out = self.in_stderr_buffer
self.in_stderr_buffer = ''
else:
out = self.in_stderr_buffer[:nbytes]
self.in_stderr_buffer = self.in_stderr_buffer[nbytes:]
self._check_add_window(len(out))
finally:
self.lock.release()
return out return out
def send(self, s): def send(self, s):
@ -783,8 +729,7 @@ class Channel (object):
return self.pipe.fileno() return self.pipe.fileno()
# create the pipe and feed in any existing data # create the pipe and feed in any existing data
self.pipe = pipe.make_pipe() self.pipe = pipe.make_pipe()
if len(self.in_buffer) > 0: self.in_buffer.set_event(self.pipe)
self.pipe.set()
return self.pipe.fileno() return self.pipe.fileno()
finally: finally:
self.lock.release() self.lock.release()
@ -880,16 +825,7 @@ class Channel (object):
s = m s = m
else: else:
s = m.get_string() s = m.get_string()
self.lock.acquire() self.in_buffer.feed(s)
try:
if self.ultra_debug:
self._log(DEBUG, 'fed %d bytes' % len(s))
if self.pipe is not None:
self.pipe.set()
self.in_buffer += s
self.in_buffer_cv.notifyAll()
finally:
self.lock.release()
def _feed_extended(self, m): def _feed_extended(self, m):
code = m.get_int() code = m.get_int()
@ -899,14 +835,7 @@ class Channel (object):
return return
if self.combine_stderr: if self.combine_stderr:
return self._feed(s) return self._feed(s)
self.lock.acquire() self.in_stderr_buffer.feed(s)
try:
if self.ultra_debug:
self._log(DEBUG, 'fed %d stderr bytes' % len(s))
self.in_stderr_buffer += s
self.in_stderr_buffer_cv.notifyAll()
finally:
self.lock.release()
def _window_adjust(self, m): def _window_adjust(self, m):
nbytes = m.get_int() nbytes = m.get_int()
@ -987,8 +916,8 @@ class Channel (object):
try: try:
if not self.eof_received: if not self.eof_received:
self.eof_received = True self.eof_received = True
self.in_buffer_cv.notifyAll() self.in_buffer.close()
self.in_stderr_buffer_cv.notifyAll() self.in_stderr_buffer.close()
if self.pipe is not None: if self.pipe is not None:
self.pipe.set_forever() self.pipe.set_forever()
finally: finally:
@ -1025,8 +954,8 @@ class Channel (object):
def _set_closed(self): def _set_closed(self):
# you are holding the lock. # you are holding the lock.
self.closed = True self.closed = True
self.in_buffer_cv.notifyAll() self.in_buffer.close()
self.in_stderr_buffer_cv.notifyAll() self.in_stderr_buffer.close()
self.out_buffer_cv.notifyAll() self.out_buffer_cv.notifyAll()
if self.pipe is not None: if self.pipe is not None:
self.pipe.set_forever() self.pipe.set_forever()
@ -1067,19 +996,22 @@ class Channel (object):
self.lock.release() self.lock.release()
def _check_add_window(self, n): def _check_add_window(self, n):
# already holding the lock! self.lock.acquire()
if self.closed or self.eof_received or not self.active: try:
return 0 if self.closed or self.eof_received or not self.active:
if self.ultra_debug: return 0
self._log(DEBUG, 'addwindow %d' % n) if self.ultra_debug:
self.in_window_sofar += n self._log(DEBUG, 'addwindow %d' % n)
if self.in_window_sofar <= self.in_window_threshold: self.in_window_sofar += n
return 0 if self.in_window_sofar <= self.in_window_threshold:
if self.ultra_debug: return 0
self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar) if self.ultra_debug:
out = self.in_window_sofar self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar)
self.in_window_sofar = 0 out = self.in_window_sofar
return out self.in_window_sofar = 0
return out
finally:
self.lock.release()
def _wait_for_send_window(self, size): def _wait_for_send_window(self, size):
""" """