[project @ Arch-1:robey@lag.net--2005-master-shake%paramiko--dev--1--patch-13]

change pipe method to something that probably works on windows (the old system did not); also fix a race in _unlink
This commit is contained in:
Robey Pointer 2005-06-28 03:42:14 +00:00
parent 139051f815
commit c5d2e9904a
1 changed files with 48 additions and 22 deletions

View File

@ -1,5 +1,3 @@
#!/usr/bin/python
# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> # Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
# #
# This file is part of paramiko. # This file is part of paramiko.
@ -22,7 +20,7 @@
Abstraction for an SSH2 channel. Abstraction for an SSH2 channel.
""" """
import time, threading, socket, os import sys, time, threading, socket, os
from common import * from common import *
import util import util
@ -62,7 +60,7 @@ class Channel (object):
""" """
self.chanid = chanid self.chanid = chanid
self.transport = None self.transport = None
self.active = 0 self.active = False
self.eof_received = 0 self.eof_received = 0
self.eof_sent = 0 self.eof_sent = 0
self.in_buffer = '' self.in_buffer = ''
@ -78,6 +76,8 @@ class Channel (object):
self.name = str(chanid) self.name = str(chanid)
self.logger = util.get_logger('paramiko.chan.' + str(chanid)) self.logger = util.get_logger('paramiko.chan.' + str(chanid))
self.pipe_rfd = self.pipe_wfd = None self.pipe_rfd = self.pipe_wfd = None
# for windows:
self.pipe_rsock = self.pipe_wsock = None
self.pipe_set = False self.pipe_set = False
self.event = threading.Event() self.event = threading.Event()
self.combine_stderr = False self.combine_stderr = False
@ -136,7 +136,7 @@ class Channel (object):
m.add_string('') m.add_string('')
self.event.clear() self.event.clear()
self.transport._send_user_message(m) self.transport._send_user_message(m)
while 1: while True:
self.event.wait(0.1) self.event.wait(0.1)
if self.closed: if self.closed:
return False return False
@ -221,7 +221,7 @@ class Channel (object):
m.add_string(subsystem) m.add_string(subsystem)
self.event.clear() self.event.clear()
self.transport._send_user_message(m) self.transport._send_user_message(m)
while 1: while True:
self.event.wait(0.1) self.event.wait(0.1)
if self.closed: if self.closed:
return False return False
@ -515,9 +515,9 @@ class Channel (object):
if len(self.in_buffer) <= nbytes: if len(self.in_buffer) <= nbytes:
out = self.in_buffer out = self.in_buffer
self.in_buffer = '' self.in_buffer = ''
if self.pipe_rfd != None: if self.pipe_rfd != None:
# clear the pipe, since no more data is buffered # clear the pipe, since no more data is buffered
self._clear_pipe() self._clear_pipe()
else: else:
out = self.in_buffer[:nbytes] out = self.in_buffer[:nbytes]
self.in_buffer = self.in_buffer[nbytes:] self.in_buffer = self.in_buffer[nbytes:]
@ -766,9 +766,9 @@ class Channel (object):
if self.pipe_rfd != None: if self.pipe_rfd != None:
return self.pipe_rfd return self.pipe_rfd
# create the pipe and feed in any existing data # create the pipe and feed in any existing data
self.pipe_rfd, self.pipe_wfd = os.pipe() self.pipe_rfd, self.pipe_wfd = self._make_pipe()
if len(self.in_buffer) > 0: if len(self.in_buffer) > 0:
self._set_pipe() self._set_pipe()
return self.pipe_rfd return self.pipe_rfd
finally: finally:
self.lock.release() self.lock.release()
@ -1010,23 +1010,24 @@ class Channel (object):
def _set_pipe(self): def _set_pipe(self):
"you are already holding the lock" "you are already holding the lock"
if self.pipe_set: if self.pipe_set:
return return
self.pipe_set = True self.pipe_set = True
os.write(self.pipe_wfd, '*') os.write(self.pipe_wfd, '*')
def _clear_pipe(self): def _clear_pipe(self):
"you are already holding the lock" "you are already holding the lock"
if not self.pipe_set: if not self.pipe_set:
return return
os.read(self.pipe_rfd, 1) os.read(self.pipe_rfd, 1)
self.pipe_set = False self.pipe_set = False
def _unlink(self): def _unlink(self):
if self.closed or not self.active: # server connection could die before we become active: still signal the close!
if self.closed:
return return
self.lock.acquire()
try: try:
self.lock.acquire()
self._set_closed() self._set_closed()
self.transport._unlink_channel(self.chanid) self.transport._unlink_channel(self.chanid)
finally: finally:
@ -1076,7 +1077,7 @@ class Channel (object):
if timeout <= 0.0: if timeout <= 0.0:
raise socket.timeout() raise socket.timeout()
# we have some window to squeeze into # we have some window to squeeze into
if self.closed: if self.closed or self.eof_sent:
return 0 return 0
if self.out_window_size < size: if self.out_window_size < size:
size = self.out_window_size size = self.out_window_size
@ -1087,6 +1088,31 @@ class Channel (object):
self._log(DEBUG, 'window down to %d' % self.out_window_size) self._log(DEBUG, 'window down to %d' % self.out_window_size)
return size return size
def _make_pipe (self):
"""
Create a pipe in such a way that the readable end may be used in select()
on the host OS. For posix (Linux, MacOS, etc) this means just returning
an OS-level pipe. For Windows, we need to do some convolutions to create
an actual OS-level "WinSock", because on Windows, only a "WinSock" may be
selected on. Sigh.
@return (read_end, write_end) tuple
"""
if sys.platform[:3] != 'win':
return os.pipe()
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.bind(('127.0.0.1', 0))
serv.listen(1)
# need to save sockets in pipe_rsock/pipe_wsock so they don't get closed
self.pipe_rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.pipe_rsock.connect(('127.0.0.1', serv.getsockname()[1]))
self.pipe_wsock, addr = serv.accept()
serv.close()
return self.pipe_rsock.fileno(), self.pipe_wsock.fileno()
class ChannelFile (BufferedFile): class ChannelFile (BufferedFile):
""" """