factor out BufferedPipe into its own class
This commit is contained in:
Robey Pointer 2006-04-11 00:39:46 -07:00
parent 017d315bce
commit 9e14a3bf58
2 changed files with 239 additions and 0 deletions

168
paramiko/buffered_pipe.py Normal file
View File

@ -0,0 +1,168 @@
# Copyright (C) 2006 Robey Pointer <robey@lag.net>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
"""
Attempt to generalize the "feeder" part of a Channel: an object which can be
read from and closed, but is reading from a buffer fed by another thread. The
read operations are blocking and can have a timeout set.
"""
import array
import threading
import time
class PipeTimeout (IOError):
"""
Indicates that a timeout was reached on a read from a L{BufferedPipe}.
"""
pass
class BufferedPipe (object):
"""
A buffer that obeys normal read (with timeout) & close semantics for a
file or socket, but is fed data from another thread. This is used by
L{Channel}.
"""
def __init__(self):
self._lock = threading.Lock()
self._cv = threading.Condition(self._lock)
self._event = None
self._buffer = array.array('B')
self._closed = False
def feed(self, data):
"""
Feed new data into this pipe. This method is assumed to be called
from a separate thread, so synchronization is done.
@param data: the data to add
@type data: str
"""
self._lock.acquire()
try:
if self._event is not None:
self._event.set()
self._buffer.fromstring(data)
self._cv.notifyAll()
finally:
self._lock.release()
def read_ready(self):
"""
Returns true if data is buffered and ready to be read from this
feeder. A C{False} result does not mean that the feeder has closed;
it means you may need to wait before more data arrives.
@return: C{True} if a L{read} call would immediately return at least
one byte; C{False} otherwise.
@rtype: bool
"""
self._lock.acquire()
try:
if len(self._buffer) == 0:
return False
return True
finally:
self._lock.release()
def read(self, nbytes, timeout=None):
"""
Read data from the pipe. The return value is a string representing
the data received. The maximum amount of data to be received at once
is specified by C{nbytes}. If a string of length zero is returned,
the pipe has been closed.
The optional C{timeout} argument can be a nonnegative float expressing
seconds, or C{None} for no timeout. If a float is given, a
C{PipeTimeout} will be raised if the timeout period value has
elapsed before any data arrives.
@param nbytes: maximum number of bytes to read
@type nbytes: int
@param timeout: maximum seconds to wait (or C{None}, the default, to
wait forever)
@type timeout: float
@return: data
@rtype: str
@raise PipeTimeout: if a timeout was set via L{settimeout} and no
data was ready before that timeout
"""
out = ''
self._lock.acquire()
try:
if len(self._buffer) == 0:
if self._closed:
return out
# should we block?
if timeout == 0.0:
raise PipeTimeout()
# loop here in case we get woken up but a different thread has
# grabbed everything in the buffer.
while (len(self._buffer) == 0) and not self._closed:
then = time.time()
self._cv.wait(timeout)
if timeout is not None:
timeout -= time.time() - then
if timeout <= 0.0:
raise PipeTimeout()
# something's in the buffer and we have the lock!
if len(self._buffer) <= nbytes:
out = self._buffer.tostring()
del self._buffer[:]
if (self._event is not None) and not self._closed:
self._event.clear()
else:
out = self._buffer[:nbytes].tostring()
del self.in_buffer[:nbytes]
finally:
self._lock.release()
return out
def close(self):
"""
Close this pipe object. Future calls to L{read} after the buffer
has been emptied will return immediately with an empty string.
"""
self._lock.acquire()
try:
self._closed = True
self._cv.notifyAll()
if self._event is not None:
self._event.set()
finally:
self._lock.release()
def __len__(self):
"""
Return the number of bytes buffered.
@return: number of bytes bufferes
@rtype: int
"""
self._lock.acquire()
try:
return len(self._buffer)
finally:
self._lock.release()

View File

@ -0,0 +1,71 @@
# Copyright (C) 2006 Robey Pointer <robey@lag.net>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
"""
Some unit tests for BufferedPipe.
"""
import threading
import time
import unittest
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
def delay_thread(pipe):
pipe.feed('a')
time.sleep(0.5)
pipe.feed('b')
pipe.close()
def close_thread(pipe):
time.sleep(0.2)
pipe.close()
class BufferedPipeTest (unittest.TestCase):
def test_1_buffered_pipe(self):
p = BufferedPipe()
self.assert_(not p.read_ready())
p.feed('hello.')
self.assert_(p.read_ready())
data = p.read(6)
self.assertEquals('hello.', data)
p.close()
self.assert_(not p.read_ready())
self.assertEquals('', p.read(1))
def test_2_delay(self):
p = BufferedPipe()
self.assert_(not p.read_ready())
threading.Thread(target=delay_thread, args=(p,)).start()
self.assertEquals('a', p.read(1, 0.1))
try:
p.read(1, 0.1)
self.assert_(False)
except PipeTimeout:
pass
self.assertEquals('b', p.read(1, 0.5))
self.assertEquals('', p.read(1))
def test_3_close_while_reading(self):
p = BufferedPipe()
threading.Thread(target=close_thread, args=(p,)).start()
data = p.read(1, 1.0)
self.assertEquals('', data)