[project @ Arch-1:robey@lag.net--2005-master-shake%paramiko--dev--1--patch-24]

the previous windows pipe fix still didn't work.  replace it with a new pipe.py abstraction of pipes (one for posix, one for windows) which appears to finally work on windows.  for real this time.  also add some more documentation to Channel to explain that after exec_command, invoke_shell, or invoke_subsystem, a Channel can't be reused.
This commit is contained in:
Robey Pointer 2005-07-07 01:03:49 +00:00
parent de81b40e7d
commit 0b093e49b4
2 changed files with 124 additions and 58 deletions

View File

@ -27,6 +27,7 @@ import util
from message import Message
from ssh_exception import SSHException
from file import BufferedFile
import pipe
class Channel (object):
@ -75,10 +76,7 @@ class Channel (object):
self.status_event = threading.Event()
self.name = str(chanid)
self.logger = util.get_logger('paramiko.chan.' + str(chanid))
self.pipe_rfd = self.pipe_wfd = None
# for windows:
self.pipe_rsock = self.pipe_wsock = None
self.pipe_set = False
self.pipe = None
self.event = threading.Event()
self.combine_stderr = False
self.exit_status = -1
@ -153,6 +151,9 @@ class Channel (object):
shell will operate through the pty, and the channel will be connected
to the stdin and stdout of the pty.
When the shell exits, the channel will be closed and can't be reused.
You must open a new channel if you wish to open another shell.
@return: C{True} if the operation succeeded; C{False} if not.
@rtype: bool
"""
@ -165,7 +166,7 @@ class Channel (object):
m.add_boolean(1)
self.event.clear()
self.transport._send_user_message(m)
while 1:
while True:
self.event.wait(0.1)
if self.closed:
return False
@ -178,6 +179,10 @@ class Channel (object):
will then be directly connected to the stdin, stdout, and stderr of
the command being executed.
When the command finishes executing, the channel will be closed and
can't be reused. You must open a new channel if you wish to execute
another command.
@param command: a shell command to execute.
@type command: str
@return: C{True} if the operation succeeded; C{False} if not.
@ -193,7 +198,7 @@ class Channel (object):
m.add_string(command)
self.event.clear()
self.transport._send_user_message(m)
while 1:
while True:
self.event.wait(0.1)
if self.closed:
return False
@ -206,6 +211,9 @@ class Channel (object):
server allows it, the channel will then be directly connected to the
requested subsystem.
When the subsystem finishes, the channel will be closed and can't be
reused.
@param subsystem: name of the subsystem being requested.
@type subsystem: str
@return: C{True} if the operation succeeded; C{False} if not.
@ -515,9 +523,9 @@ class Channel (object):
if len(self.in_buffer) <= nbytes:
out = self.in_buffer
self.in_buffer = ''
if self.pipe_rfd != None:
if self.pipe is not None:
# clear the pipe, since no more data is buffered
self._clear_pipe()
self.pipe.clear()
else:
out = self.in_buffer[:nbytes]
self.in_buffer = self.in_buffer[nbytes:]
@ -763,13 +771,13 @@ class Channel (object):
"""
self.lock.acquire()
try:
if self.pipe_rfd != None:
return self.pipe_rfd
if self.pipe is not None:
return self.pipe.fileno()
# create the pipe and feed in any existing data
self.pipe_rfd, self.pipe_wfd = self._make_pipe()
self.pipe = pipe.make_pipe()
if len(self.in_buffer) > 0:
self._set_pipe()
return self.pipe_rfd
self.pipe.set()
return self.pipe.fileno()
finally:
self.lock.release()
@ -859,8 +867,8 @@ class Channel (object):
try:
if self.ultra_debug:
self._log(DEBUG, 'fed %d bytes' % len(s))
if self.pipe_wfd != None:
self._set_pipe()
if self.pipe is not None:
self.pipe.set()
self.in_buffer += s
self.in_buffer_cv.notifyAll()
finally:
@ -964,9 +972,9 @@ class Channel (object):
self.eof_received = True
self.in_buffer_cv.notifyAll()
self.in_stderr_buffer_cv.notifyAll()
if self.pipe_wfd != None:
os.close(self.pipe_wfd)
self.pipe_wfd = None
if self.pipe is not None:
self.pipe.close()
self.pipe = None
finally:
self.lock.release()
self._log(DEBUG, 'EOF received')
@ -976,9 +984,9 @@ class Channel (object):
self.lock.acquire()
try:
self.transport._unlink_channel(self.chanid)
if self.pipe_wfd != None:
os.close(self.pipe_wfd)
self.pipe_wfd = None
if self.pipe is not None:
self.pipe.close()
self.pipe = None
finally:
self.lock.release()
@ -1008,20 +1016,6 @@ class Channel (object):
self._log(DEBUG, 'EOF sent')
return
def _set_pipe(self):
"you are already holding the lock"
if self.pipe_set:
return
self.pipe_set = True
os.write(self.pipe_wfd, '*')
def _clear_pipe(self):
"you are already holding the lock"
if not self.pipe_set:
return
os.read(self.pipe_rfd, 1)
self.pipe_set = False
def _unlink(self):
# server connection could die before we become active: still signal the close!
if self.closed:
@ -1088,29 +1082,6 @@ class Channel (object):
self._log(DEBUG, 'window down to %d' % self.out_window_size)
return size
def _make_pipe (self):
"""
Create a pipe in such a way that the readable end may be used in select()
on the host OS. For posix (Linux, MacOS, etc) this means just returning
an OS-level pipe. For Windows, we need to do some convolutions to create
an actual OS-level "WinSock", because on Windows, only a "WinSock" may be
selected on. Sigh.
@return: (read_end, write_end) tuple
"""
if sys.platform[:3] != 'win':
return os.pipe()
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.bind(('127.0.0.1', 0))
serv.listen(1)
# need to save sockets in pipe_rsock/pipe_wsock so they don't get closed
self.pipe_rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.pipe_rsock.connect(('127.0.0.1', serv.getsockname()[1]))
self.pipe_wsock, addr = serv.accept()
serv.close()
return self.pipe_rsock.fileno(), self.pipe_wsock.fileno()

95
paramiko/pipe.py Normal file
View File

@ -0,0 +1,95 @@
# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
"""
Abstraction of a one-way pipe where the read end can be used in select().
Normally this is trivial, but Windows makes it nearly impossible.
"""
import sys
import os
import socket
def make_pipe ():
# if sys.platform[:3] != 'win':
# return PosixPipe()
return WindowsPipe()
class PosixPipe (object):
def __init__ (self):
self._rfd, self._wfd = os.pipe()
self._set = False
def close (self):
os.close(self._rfd)
os.close(self._wfd)
def fileno (self):
return self._rfd
def clear (self):
if not self._set:
return
os.read(self._rfd, 1)
self._set = False
def set (self):
if self._set:
return
self._set = True
os.write(self._wfd, '*')
class WindowsPipe (object):
"""
On Windows, only an OS-level "WinSock" may be used in select(), but reads
and writes must be to the actual socket object.
"""
def __init__ (self):
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.bind(('127.0.0.1', 0))
serv.listen(1)
# need to save sockets in pipe_rsock/pipe_wsock so they don't get closed
self._rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._rsock.connect(('127.0.0.1', serv.getsockname()[1]))
self._wsock, addr = serv.accept()
serv.close()
self._set = False
def close (self):
self._rsock.close()
self._wsock.close()
def fileno (self):
return self._rsock.fileno()
def clear (self):
if not self._set:
return
self._rsock.recv(1)
self._set = False
def set (self):
if self._set:
return
self._set = True
self._wsock.send('*')