From c5d2e9904aa830874e835b2f35a6d97cc72ba0b7 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Tue, 28 Jun 2005 03:42:14 +0000 Subject: [PATCH] [project @ Arch-1:robey@lag.net--2005-master-shake%paramiko--dev--1--patch-13] change pipe method to something that probably works on windows (the old system did not); also fix a race in _unlink --- paramiko/channel.py | 70 +++++++++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 22 deletions(-) diff --git a/paramiko/channel.py b/paramiko/channel.py index cd866c0..025e159 100644 --- a/paramiko/channel.py +++ b/paramiko/channel.py @@ -1,5 +1,3 @@ -#!/usr/bin/python - # Copyright (C) 2003-2005 Robey Pointer # # This file is part of paramiko. @@ -22,7 +20,7 @@ Abstraction for an SSH2 channel. """ -import time, threading, socket, os +import sys, time, threading, socket, os from common import * import util @@ -62,7 +60,7 @@ class Channel (object): """ self.chanid = chanid self.transport = None - self.active = 0 + self.active = False self.eof_received = 0 self.eof_sent = 0 self.in_buffer = '' @@ -78,6 +76,8 @@ class Channel (object): self.name = str(chanid) self.logger = util.get_logger('paramiko.chan.' + str(chanid)) self.pipe_rfd = self.pipe_wfd = None + # for windows: + self.pipe_rsock = self.pipe_wsock = None self.pipe_set = False self.event = threading.Event() self.combine_stderr = False @@ -136,7 +136,7 @@ class Channel (object): m.add_string('') self.event.clear() self.transport._send_user_message(m) - while 1: + while True: self.event.wait(0.1) if self.closed: return False @@ -221,7 +221,7 @@ class Channel (object): m.add_string(subsystem) self.event.clear() self.transport._send_user_message(m) - while 1: + while True: self.event.wait(0.1) if self.closed: return False @@ -515,9 +515,9 @@ class Channel (object): if len(self.in_buffer) <= nbytes: out = self.in_buffer self.in_buffer = '' - if self.pipe_rfd != None: - # clear the pipe, since no more data is buffered - self._clear_pipe() + if self.pipe_rfd != None: + # clear the pipe, since no more data is buffered + self._clear_pipe() else: out = self.in_buffer[:nbytes] self.in_buffer = self.in_buffer[nbytes:] @@ -766,9 +766,9 @@ class Channel (object): if self.pipe_rfd != None: return self.pipe_rfd # create the pipe and feed in any existing data - self.pipe_rfd, self.pipe_wfd = os.pipe() + self.pipe_rfd, self.pipe_wfd = self._make_pipe() if len(self.in_buffer) > 0: - self._set_pipe() + self._set_pipe() return self.pipe_rfd finally: self.lock.release() @@ -1010,23 +1010,24 @@ class Channel (object): def _set_pipe(self): "you are already holding the lock" - if self.pipe_set: - return - self.pipe_set = True - os.write(self.pipe_wfd, '*') + if self.pipe_set: + return + self.pipe_set = True + os.write(self.pipe_wfd, '*') def _clear_pipe(self): "you are already holding the lock" - if not self.pipe_set: - return - os.read(self.pipe_rfd, 1) - self.pipe_set = False + if not self.pipe_set: + return + os.read(self.pipe_rfd, 1) + self.pipe_set = False def _unlink(self): - if self.closed or not self.active: + # server connection could die before we become active: still signal the close! + if self.closed: return + self.lock.acquire() try: - self.lock.acquire() self._set_closed() self.transport._unlink_channel(self.chanid) finally: @@ -1076,7 +1077,7 @@ class Channel (object): if timeout <= 0.0: raise socket.timeout() # we have some window to squeeze into - if self.closed: + if self.closed or self.eof_sent: return 0 if self.out_window_size < size: size = self.out_window_size @@ -1087,6 +1088,31 @@ class Channel (object): self._log(DEBUG, 'window down to %d' % self.out_window_size) return size + def _make_pipe (self): + """ + Create a pipe in such a way that the readable end may be used in select() + on the host OS. For posix (Linux, MacOS, etc) this means just returning + an OS-level pipe. For Windows, we need to do some convolutions to create + an actual OS-level "WinSock", because on Windows, only a "WinSock" may be + selected on. Sigh. + + @return (read_end, write_end) tuple + """ + if sys.platform[:3] != 'win': + return os.pipe() + serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serv.bind(('127.0.0.1', 0)) + serv.listen(1) + + # need to save sockets in pipe_rsock/pipe_wsock so they don't get closed + self.pipe_rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.pipe_rsock.connect(('127.0.0.1', serv.getsockname()[1])) + + self.pipe_wsock, addr = serv.accept() + serv.close() + return self.pipe_rsock.fileno(), self.pipe_wsock.fileno() + + class ChannelFile (BufferedFile): """