[project @ Arch-1:robey@lag.net--2005-master-shake%paramiko--dev--1--patch-60]

add file pipelining for writes
This commit is contained in:
Robey Pointer 2005-09-25 09:11:23 +00:00
parent 364479610e
commit fb73c0ef7f
4 changed files with 106 additions and 23 deletions

23
README
View File

@ -134,6 +134,20 @@ the best and easiest examples of how to use the SFTP class.
highlights of what's new in each release:
v1.5 P...
* added support for "keyboard-interactive" authentication
* added mode (on by default) where password authentication will try to
fallback to "keyboard-interactive" if it's supported
* added pipelining to SFTPFile.write and SFTPClient.put
* fixed bug with SFTPFile.close() not guarding against being called more
than once (thanks to Nathaniel Smith)
* fixed broken 'a' flag in SFTPClient.file() (thanks to Nathaniel Smith)
* fixed up epydocs to look nicer
* reorganized auth_transport into auth_handler, which seems to be a cleaner
separation
* demo scripts fixed to have a better chance of loading the host keys
correctly on windows/cygwin
v1.4 ODDISH
* added SSH-agent support (for posix) from john rochester
* added chdir() and getcwd() to SFTPClient, to emulate a "working directory"
@ -248,18 +262,19 @@ v0.9 FEAROW
*** MISSING LINKS
* keyboard-interactive auth
* add comments to demo & demo_simple about how they don't work on windows
* host-based auth (yuck!)
* move auth_transport to be a side plugin like in jaramiko
* support compression
* SFTP pipelining
- basically, just don't wait synchronously for server responses. queue
up "expected" responses and wait for them on close().
* SFTP implicit file locking?
* server-side keyboard-interactive
* ctr forms of ciphers are missing (blowfish-ctr, aes128-ctr, aes256-ctr)
* SFTP url parsing function to return (user, pass, host, port, path)
components
* sftp protocol 6 support (ugh....) -- once it settles down more
* why are big files so slow to transfer? profiling needed...
* what is psyco?
* make a simple example demonstrating use of SocketServer
* make a simple example demonstrating use of SocketServer (besides forward.py?)

View File

@ -56,6 +56,7 @@ class SFTPClient (BaseSFTP):
self.ultra_debug = False
self.request_number = 1
self._cwd = None
self._expecting = []
if type(sock) is Channel:
# override default logger
transport = self.sock.get_transport()
@ -449,6 +450,8 @@ class SFTPClient (BaseSFTP):
Any exception raised by operations will be passed through. This
method is primarily provided as a convenience.
The SFTP operations use pipelining for speed.
@param localpath: the local file to copy
@type localpath: str
@param remotepath: the destination path on the SFTP server
@ -458,6 +461,7 @@ class SFTPClient (BaseSFTP):
"""
fl = file(localpath, 'rb')
fr = self.file(remotepath, 'wb')
fr.set_pipelined(True)
size = 0
while True:
data = fl.read(32768)
@ -504,6 +508,10 @@ class SFTPClient (BaseSFTP):
def _request(self, t, *arg):
num = self._async_request(t, *arg)
return self._read_response(num)
def _async_request(self, t, *arg):
msg = Message()
msg.add_int(self.request_number)
for item in arg:
@ -516,18 +524,32 @@ class SFTPClient (BaseSFTP):
elif type(item) is SFTPAttributes:
item._pack(msg)
else:
raise Exception('unknown type for ' + repr(item) + ' type ' + repr(type(item)))
raise Exception('unknown type for %r type %r' % (item, type(item)))
self._send_packet(t, str(msg))
t, data = self._read_packet()
msg = Message(data)
num = msg.get_int()
if num != self.request_number:
raise SFTPError('Expected response #%d, got response #%d' % (self.request_number, num))
num = self.request_number
self.request_number += 1
if t == CMD_STATUS:
self._convert_status(msg)
self._expecting.append(num)
return num
def _read_response(self, waitfor=None):
while True:
t, data = self._read_packet()
msg = Message(data)
num = msg.get_int()
if num not in self._expecting:
raise SFTPError('Expected response from %r, got response #%d' %
(self._expected, num))
self._expecting.remove(num)
if t == CMD_STATUS:
self._convert_status(msg)
if (waitfor is None) or (num == waitfor):
break
return t, msg
def _finish_responses(self):
while len(self._expecting) > 0:
self._read_response()
def _convert_status(self, msg):
"""
Raises EOFError or IOError on error status; otherwise does nothing.

View File

@ -40,6 +40,7 @@ class SFTPFile (BufferedFile):
self.sftp = sftp
self.handle = handle
BufferedFile._set_mode(self, mode, bufsize)
self.pipelined = False
def __del__(self):
self.close()
@ -54,6 +55,8 @@ class SFTPFile (BufferedFile):
# __del__.)
if self._closed:
return
if self.pipelined:
self.sftp._finish_responses()
BufferedFile.close(self)
try:
self.sftp._request(CMD_CLOSE, self.handle)
@ -74,11 +77,12 @@ class SFTPFile (BufferedFile):
def _write(self, data):
# may write less than requested if it would exceed max packet size
chunk = min(len(data), self.MAX_REQUEST_SIZE)
t, msg = self.sftp._request(CMD_WRITE, self.handle, long(self._realpos),
str(data[:chunk]))
if t != CMD_STATUS:
raise SFTPError('Expected status')
self.sftp._convert_status(msg)
self.sftp._async_request(CMD_WRITE, self.handle, long(self._realpos), str(data[:chunk]))
if not self.pipelined or self.sftp.sock.recv_ready():
t, msg = self.sftp._read_response()
if t != CMD_STATUS:
raise SFTPError('Expected status')
self.sftp._convert_status(msg)
return chunk
def settimeout(self, timeout):
@ -193,6 +197,26 @@ class SFTPFile (BufferedFile):
alg = msg.get_string()
data = msg.get_remainder()
return data
def set_pipelined(self, pipelined=True):
"""
Turn on/off the pipelining of write operations to this file. When
pipelining is on, paramiko won't wait for the server response after
each write operation. Instead, they're collected as they come in.
At the first non-write operation (including L{close}), all remaining
server responses are collected. This means that if there was an error
with one of your later writes, an exception might be thrown from
within L{close} instead of L{write}.
By default, files are I{not} pipelined.
@param pipelined: C{True} if pipelining should be turned on for this
file; C{False} otherwise
#type pipelined: bool
@since: 1.5
"""
self.pipelined = pipelined
### internals...

View File

@ -451,7 +451,29 @@ class SFTPTest (unittest.TestCase):
finally:
sftp.remove('%s/hongry.txt' % FOLDER)
def test_F_big_file_big_buffer(self):
def test_F_big_file_pipelined(self):
"""
write a 1MB file, with no linefeeds, using pipelining.
"""
global g_big_file_test
if not g_big_file_test:
return
kblob = (1024 * 'x')
try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
sys.stderr.write('.')
f.close()
sys.stderr.write(' ')
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
finally:
sftp.remove('%s/hongry.txt' % FOLDER)
def test_G_big_file_big_buffer(self):
"""
write a 1MB file, with no linefeeds, and a big buffer.
"""
@ -468,7 +490,7 @@ class SFTPTest (unittest.TestCase):
finally:
sftp.remove('%s/hongry.txt' % FOLDER)
def test_G_realpath(self):
def test_H_realpath(self):
"""
test that realpath is returning something non-empty and not an
error.
@ -479,7 +501,7 @@ class SFTPTest (unittest.TestCase):
self.assert_(len(f) > 0)
self.assertEquals(os.path.join(pwd, FOLDER), f)
def test_H_mkdir(self):
def test_I_mkdir(self):
"""
verify that mkdir/rmdir work.
"""
@ -502,7 +524,7 @@ class SFTPTest (unittest.TestCase):
except IOError:
pass
def test_I_chdir(self):
def test_J_chdir(self):
"""
verify that chdir/getcwd work.
"""
@ -539,7 +561,7 @@ class SFTPTest (unittest.TestCase):
except:
pass
def test_J_get_put(self):
def test_K_get_put(self):
"""
verify that get/put work.
"""
@ -568,7 +590,7 @@ class SFTPTest (unittest.TestCase):
os.unlink(localname)
sftp.unlink(FOLDER + '/bunny.txt')
def test_K_check(self):
def test_L_check(self):
"""
verify that file.check() works against our own server.
(it's an sftp extension that we support, and may be the only ones who