1162 lines
41 KiB
Python
1162 lines
41 KiB
Python
# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
|
|
#
|
|
# This file is part of paramiko.
|
|
#
|
|
# Paramiko is free software; you can redistribute it and/or modify it under the
|
|
# terms of the GNU Lesser General Public License as published by the Free
|
|
# Software Foundation; either version 2.1 of the License, or (at your option)
|
|
# any later version.
|
|
#
|
|
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
|
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
|
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License
|
|
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
|
|
|
|
"""
|
|
Abstraction for an SSH2 channel.
|
|
"""
|
|
|
|
import sys, time, threading, socket, os
|
|
|
|
from common import *
|
|
import util
|
|
from message import Message
|
|
from ssh_exception import SSHException
|
|
from file import BufferedFile
|
|
|
|
|
|
class Channel (object):
|
|
"""
|
|
A secure tunnel across an SSH L{Transport}. A Channel is meant to behave
|
|
like a socket, and has an API that should be indistinguishable from the
|
|
python socket API.
|
|
|
|
Because SSH2 has a windowing kind of flow control, if you stop reading data
|
|
from a Channel and its buffer fills up, the server will be unable to send
|
|
you any more data until you read some of it. (This won't affect other
|
|
channels on the same transport -- all channels on a single transport are
|
|
flow-controlled independently.) Similarly, if the server isn't reading
|
|
data you send, calls to L{send} may block, unless you set a timeout. This
|
|
is exactly like a normal network socket, so it shouldn't be too surprising.
|
|
"""
|
|
|
|
# lower bound on the max packet size we'll accept from the remote host
|
|
MIN_PACKET_SIZE = 1024
|
|
|
|
def __init__(self, chanid):
    """
    Build a new, unattached channel.  The channel belongs to no session or
    L{Transport} until a Transport attaches it.  Normally this is only
    called from the constructor of a L{Channel} subclass.

    @param chanid: the ID of this channel, as passed by an existing
        L{Transport}.
    @type chanid: int
    """
    # identity and logging
    self.chanid = chanid
    self.name = str(chanid)
    self.transport = None
    self.logger = util.get_logger('paramiko.chan.' + str(chanid))
    # connection state flags
    self.active = False
    self.closed = False
    self.eof_received = 0
    self.eof_sent = 0
    self.exit_status = -1
    # inbound buffers and read/write settings
    self.in_buffer = ''
    self.in_stderr_buffer = ''
    self.combine_stderr = False
    self.timeout = None
    self.ultra_debug = False
    # a single lock is shared by all three condition variables
    self.lock = threading.Lock()
    self.in_buffer_cv = threading.Condition(self.lock)
    self.in_stderr_buffer_cv = threading.Condition(self.lock)
    self.out_buffer_cv = threading.Condition(self.lock)
    # set by _handle_request ('exit-status') and _request_success respectively
    self.status_event = threading.Event()
    self.event = threading.Event()
    # pipe descriptors; they stay None until fileno() is first called
    self.pipe_rfd = self.pipe_wfd = None
    # for windows:
    self.pipe_rsock = self.pipe_wsock = None
    self.pipe_set = False
|
|
|
|
def __repr__(self):
|
|
"""
|
|
Return a string representation of this object, for debugging.
|
|
|
|
@rtype: str
|
|
"""
|
|
out = '<paramiko.Channel %d' % self.chanid
|
|
if self.closed:
|
|
out += ' (closed)'
|
|
elif self.active:
|
|
if self.eof_received:
|
|
out += ' (EOF received)'
|
|
if self.eof_sent:
|
|
out += ' (EOF sent)'
|
|
out += ' (open) window=%d' % (self.out_window_size)
|
|
if len(self.in_buffer) > 0:
|
|
out += ' in-buffer=%d' % (len(self.in_buffer),)
|
|
out += ' -> ' + repr(self.transport)
|
|
out += '>'
|
|
return out
|
|
|
|
def get_pty(self, term='vt100', width=80, height=24):
    """
    Ask the server for a pseudo-terminal.  This is typically done right
    after opening a client channel, so that a shell started with
    L{invoke_shell} gets basic terminal semantics.  It isn't necessary (or
    desirable) to call this method if you're going to execute a single
    command with L{exec_command}.

    @param term: the terminal type to emulate (for example, C{'vt100'}).
    @type term: str
    @param width: width (in characters) of the terminal screen
    @type width: int
    @param height: height (in characters) of the terminal screen
    @type height: int
    @return: C{True} if the operation succeeded; C{False} if not.
    @rtype: bool

    @raise SSHException: if the channel is not open.
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('pty-req')
    # want-reply flag: the answer is what sets self.event
    request.add_boolean(True)
    request.add_string(term)
    request.add_int(width)
    request.add_int(height)
    # pixel height, width (usually useless)
    request.add_int(0).add_int(0)
    request.add_string('')
    self.event.clear()
    self.transport._send_user_message(request)
    # poll in short slices so that a close is noticed promptly
    granted = None
    while granted is None:
        self.event.wait(0.1)
        if self.closed:
            granted = False
        elif self.event.isSet():
            granted = True
    return granted
|
|
|
|
def invoke_shell(self):
    """
    Ask the server to start an interactive shell on this channel.  If the
    server allows it, the channel is then connected directly to the
    shell's stdin, stdout, and stderr.

    Normally you would call L{get_pty} before this, in which case the
    shell will operate through the pty, and the channel will be connected
    to the stdin and stdout of the pty.

    @return: C{True} if the operation succeeded; C{False} if not.
    @rtype: bool

    @raise SSHException: if the channel is not open.
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('shell')
    # want-reply flag: the answer is what sets self.event
    request.add_boolean(1)
    self.event.clear()
    self.transport._send_user_message(request)
    # poll in short slices so that a close is noticed promptly
    granted = None
    while granted is None:
        self.event.wait(0.1)
        if self.closed:
            granted = False
        elif self.event.isSet():
            granted = True
    return granted
|
|
|
|
def exec_command(self, command):
    """
    Run a command on the server.  If the server allows it, the channel is
    then connected directly to the stdin, stdout, and stderr of the
    command being executed.

    @param command: a shell command to execute.
    @type command: str
    @return: C{True} if the operation succeeded; C{False} if not.
    @rtype: bool

    @raise SSHException: if the channel is not open.
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('exec')
    # want-reply flag: the answer is what sets self.event
    request.add_boolean(1)
    request.add_string(command)
    self.event.clear()
    self.transport._send_user_message(request)
    # poll in short slices so that a close is noticed promptly
    granted = None
    while granted is None:
        self.event.wait(0.1)
        if self.closed:
            granted = False
        elif self.event.isSet():
            granted = True
    return granted
|
|
|
|
def invoke_subsystem(self, subsystem):
    """
    Ask the server to start a subsystem (for example, C{sftp}).  If the
    server allows it, the channel is then connected directly to the
    requested subsystem.

    @param subsystem: name of the subsystem being requested.
    @type subsystem: str
    @return: C{True} if the operation succeeded; C{False} if not.
    @rtype: bool

    @raise SSHException: if the channel is not open.
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('subsystem')
    # want-reply flag: the answer is what sets self.event
    request.add_boolean(1)
    request.add_string(subsystem)
    self.event.clear()
    self.transport._send_user_message(request)
    # poll in short slices so that a close is noticed promptly
    granted = None
    while granted is None:
        self.event.wait(0.1)
        if self.closed:
            granted = False
        elif self.event.isSet():
            granted = True
    return granted
|
|
|
|
def resize_pty(self, width=80, height=24):
    """
    Change the size of the pseudo-terminal.  Use this to alter the width
    and height of the terminal emulation set up by an earlier L{get_pty}
    call.

    @param width: new width (in characters) of the terminal screen
    @type width: int
    @param height: new height (in characters) of the terminal screen
    @type height: int
    @return: C{True} if the operation succeeded; C{False} if not.
    @rtype: bool

    @raise SSHException: if the channel is not open.
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('window-change')
    # want-reply flag: the answer is what sets self.event
    request.add_boolean(1)
    request.add_int(width)
    request.add_int(height)
    # pixel dimensions are always sent as zero
    request.add_int(0).add_int(0)
    self.event.clear()
    self.transport._send_user_message(request)
    # poll in short slices so that a close is noticed promptly
    granted = None
    while granted is None:
        self.event.wait(0.1)
        if self.closed:
            granted = False
        elif self.event.isSet():
            granted = True
    return granted
|
|
|
|
def recv_exit_status(self):
    """
    Fetch the exit status of the process on the server.  This is mostly
    useful for retrieving the results of an L{exec_command}.  If the
    command hasn't finished yet, this method waits until it does, or until
    the channel is closed.  If the server never provides an exit status,
    -1 is returned.

    @return: the exit code of the process on the server.
    @rtype: int

    @since: 1.2
    """
    # wake up every 0.1s so that a close is noticed as well
    while not (self.closed or self.status_event.isSet()):
        self.status_event.wait(0.1)
    return self.exit_status
|
|
|
|
def send_exit_status(self, status):
    """
    Report the exit status of an executed command to the client.  (This
    really only makes sense in server mode.)  Many clients expect to get
    some sort of status code back from an executed command after it
    completes.

    @param status: the exit code of the process
    @type status: int

    @since: 1.2
    """
    # in many cases, the channel will not still be open here.
    # that's fine.
    notice = Message()
    notice.add_byte(chr(MSG_CHANNEL_REQUEST))
    notice.add_int(self.remote_chanid)
    notice.add_string('exit-status')
    # want-reply flag is off: no answer is expected
    notice.add_boolean(0)
    notice.add_int(status)
    self.transport._send_user_message(notice)
|
|
|
|
def get_transport(self):
    """
    Give back the L{Transport} this channel is attached to.

    @return: the L{Transport} that was used to create this channel
        (C{None} while no transport has been attached).
    @rtype: L{Transport}
    """
    return self.transport
|
|
|
|
def set_name(self, name):
    """
    Set a name for this channel.  Currently it's only used to set the name
    of the log channel used for debugging.  The name can be fetched with
    the L{get_name} method.

    This may be called before the channel is attached to a L{Transport};
    in that case the logger falls back to the C{'paramiko.chan'} prefix
    used by the constructor.

    @param name: new channel name.
    @type name: str
    """
    self.name = name
    if self.transport is None:
        # not attached yet: reuse the constructor's default prefix instead
        # of failing on None.get_log_channel()
        prefix = 'paramiko.chan'
    else:
        prefix = self.transport.get_log_channel()
    self.logger = util.get_logger(prefix + '.' + self.name)
|
|
|
|
def get_name(self):
    """
    Fetch this channel's name, as last set by L{set_name} (it defaults to
    the channel ID rendered as a string).

    @return: the name of this channel.
    @rtype: str
    """
    return self.name
|
|
|
|
def get_id(self):
    """
    Fetch the ID # for this channel.  The channel ID is unique across a
    L{Transport} and usually a small number.  It's also the number passed
    to L{ServerInterface.check_channel_request} when determining whether
    to accept a channel request in server mode.

    @return: the ID of this channel.
    @rtype: int

    @since: ivysaur
    """
    return self.chanid
|
|
|
|
def set_combine_stderr(self, combine):
    """
    Choose whether stderr is merged into stdout on this channel.  The
    default is C{False}, but in some cases it may be convenient to have
    both streams combined.

    If this is C{False}, and L{exec_command} is called (or C{invoke_shell}
    with no pty), output to stderr will not show up through the L{recv}
    and L{recv_ready} calls.  You will have to use L{recv_stderr} and
    L{recv_stderr_ready} to get stderr output.

    If this is C{True}, data will never show up via L{recv_stderr} or
    L{recv_stderr_ready}.

    @param combine: C{True} if stderr output should be combined into
        stdout on this channel.
    @type combine: bool
    @return: previous setting.
    @rtype: bool

    @since: 1.1
    """
    pending = ''
    self.lock.acquire()
    try:
        previous = self.combine_stderr
        self.combine_stderr = combine
        if combine and not previous:
            # switching on: take whatever stderr data is already buffered
            pending = self.in_stderr_buffer
            self.in_stderr_buffer = ''
    finally:
        self.lock.release()
    # _feed takes the lock itself, so call it only after releasing
    if pending:
        self._feed(pending)
    return previous
|
|
|
|
|
|
### socket API
|
|
|
|
|
|
def settimeout(self, timeout):
    """
    Set the timeout used by blocking read/write operations.  C{timeout}
    may be a nonnegative float expressing seconds, or C{None}.  With a
    float, later channel read/write operations raise a timeout exception
    if that much time passes before the operation completes.  A timeout
    of C{None} disables timeouts on socket operations.

    C{chan.settimeout(0.0)} is equivalent to C{chan.setblocking(0)};
    C{chan.settimeout(None)} is equivalent to C{chan.setblocking(1)}.

    @param timeout: seconds to wait for a pending read/write operation
        before raising C{socket.timeout}, or C{None} for no timeout.
    @type timeout: float
    """
    # stored as given; no validation is done here
    self.timeout = timeout
|
|
|
|
def gettimeout(self):
    """
    Report the timeout in seconds (as a float) that applies to socket
    operations, or C{None} if no timeout is set.  This reflects the last
    call to L{setblocking} or L{settimeout}.

    @return: timeout in seconds, or C{None}.
    @rtype: float
    """
    return self.timeout
|
|
|
|
def setblocking(self, blocking):
    """
    Switch the channel between blocking and non-blocking mode: if
    C{blocking} is 0, the channel becomes non-blocking; otherwise it
    becomes blocking.  Initially all channels are in blocking mode.

    In non-blocking mode, if a L{recv} call doesn't find any data, or if a
    L{send} call can't immediately dispose of the data, an error exception
    is raised.  In blocking mode, the calls block until they can proceed.

    C{chan.setblocking(0)} is equivalent to C{chan.settimeout(0)};
    C{chan.setblocking(1)} is equivalent to C{chan.settimeout(None)}.

    @param blocking: 0 to set non-blocking mode; non-0 to set blocking
        mode.
    @type blocking: int
    """
    if not blocking:
        # a zero timeout is how non-blocking mode is represented
        self.settimeout(0.0)
        return
    self.settimeout(None)
|
|
|
|
def close(self):
    """
    Close the channel.  All future read/write operations on the channel
    will fail.  The remote end will receive no more data (after queued
    data is flushed).  Channels are automatically closed when they are
    garbage-collected, or when their L{Transport} is closed.

    Calling this on a channel that is inactive or already closed does
    nothing.
    """
    self.lock.acquire()
    try:
        if self.active and not self.closed:
            try:
                self._send_eof()
                farewell = Message()
                farewell.add_byte(chr(MSG_CHANNEL_CLOSE))
                farewell.add_int(self.remote_chanid)
                self.transport._send_user_message(farewell)
            except EOFError:
                # best effort -- presumably the transport is already gone
                pass
            self._set_closed()
            # can't unlink from the Transport yet -- the remote side may still
            # try to send meta-data (exit-status, etc)
    finally:
        self.lock.release()
|
|
|
|
def recv_ready(self):
    """
    Tell whether data is buffered and ready to be read from this channel.
    A C{False} result does not mean that the channel has closed; it means
    you may need to wait before more data arrives.

    @return: C{True} if a L{recv} call on this channel would immediately
        return at least one byte; C{False} otherwise.
    @rtype: boolean
    """
    self.lock.acquire()
    try:
        return len(self.in_buffer) != 0
    finally:
        self.lock.release()
|
|
|
|
def recv(self, nbytes):
    """
    Receive data from the channel.  The return value is a string
    representing the data received.  The maximum amount of data to be
    received at once is specified by C{nbytes}.  If a string of length zero
    is returned, the channel stream has closed.

    @param nbytes: maximum number of bytes to read.
    @type nbytes: int
    @return: data.
    @rtype: str

    @raise socket.timeout: if no data is ready before the timeout set by
        L{settimeout}.
    """
    out = ''
    self.lock.acquire()
    try:
        if len(self.in_buffer) == 0:
            if self.closed or self.eof_received:
                return out
            # should we block?
            if self.timeout == 0.0:
                raise socket.timeout()
            # loop here in case we get woken up but a different thread has
            # grabbed everything in the buffer
            timeout = self.timeout
            while (len(self.in_buffer) == 0) and not self.closed and not self.eof_received:
                then = time.time()
                self.in_buffer_cv.wait(timeout)
                if timeout is not None:
                    timeout -= time.time() - then
                    if timeout <= 0.0:
                        # out of time.  Data (or EOF/close) may have arrived
                        # just as the deadline expired; only report a
                        # timeout when there is really nothing to return.
                        if (len(self.in_buffer) == 0) and not self.closed and not self.eof_received:
                            raise socket.timeout()
                        break
        # something in the buffer (or the stream ended) and we have the lock
        if len(self.in_buffer) <= nbytes:
            out = self.in_buffer
            self.in_buffer = ''
            if self.pipe_rfd is not None:
                # clear the pipe, since no more data is buffered
                self._clear_pipe()
        else:
            out = self.in_buffer[:nbytes]
            self.in_buffer = self.in_buffer[nbytes:]
        # account for the bytes consumed so the window can be re-opened
        self._check_add_window(len(out))
    finally:
        self.lock.release()
    return out
|
|
|
|
def recv_stderr_ready(self):
    """
    Tell whether data is buffered and ready to be read from this channel's
    stderr stream.  Only channels using L{exec_command} or
    L{invoke_shell} without a pty will ever have data on the stderr
    stream.

    @return: C{True} if a L{recv_stderr} call on this channel would
        immediately return at least one byte; C{False} otherwise.
    @rtype: boolean

    @since: 1.1
    """
    self.lock.acquire()
    try:
        return len(self.in_stderr_buffer) != 0
    finally:
        self.lock.release()
|
|
|
|
def recv_stderr(self, nbytes):
    """
    Receive data from the channel's stderr stream.  Only channels using
    L{exec_command} or L{invoke_shell} without a pty will ever have data
    on the stderr stream.  The return value is a string representing the
    data received.  The maximum amount of data to be received at once is
    specified by C{nbytes}.  If a string of length zero is returned, the
    channel stream has closed.

    @param nbytes: maximum number of bytes to read.
    @type nbytes: int
    @return: data.
    @rtype: str

    @raise socket.timeout: if no data is ready before the timeout set by
        L{settimeout}.

    @since: 1.1
    """
    out = ''
    self.lock.acquire()
    try:
        if len(self.in_stderr_buffer) == 0:
            if self.closed or self.eof_received:
                return out
            # should we block?
            if self.timeout == 0.0:
                raise socket.timeout()
            # loop here in case we get woken up but a different thread has
            # grabbed everything in the buffer
            timeout = self.timeout
            while (len(self.in_stderr_buffer) == 0) and not self.closed and not self.eof_received:
                then = time.time()
                self.in_stderr_buffer_cv.wait(timeout)
                if timeout is not None:
                    timeout -= time.time() - then
                    if timeout <= 0.0:
                        # out of time.  Data (or EOF/close) may have arrived
                        # just as the deadline expired; only report a
                        # timeout when there is really nothing to return.
                        if (len(self.in_stderr_buffer) == 0) and not self.closed and not self.eof_received:
                            raise socket.timeout()
                        break
        # something in the buffer (or the stream ended) and we have the lock
        if len(self.in_stderr_buffer) <= nbytes:
            out = self.in_stderr_buffer
            self.in_stderr_buffer = ''
        else:
            out = self.in_stderr_buffer[:nbytes]
            self.in_stderr_buffer = self.in_stderr_buffer[nbytes:]
        # account for the bytes consumed so the window can be re-opened
        self._check_add_window(len(out))
    finally:
        self.lock.release()
    return out
|
|
|
|
def send(self, s):
    """
    Send data to the channel.  Returns the number of bytes sent, or 0 if
    the channel stream is closed.  Applications are responsible for
    checking that all data has been sent: if only some of the data was
    transmitted, the application needs to attempt delivery of the
    remaining data.

    @param s: data to send.
    @type s: str
    @return: number of bytes actually sent.
    @rtype: int

    @raise socket.timeout: if no data could be sent before the timeout set
        by L{settimeout}.
    """
    self.lock.acquire()
    try:
        granted = self._wait_for_send_window(len(s))
        if granted == 0:
            # eof or similar
            return 0
        packet = Message()
        packet.add_byte(chr(MSG_CHANNEL_DATA))
        packet.add_int(self.remote_chanid)
        # only as much as the window allows goes out in this call
        packet.add_string(s[:granted])
        self.transport._send_user_message(packet)
    finally:
        self.lock.release()
    return granted
|
|
|
|
def send_stderr(self, s):
    """
    Send data to the channel on the "stderr" stream.  This is normally
    only used by servers to send output from shell commands -- clients
    won't use this.  Returns the number of bytes sent, or 0 if the channel
    stream is closed.  Applications are responsible for checking that all
    data has been sent: if only some of the data was transmitted, the
    application needs to attempt delivery of the remaining data.

    @param s: data to send.
    @type s: str
    @return: number of bytes actually sent.
    @rtype: int

    @raise socket.timeout: if no data could be sent before the timeout set
        by L{settimeout}.

    @since: 1.1
    """
    self.lock.acquire()
    try:
        granted = self._wait_for_send_window(len(s))
        if granted == 0:
            # eof or similar
            return 0
        packet = Message()
        packet.add_byte(chr(MSG_CHANNEL_EXTENDED_DATA))
        packet.add_int(self.remote_chanid)
        # extended-data type code; 1 is the code _feed_extended accepts
        packet.add_int(1)
        # only as much as the window allows goes out in this call
        packet.add_string(s[:granted])
        self.transport._send_user_message(packet)
    finally:
        self.lock.release()
    return granted
|
|
|
|
def sendall(self, s):
    """
    Send data to the channel, without allowing partial results.  Unlike
    L{send}, this method continues to send data from the given string until
    either all data has been sent or an error occurs.  Nothing is returned.

    @param s: data to send.
    @type s: str

    @raise socket.timeout: if sending stalled for longer than the timeout
        set by L{settimeout}.
    @raise socket.error: if an error occurred before the entire string was
        sent (including the stream no longer accepting data).

    @note: If the channel is closed while only part of the data has been
        sent, there is no way to determine how much data (if any) was sent.
        This is irritating, but identically follows python's API.
    """
    while s:
        if self.closed:
            # this doesn't seem useful, but it is the documented behavior of Socket
            raise socket.error('Socket is closed')
        sent = self.send(s)
        if sent == 0:
            # send() reports 0 once the outgoing stream is finished (for
            # example after shutdown_write); retrying would spin forever,
            # so surface it as the documented error instead.
            raise socket.error('Socket is closed')
        s = s[sent:]
    return None
|
|
|
|
def sendall_stderr(self, s):
    """
    Send data to the channel's "stderr" stream, without allowing partial
    results.  Unlike L{send_stderr}, this method continues to send data
    from the given string until all data has been sent or an error occurs.
    Nothing is returned.

    @param s: data to send to the client as "stderr" output.
    @type s: str

    @raise socket.timeout: if sending stalled for longer than the timeout
        set by L{settimeout}.
    @raise socket.error: if an error occurred before the entire string was
        sent (including the stream no longer accepting data).

    @since: 1.1
    """
    while s:
        if self.closed:
            raise socket.error('Socket is closed')
        sent = self.send_stderr(s)
        if sent == 0:
            # send_stderr() reports 0 once the outgoing stream is finished;
            # retrying would spin forever, so raise the documented error.
            raise socket.error('Socket is closed')
        s = s[sent:]
    return None
|
|
|
|
def makefile(self, *params):
    """
    Wrap this channel in a file-like object.  The optional C{mode} and
    C{bufsize} arguments are interpreted the same way as by the built-in
    C{file()} function in python.

    @return: object which can be used for python file I/O.
    @rtype: L{ChannelFile}
    """
    # the channel itself goes first, followed by the caller's arguments
    return ChannelFile(self, *params)
|
|
|
|
def makefile_stderr(self, *params):
    """
    Wrap this channel's stderr stream in a file-like object.  Only
    channels using L{exec_command} or L{invoke_shell} without a pty will
    ever have data on the stderr stream.

    The optional C{mode} and C{bufsize} arguments are interpreted the
    same way as by the built-in C{file()} function in python.  For a
    client, it only makes sense to open this file for reading.  For a
    server, it only makes sense to open this file for writing.

    @return: object which can be used for python file I/O.
    @rtype: L{ChannelFile}

    @since: 1.1
    """
    # the channel itself goes first, followed by the caller's arguments
    return ChannelStderrFile(self, *params)
|
|
|
|
def fileno(self):
    """
    Returns an OS-level file descriptor which can be used for polling (but
    I{not} for reading or writing).  This is primarily to allow python's
    C{select} module to work.

    The first time C{fileno} is called on a channel, a pipe is created to
    simulate real OS-level file descriptor (FD) behavior.  Because of this,
    two OS-level FDs are created, which will use up FDs faster than normal.
    You won't notice this effect unless you open hundreds or thousands of
    channels simultaneously, but it's still notable.

    @return: an OS-level file descriptor
    @rtype: int

    @warning: This method causes channel reads to be slightly less
        efficient.
    """
    self.lock.acquire()
    try:
        if self.pipe_rfd is None:
            # first call: create the pipe and feed in any existing data
            self.pipe_rfd, self.pipe_wfd = self._make_pipe()
            if len(self.in_buffer) > 0:
                self._set_pipe()
        return self.pipe_rfd
    finally:
        self.lock.release()
|
|
|
|
def shutdown(self, how):
    """
    Shut down one or both halves of the connection.  If C{how} is 0,
    further receives are disallowed.  If C{how} is 1, further sends
    are disallowed.  If C{how} is 2, further sends and receives are
    disallowed.  This closes the stream in one or both directions.

    @param how: 0 (stop receiving), 1 (stop sending), or 2 (stop
        receiving and sending).
    @type how: int
    """
    if how in (0, 2):
        # feign "read" shutdown
        self.eof_received = 1
    if how in (1, 2):
        # _send_eof expects the lock to be held
        self.lock.acquire()
        try:
            self._send_eof()
        finally:
            self.lock.release()
|
|
|
|
def shutdown_read(self):
    """
    Shut down the receiving side of this socket, closing the stream in
    the incoming direction.  After this call, future reads on this
    channel will fail instantly.  This is a convenience method, equivalent
    to C{shutdown(0)}, for people who don't make it a habit to
    memorize unix constants from the 1970s.

    @since: 1.2
    """
    # 0 == stop receiving
    self.shutdown(0)
|
|
|
|
def shutdown_write(self):
    """
    Shut down the sending side of this socket, closing the stream in
    the outgoing direction.  After this call, future writes on this
    channel will fail instantly.  This is a convenience method, equivalent
    to C{shutdown(1)}, for people who don't make it a habit to
    memorize unix constants from the 1970s.

    @since: 1.2
    """
    # 1 == stop sending
    self.shutdown(1)
|
|
|
|
|
|
### calls from Transport
|
|
|
|
|
|
def _set_transport(self, transport):
    """
    Attach this channel to C{transport} and rebuild the logger so that it
    lives under the transport's log channel, suffixed with this channel's
    name.
    """
    self.transport = transport
    log_channel = self.transport.get_log_channel() + '.' + self.name
    self.logger = util.get_logger(log_channel)
|
|
|
|
def _set_window(self, window_size, max_packet_size):
    """
    Record the inbound window size and maximum packet size, and reset the
    count of bytes consumed since the last window update.
    """
    self.in_window_size = window_size
    self.in_max_packet_size = max_packet_size
    self.in_window_sofar = 0
    # threshold of bytes we receive before we bother to send a window update
    self.in_window_threshold = window_size // 10
    self._log(DEBUG, 'Max packet in: %d bytes' % max_packet_size)
|
|
|
|
def _set_remote_channel(self, chanid, window_size, max_packet_size):
    """
    Record the remote side's channel ID, send window and maximum packet
    size, and mark this channel active.  The stored outbound packet size
    is never below C{MIN_PACKET_SIZE}; the log line reports the value as
    given.
    """
    self.remote_chanid = chanid
    self.out_window_size = window_size
    # never go below our lower bound, whatever the remote side asks for
    self.out_max_packet_size = max(max_packet_size, self.MIN_PACKET_SIZE)
    self.active = 1
    self._log(DEBUG, 'Max packet out: %d bytes' % max_packet_size)
|
|
|
|
def _request_success(self, m):
    """
    Note that a channel request succeeded: log it and set C{self.event},
    which releases whichever request method is polling on it.  C{m} is
    not used.
    """
    self._log(DEBUG, 'Sesch channel %d request ok' % self.chanid)
    self.event.set()
|
|
|
|
def _request_failed(self, m):
|
|
self.close()
|
|
|
|
def _feed(self, m):
|
|
if type(m) is str:
|
|
# passed from _feed_extended
|
|
s = m
|
|
else:
|
|
s = m.get_string()
|
|
self.lock.acquire()
|
|
try:
|
|
if self.ultra_debug:
|
|
self._log(DEBUG, 'fed %d bytes' % len(s))
|
|
if self.pipe_wfd != None:
|
|
self._set_pipe()
|
|
self.in_buffer += s
|
|
self.in_buffer_cv.notifyAll()
|
|
finally:
|
|
self.lock.release()
|
|
|
|
def _feed_extended(self, m):
|
|
code = m.get_int()
|
|
s = m.get_string()
|
|
if code != 1:
|
|
self._log(ERROR, 'unknown extended_data type %d; discarding' % code)
|
|
return
|
|
if self.combine_stderr:
|
|
return self._feed(s)
|
|
self.lock.acquire()
|
|
try:
|
|
if self.ultra_debug:
|
|
self._log(DEBUG, 'fed %d stderr bytes' % len(s))
|
|
self.in_stderr_buffer += s
|
|
self.in_stderr_buffer_cv.notifyAll()
|
|
finally:
|
|
self.lock.release()
|
|
|
|
def _window_adjust(self, m):
|
|
nbytes = m.get_int()
|
|
self.lock.acquire()
|
|
try:
|
|
if self.ultra_debug:
|
|
self._log(DEBUG, 'window up %d' % nbytes)
|
|
self.out_window_size += nbytes
|
|
self.out_buffer_cv.notifyAll()
|
|
finally:
|
|
self.lock.release()
|
|
|
|
def _handle_request(self, m):
|
|
key = m.get_string()
|
|
want_reply = m.get_boolean()
|
|
server = self.transport.server_object
|
|
ok = False
|
|
if key == 'exit-status':
|
|
self.exit_status = m.get_int()
|
|
self.status_event.set()
|
|
ok = True
|
|
elif key == 'xon-xoff':
|
|
# ignore
|
|
ok = True
|
|
elif key == 'pty-req':
|
|
term = m.get_string()
|
|
width = m.get_int()
|
|
height = m.get_int()
|
|
pixelwidth = m.get_int()
|
|
pixelheight = m.get_int()
|
|
modes = m.get_string()
|
|
if server is None:
|
|
ok = False
|
|
else:
|
|
ok = server.check_channel_pty_request(self, term, width, height, pixelwidth,
|
|
pixelheight, modes)
|
|
elif key == 'shell':
|
|
if server is None:
|
|
ok = False
|
|
else:
|
|
ok = server.check_channel_shell_request(self)
|
|
elif key == 'exec':
|
|
cmd = m.get_string()
|
|
if server is None:
|
|
ok = False
|
|
else:
|
|
ok = server.check_channel_exec_request(self, cmd)
|
|
elif key == 'subsystem':
|
|
name = m.get_string()
|
|
if server is None:
|
|
ok = False
|
|
else:
|
|
ok = server.check_channel_subsystem_request(self, name)
|
|
elif key == 'window-change':
|
|
width = m.get_int()
|
|
height = m.get_int()
|
|
pixelwidth = m.get_int()
|
|
pixelheight = m.get_int()
|
|
if server is None:
|
|
ok = False
|
|
else:
|
|
ok = server.check_channel_window_change_request(self, width, height, pixelwidth,
|
|
pixelheight)
|
|
else:
|
|
self._log(DEBUG, 'Unhandled channel request "%s"' % key)
|
|
ok = False
|
|
if want_reply:
|
|
m = Message()
|
|
if ok:
|
|
m.add_byte(chr(MSG_CHANNEL_SUCCESS))
|
|
else:
|
|
m.add_byte(chr(MSG_CHANNEL_FAILURE))
|
|
m.add_int(self.remote_chanid)
|
|
self.transport._send_user_message(m)
|
|
|
|
def _handle_eof(self, m):
    """
    Record that EOF was received.  The first time this happens, readers
    waiting on either input buffer are woken and the write end of the
    pipe (if one exists) is closed.  C{m} is not used.
    """
    self.lock.acquire()
    try:
        if not self.eof_received:
            self.eof_received = True
            for waiters in (self.in_buffer_cv, self.in_stderr_buffer_cv):
                waiters.notifyAll()
            if self.pipe_wfd is not None:
                os.close(self.pipe_wfd)
                self.pipe_wfd = None
    finally:
        self.lock.release()
    self._log(DEBUG, 'EOF received')
|
|
|
|
def _handle_close(self, m):
|
|
self.close()
|
|
self.lock.acquire()
|
|
try:
|
|
self.transport._unlink_channel(self.chanid)
|
|
if self.pipe_wfd != None:
|
|
os.close(self.pipe_wfd)
|
|
self.pipe_wfd = None
|
|
finally:
|
|
self.lock.release()
|
|
|
|
|
|
### internals...
|
|
|
|
|
|
def _log(self, level, msg):
|
|
self.logger.log(level, msg)
|
|
|
|
def _set_closed(self):
|
|
# you are holding the lock.
|
|
self.closed = True
|
|
self.in_buffer_cv.notifyAll()
|
|
self.in_stderr_buffer_cv.notifyAll()
|
|
self.out_buffer_cv.notifyAll()
|
|
|
|
def _send_eof(self):
|
|
# you are holding the lock.
|
|
if self.eof_sent:
|
|
return
|
|
m = Message()
|
|
m.add_byte(chr(MSG_CHANNEL_EOF))
|
|
m.add_int(self.remote_chanid)
|
|
self.transport._send_user_message(m)
|
|
self.eof_sent = True
|
|
self._log(DEBUG, 'EOF sent')
|
|
return
|
|
|
|
def _set_pipe(self):
|
|
"you are already holding the lock"
|
|
if self.pipe_set:
|
|
return
|
|
self.pipe_set = True
|
|
os.write(self.pipe_wfd, '*')
|
|
|
|
def _clear_pipe(self):
    """
    (You are already holding the lock.)

    Read one byte from C{pipe_rfd} and flag the pipe as no longer set.
    Does nothing when the pipe is not currently flagged as set.
    """
    if self.pipe_set:
        os.read(self.pipe_rfd, 1)
        self.pipe_set = False
|
|
|
|
def _unlink(self):
    """
    Mark the channel closed and unlink it from the transport, unless it is
    already closed.

    The server connection could die before this channel becomes active;
    the close still has to be signalled in that case.
    """
    if not self.closed:
        self.lock.acquire()
        try:
            self._set_closed()
            self.transport._unlink_channel(self.chanid)
        finally:
            self.lock.release()
|
|
|
|
def _check_add_window(self, n):
    """
    (You are already holding the lock.)

    Add C{n} to the running C{in_window_sofar} count.  Once that count
    exceeds C{in_window_threshold}, send it to the remote side in a
    C{MSG_CHANNEL_WINDOW_ADJUST} message and reset the count to zero.
    Nothing happens if the channel is closed, has received EOF, or is not
    active.

    @param n: amount to add to the running count
    """
    if self.closed or self.eof_received or not self.active:
        return
    verbose = self.ultra_debug
    if verbose:
        self._log(DEBUG, 'addwindow %d' % n)
    pending = self.in_window_sofar + n
    self.in_window_sofar = pending
    if pending <= self.in_window_threshold:
        # not enough accumulated yet to be worth a window-adjust message
        return
    if verbose:
        self._log(DEBUG, 'addwindow send %d' % pending)
    adjust = Message()
    adjust.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
    adjust.add_int(self.remote_chanid)
    adjust.add_int(pending)
    self.transport._send_user_message(adjust)
    self.in_window_sofar = 0
|
|
|
|
def _wait_for_send_window(self, size):
    """
    (You are already holding the lock.)

    Wait for the send window to open up, and allocate up to C{size} bytes
    for transmission.  If no space opens up before the timeout, a timeout
    exception is raised.

    @param size: number of bytes the caller would like to send
    @return: the number of bytes available to send (may be less than
        requested); 0 if the channel is closed or EOF has been sent
    @raise socket.timeout: if the window is empty and the channel timeout
        is zero, or runs out while waiting
    """
    if self.closed or self.eof_sent:
        return 0
    if self.out_window_size == 0:
        # a zero timeout means: never block
        if self.timeout == 0.0:
            raise socket.timeout()
        # keep looping: we may get woken up only to find that a different
        # thread has already filled the buffer
        remaining = self.timeout
        while self.out_window_size == 0:
            if self.closed or self.eof_sent:
                return 0
            started = time.time()
            self.out_buffer_cv.wait(remaining)
            if remaining is not None:
                remaining -= time.time() - started
                if remaining <= 0.0:
                    raise socket.timeout()
    # there is some window to squeeze into
    if self.closed or self.eof_sent:
        return 0
    # clamp to the open window and to the max packet size less 64 bytes
    size = min(size, self.out_window_size, self.out_max_packet_size - 64)
    self.out_window_size -= size
    if self.ultra_debug:
        self._log(DEBUG, 'window down to %d' % self.out_window_size)
    return size
|
|
|
|
def _make_pipe (self):
    """
    Create a pipe in such a way that the readable end may be used in select()
    on the host OS.  For posix (Linux, MacOS, etc) this means just returning
    an OS-level pipe.  For Windows, we need to do some convolutions to create
    an actual OS-level "WinSock", because on Windows, only a "WinSock" may be
    selected on.  Sigh.

    On Windows the two connected sockets are stored in C{pipe_rsock} and
    C{pipe_wsock}; the temporary listening socket is always closed, even if
    setting up the connection fails.

    @return: (read_end, write_end) tuple
    """
    if sys.platform[:3] != 'win':
        return os.pipe()
    serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # port 0: bind the listener to whatever loopback port is free
        serv.bind(('127.0.0.1', 0))
        serv.listen(1)

        # need to save sockets in pipe_rsock/pipe_wsock so they don't get closed
        self.pipe_rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.pipe_rsock.connect(('127.0.0.1', serv.getsockname()[1]))

        self.pipe_wsock, addr = serv.accept()
    finally:
        # the listener is only needed to pair the two ends; never leak it
        serv.close()
    return self.pipe_rsock.fileno(), self.pipe_wsock.fileno()
|
|
|
|
|
|
|
|
class ChannelFile (BufferedFile):
    """
    A file-like wrapper around L{Channel}.  A ChannelFile is created by calling
    L{Channel.makefile}.

    @bug: To correctly emulate the file object created from a socket's
        C{makefile} method, a L{Channel} and its C{ChannelFile} should be able
        to be closed or garbage-collected independently.  Currently, closing
        the C{ChannelFile} does nothing but flush the buffer.
    """

    def __init__(self, channel, mode = 'r', bufsize = -1):
        """
        Wrap C{channel} in a buffered file object.

        @param channel: the channel to read from and write to
        @param mode: file mode string, handed to C{_set_mode}
        @param bufsize: buffering setting, handed to C{_set_mode}
        """
        self.channel = channel
        BufferedFile.__init__(self)
        self._set_mode(mode, bufsize)

    def __repr__(self):
        """
        Returns a string representation of this object, for debugging.

        @rtype: str
        """
        pieces = ('<paramiko.ChannelFile from ', repr(self.channel), '>')
        return ''.join(pieces)

    def _read(self, size):
        """
        Read up to C{size} bytes from the channel via L{Channel.recv}.
        """
        chunk = self.channel.recv(size)
        return chunk

    def _write(self, data):
        """
        Send all of C{data} over the channel and return its length.
        """
        self.channel.sendall(data)
        sent = len(data)
        return sent
|
|
|
|
|
|
class ChannelStderrFile (ChannelFile):
    """
    A L{ChannelFile} that reads with C{recv_stderr} and writes with
    C{sendall_stderr} on the wrapped channel.
    """

    def __init__(self, channel, mode = 'r', bufsize = -1):
        """
        Same arguments as L{ChannelFile}; they are passed through unchanged.
        """
        ChannelFile.__init__(self, channel, mode, bufsize)

    def _read(self, size):
        """
        Read up to C{size} bytes via the channel's C{recv_stderr}.
        """
        chunk = self.channel.recv_stderr(size)
        return chunk

    def _write(self, data):
        """
        Send all of C{data} via C{sendall_stderr} and return its length.
        """
        self.channel.sendall_stderr(data)
        sent = len(data)
        return sent
|
|
|
|
|
|
# vim: set shiftwidth=4 expandtab :
|