diff --git a/paramiko/buffered_pipe.py b/paramiko/buffered_pipe.py new file mode 100644 index 0000000..8e1aacd --- /dev/null +++ b/paramiko/buffered_pipe.py @@ -0,0 +1,168 @@ +# Copyright (C) 2006 Robey Pointer +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Attempt to generalize the "feeder" part of a Channel: an object which can be +read from and closed, but is reading from a buffer fed by another thread. The +read operations are blocking and can have a timeout set. +""" + +import array +import threading +import time + + +class PipeTimeout (IOError): + """ + Indicates that a timeout was reached on a read from a L{BufferedPipe}. + """ + pass + + +class BufferedPipe (object): + """ + A buffer that obeys normal read (with timeout) & close semantics for a + file or socket, but is fed data from another thread. This is used by + L{Channel}. + """ + + def __init__(self): + self._lock = threading.Lock() + self._cv = threading.Condition(self._lock) + self._event = None + self._buffer = array.array('B') + self._closed = False + + def feed(self, data): + """ + Feed new data into this pipe. This method is assumed to be called + from a separate thread, so synchronization is done. + + @param data: the data to add + @type data: str + """ + self._lock.acquire() + try: + if self._event is not None: + self._event.set() + self._buffer.fromstring(data) + self._cv.notifyAll() + finally: + self._lock.release() + + def read_ready(self): + """ + Returns true if data is buffered and ready to be read from this + feeder. A C{False} result does not mean that the feeder has closed; + it means you may need to wait before more data arrives. + + @return: C{True} if a L{read} call would immediately return at least + one byte; C{False} otherwise. + @rtype: bool + """ + self._lock.acquire() + try: + if len(self._buffer) == 0: + return False + return True + finally: + self._lock.release() + + def read(self, nbytes, timeout=None): + """ + Read data from the pipe. The return value is a string representing + the data received. The maximum amount of data to be received at once + is specified by C{nbytes}. If a string of length zero is returned, + the pipe has been closed. + + The optional C{timeout} argument can be a nonnegative float expressing + seconds, or C{None} for no timeout. If a float is given, a + C{PipeTimeout} will be raised if the timeout period value has + elapsed before any data arrives. + + @param nbytes: maximum number of bytes to read + @type nbytes: int + @param timeout: maximum seconds to wait (or C{None}, the default, to + wait forever) + @type timeout: float + @return: data + @rtype: str + + @raise PipeTimeout: if a timeout was set via L{settimeout} and no + data was ready before that timeout + """ + out = '' + self._lock.acquire() + try: + if len(self._buffer) == 0: + if self._closed: + return out + # should we block? + if timeout == 0.0: + raise PipeTimeout() + # loop here in case we get woken up but a different thread has + # grabbed everything in the buffer. + while (len(self._buffer) == 0) and not self._closed: + then = time.time() + self._cv.wait(timeout) + if timeout is not None: + timeout -= time.time() - then + if timeout <= 0.0: + raise PipeTimeout() + + # something's in the buffer and we have the lock! + if len(self._buffer) <= nbytes: + out = self._buffer.tostring() + del self._buffer[:] + if (self._event is not None) and not self._closed: + self._event.clear() + else: + out = self._buffer[:nbytes].tostring() + del self.in_buffer[:nbytes] + finally: + self._lock.release() + + return out + + def close(self): + """ + Close this pipe object. Future calls to L{read} after the buffer + has been emptied will return immediately with an empty string. + """ + self._lock.acquire() + try: + self._closed = True + self._cv.notifyAll() + if self._event is not None: + self._event.set() + finally: + self._lock.release() + + def __len__(self): + """ + Return the number of bytes buffered. + + @return: number of bytes bufferes + @rtype: int + """ + self._lock.acquire() + try: + return len(self._buffer) + finally: + self._lock.release() + diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py new file mode 100644 index 0000000..8e4a428 --- /dev/null +++ b/tests/test_buffered_pipe.py @@ -0,0 +1,71 @@ +# Copyright (C) 2006 Robey Pointer +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Some unit tests for BufferedPipe. +""" + +import threading +import time +import unittest +from paramiko.buffered_pipe import BufferedPipe, PipeTimeout + + +def delay_thread(pipe): + pipe.feed('a') + time.sleep(0.5) + pipe.feed('b') + pipe.close() + + +def close_thread(pipe): + time.sleep(0.2) + pipe.close() + + +class BufferedPipeTest (unittest.TestCase): + + def test_1_buffered_pipe(self): + p = BufferedPipe() + self.assert_(not p.read_ready()) + p.feed('hello.') + self.assert_(p.read_ready()) + data = p.read(6) + self.assertEquals('hello.', data) + p.close() + self.assert_(not p.read_ready()) + self.assertEquals('', p.read(1)) + + def test_2_delay(self): + p = BufferedPipe() + self.assert_(not p.read_ready()) + threading.Thread(target=delay_thread, args=(p,)).start() + self.assertEquals('a', p.read(1, 0.1)) + try: + p.read(1, 0.1) + self.assert_(False) + except PipeTimeout: + pass + self.assertEquals('b', p.read(1, 0.5)) + self.assertEquals('', p.read(1)) + + def test_3_close_while_reading(self): + p = BufferedPipe() + threading.Thread(target=close_thread, args=(p,)).start() + data = p.read(1, 1.0) + self.assertEquals('', data)