From e3d9b90ea1f25792e652a0809b37b52635243932 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Sun, 28 Oct 2007 20:03:44 -0700 Subject: [PATCH] [project @ robey@lag.net-20071029030344-9adfzb9ulfodtepu] bug 157205: select() doesn't notify incoming stderr data, because stderr's pipe isn't hooked up to the fileno() BufferedPipe. to fix, i added an "or" pipe-event that can be triggered by either stdout or stderr, and hooked them both up to fileno(). added a unit test for the bug and one for the "or" pipe. --- paramiko/channel.py | 10 ++++++---- paramiko/pipe.py | 38 +++++++++++++++++++++++++++++++++++-- tests/test_buffered_pipe.py | 15 +++++++++++++++ tests/test_transport.py | 38 +++++++++++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 6 deletions(-) diff --git a/paramiko/channel.py b/paramiko/channel.py index 3462465..052e487 100644 --- a/paramiko/channel.py +++ b/paramiko/channel.py @@ -782,14 +782,14 @@ class Channel (object): def fileno(self): """ Returns an OS-level file descriptor which can be used for polling, but - but I{not} for reading or writing). This is primaily to allow python's + but I{not} for reading or writing. This is primaily to allow python's C{select} module to work. The first time C{fileno} is called on a channel, a pipe is created to simulate real OS-level file descriptor (FD) behavior. Because of this, two OS-level FDs are created, which will use up FDs faster than normal. - You won't notice this effect unless you open hundreds or thousands of - channels simultaneously, but it's still notable. + (You won't notice this effect unless you have hundreds of channels + open at the same time.) @return: an OS-level file descriptor @rtype: int @@ -803,7 +803,9 @@ class Channel (object): return self._pipe.fileno() # create the pipe and feed in any existing data self._pipe = pipe.make_pipe() - self.in_buffer.set_event(self._pipe) + p1, p2 = pipe.make_or_pipe(self._pipe) + self.in_buffer.set_event(p1) + self.in_stderr_buffer.set_event(p2) return self._pipe.fileno() finally: self.lock.release() diff --git a/paramiko/pipe.py b/paramiko/pipe.py index d71ca16..1cfed2d 100644 --- a/paramiko/pipe.py +++ b/paramiko/pipe.py @@ -19,6 +19,9 @@ """ Abstraction of a one-way pipe where the read end can be used in select(). Normally this is trivial, but Windows makes it nearly impossible. + +The pipe acts like an Event, which can be set or cleared. When set, the pipe +will trigger as readable in select(). """ import sys @@ -57,7 +60,7 @@ class PosixPipe (object): self._set = False def set (self): - if self._set: + if self._set or self._closed: return self._set = True os.write(self._wfd, '*') @@ -103,7 +106,7 @@ class WindowsPipe (object): self._set = False def set (self): - if self._set: + if self._set or self._closed: return self._set = True self._wsock.send('*') @@ -111,3 +114,34 @@ class WindowsPipe (object): def set_forever (self): self._forever = True self.set() + + +class OrPipe (object): + def __init__(self, pipe): + self._set = False + self._partner = None + self._pipe = pipe + + def set(self): + self._set = True + if not self._partner._set: + self._pipe.set() + + def clear(self): + self._set = False + if not self._partner._set: + self._pipe.clear() + + +def make_or_pipe(pipe): + """ + wraps a pipe into two pipe-like objects which are "or"d together to + affect the real pipe. if either returned pipe is set, the wrapped pipe + is set. when both are cleared, the wrapped pipe is cleared. + """ + p1 = OrPipe(pipe) + p2 = OrPipe(pipe) + p1._partner = p2 + p2._partner = p1 + return p1, p2 + diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py index ac12352..bef8fb8 100644 --- a/tests/test_buffered_pipe.py +++ b/tests/test_buffered_pipe.py @@ -24,6 +24,7 @@ import threading import time import unittest from paramiko.buffered_pipe import BufferedPipe, PipeTimeout +from paramiko import pipe def delay_thread(pipe): @@ -75,3 +76,17 @@ class BufferedPipeTest (unittest.TestCase): threading.Thread(target=close_thread, args=(p,)).start() data = p.read(1, 1.0) self.assertEquals('', data) + + def test_4_or_pipe(self): + p = pipe.make_pipe() + p1, p2 = pipe.make_or_pipe(p) + self.assertFalse(p._set) + p1.set() + self.assertTrue(p._set) + p2.set() + self.assertTrue(p._set) + p1.clear() + self.assertTrue(p._set) + p2.clear() + self.assertFalse(p._set) + diff --git a/tests/test_transport.py b/tests/test_transport.py index 53e6969..6aaf738 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -639,3 +639,41 @@ class TransportTest (unittest.TestCase): self.tc.cancel_port_forward('', port) self.assertTrue(self.server._listen is None) + def test_K_stderr_select(self): + """ + verify that select() on a channel works even if only stderr is + receiving data. + """ + self.setup_test_server() + chan = self.tc.open_session() + chan.invoke_shell() + schan = self.ts.accept(1.0) + + # nothing should be ready + r, w, e = select.select([chan], [], [], 0.1) + self.assertEquals([], r) + self.assertEquals([], w) + self.assertEquals([], e) + + schan.send_stderr('hello\n') + + # something should be ready now (give it 1 second to appear) + for i in range(10): + r, w, e = select.select([chan], [], [], 0.1) + if chan in r: + break + time.sleep(0.1) + self.assertEquals([chan], r) + self.assertEquals([], w) + self.assertEquals([], e) + + self.assertEquals('hello\n', chan.recv_stderr(6)) + + # and, should be dead again now + r, w, e = select.select([chan], [], [], 0.1) + self.assertEquals([], r) + self.assertEquals([], w) + self.assertEquals([], e) + + schan.close() + chan.close()