bug 157205: select() doesn't notify incoming stderr data, because stderr's pipe isn't hooked up to the fileno() BufferedPipe. to fix, i added an "or" pipe-event that can be triggered by either stdout or stderr, and hooked them both up to fileno(). added a unit test for the bug and one for the "or" pipe.
This commit is contained in:
parent
80b9e289ce
commit
e3d9b90ea1
|
@ -782,14 +782,14 @@ class Channel (object):
|
|||
def fileno(self):
|
||||
"""
|
||||
Returns an OS-level file descriptor which can be used for polling, but
|
||||
but I{not} for reading or writing). This is primaily to allow python's
|
||||
but I{not} for reading or writing. This is primaily to allow python's
|
||||
C{select} module to work.
|
||||
|
||||
The first time C{fileno} is called on a channel, a pipe is created to
|
||||
simulate real OS-level file descriptor (FD) behavior. Because of this,
|
||||
two OS-level FDs are created, which will use up FDs faster than normal.
|
||||
You won't notice this effect unless you open hundreds or thousands of
|
||||
channels simultaneously, but it's still notable.
|
||||
(You won't notice this effect unless you have hundreds of channels
|
||||
open at the same time.)
|
||||
|
||||
@return: an OS-level file descriptor
|
||||
@rtype: int
|
||||
|
@ -803,7 +803,9 @@ class Channel (object):
|
|||
return self._pipe.fileno()
|
||||
# create the pipe and feed in any existing data
|
||||
self._pipe = pipe.make_pipe()
|
||||
self.in_buffer.set_event(self._pipe)
|
||||
p1, p2 = pipe.make_or_pipe(self._pipe)
|
||||
self.in_buffer.set_event(p1)
|
||||
self.in_stderr_buffer.set_event(p2)
|
||||
return self._pipe.fileno()
|
||||
finally:
|
||||
self.lock.release()
|
||||
|
|
|
@ -19,6 +19,9 @@
|
|||
"""
|
||||
Abstraction of a one-way pipe where the read end can be used in select().
|
||||
Normally this is trivial, but Windows makes it nearly impossible.
|
||||
|
||||
The pipe acts like an Event, which can be set or cleared. When set, the pipe
|
||||
will trigger as readable in select().
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
@ -57,7 +60,7 @@ class PosixPipe (object):
|
|||
self._set = False
|
||||
|
||||
def set (self):
|
||||
if self._set:
|
||||
if self._set or self._closed:
|
||||
return
|
||||
self._set = True
|
||||
os.write(self._wfd, '*')
|
||||
|
@ -103,7 +106,7 @@ class WindowsPipe (object):
|
|||
self._set = False
|
||||
|
||||
def set (self):
|
||||
if self._set:
|
||||
if self._set or self._closed:
|
||||
return
|
||||
self._set = True
|
||||
self._wsock.send('*')
|
||||
|
@ -111,3 +114,34 @@ class WindowsPipe (object):
|
|||
def set_forever (self):
|
||||
self._forever = True
|
||||
self.set()
|
||||
|
||||
|
||||
class OrPipe (object):
|
||||
def __init__(self, pipe):
|
||||
self._set = False
|
||||
self._partner = None
|
||||
self._pipe = pipe
|
||||
|
||||
def set(self):
|
||||
self._set = True
|
||||
if not self._partner._set:
|
||||
self._pipe.set()
|
||||
|
||||
def clear(self):
|
||||
self._set = False
|
||||
if not self._partner._set:
|
||||
self._pipe.clear()
|
||||
|
||||
|
||||
def make_or_pipe(pipe):
|
||||
"""
|
||||
wraps a pipe into two pipe-like objects which are "or"d together to
|
||||
affect the real pipe. if either returned pipe is set, the wrapped pipe
|
||||
is set. when both are cleared, the wrapped pipe is cleared.
|
||||
"""
|
||||
p1 = OrPipe(pipe)
|
||||
p2 = OrPipe(pipe)
|
||||
p1._partner = p2
|
||||
p2._partner = p1
|
||||
return p1, p2
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import threading
|
|||
import time
|
||||
import unittest
|
||||
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
|
||||
from paramiko import pipe
|
||||
|
||||
|
||||
def delay_thread(pipe):
|
||||
|
@ -75,3 +76,17 @@ class BufferedPipeTest (unittest.TestCase):
|
|||
threading.Thread(target=close_thread, args=(p,)).start()
|
||||
data = p.read(1, 1.0)
|
||||
self.assertEquals('', data)
|
||||
|
||||
def test_4_or_pipe(self):
|
||||
p = pipe.make_pipe()
|
||||
p1, p2 = pipe.make_or_pipe(p)
|
||||
self.assertFalse(p._set)
|
||||
p1.set()
|
||||
self.assertTrue(p._set)
|
||||
p2.set()
|
||||
self.assertTrue(p._set)
|
||||
p1.clear()
|
||||
self.assertTrue(p._set)
|
||||
p2.clear()
|
||||
self.assertFalse(p._set)
|
||||
|
||||
|
|
|
@ -639,3 +639,41 @@ class TransportTest (unittest.TestCase):
|
|||
self.tc.cancel_port_forward('', port)
|
||||
self.assertTrue(self.server._listen is None)
|
||||
|
||||
def test_K_stderr_select(self):
|
||||
"""
|
||||
verify that select() on a channel works even if only stderr is
|
||||
receiving data.
|
||||
"""
|
||||
self.setup_test_server()
|
||||
chan = self.tc.open_session()
|
||||
chan.invoke_shell()
|
||||
schan = self.ts.accept(1.0)
|
||||
|
||||
# nothing should be ready
|
||||
r, w, e = select.select([chan], [], [], 0.1)
|
||||
self.assertEquals([], r)
|
||||
self.assertEquals([], w)
|
||||
self.assertEquals([], e)
|
||||
|
||||
schan.send_stderr('hello\n')
|
||||
|
||||
# something should be ready now (give it 1 second to appear)
|
||||
for i in range(10):
|
||||
r, w, e = select.select([chan], [], [], 0.1)
|
||||
if chan in r:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
self.assertEquals([chan], r)
|
||||
self.assertEquals([], w)
|
||||
self.assertEquals([], e)
|
||||
|
||||
self.assertEquals('hello\n', chan.recv_stderr(6))
|
||||
|
||||
# and, should be dead again now
|
||||
r, w, e = select.select([chan], [], [], 0.1)
|
||||
self.assertEquals([], r)
|
||||
self.assertEquals([], w)
|
||||
self.assertEquals([], e)
|
||||
|
||||
schan.close()
|
||||
chan.close()
|
||||
|
|
Loading…
Reference in New Issue