From fb73c0ef7f969a873ccb4f091cacd3ba83d7bb85 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Sun, 25 Sep 2005 09:11:23 +0000 Subject: [PATCH] [project @ Arch-1:robey@lag.net--2005-master-shake%paramiko--dev--1--patch-60] add file pipelining for writes --- README | 23 +++++++++++++++++++---- paramiko/sftp_client.py | 38 ++++++++++++++++++++++++++++++-------- paramiko/sftp_file.py | 34 +++++++++++++++++++++++++++++----- tests/test_sftp.py | 34 ++++++++++++++++++++++++++++------ 4 files changed, 106 insertions(+), 23 deletions(-) diff --git a/README b/README index eb54c89..0dbff4a 100644 --- a/README +++ b/README @@ -134,6 +134,20 @@ the best and easiest examples of how to use the SFTP class. highlights of what's new in each release: +v1.5 P... +* added support for "keyboard-interactive" authentication +* added mode (on by default) where password authentication will try to + fallback to "keyboard-interactive" if it's supported +* added pipelining to SFTPFile.write and SFTPClient.put +* fixed bug with SFTPFile.close() not guarding against being called more + than once (thanks to Nathaniel Smith) +* fixed broken 'a' flag in SFTPClient.file() (thanks to Nathaniel Smith) +* fixed up epydocs to look nicer +* reorganized auth_transport into auth_handler, which seems to be a cleaner + separation +* demo scripts fixed to have a better chance of loading the host keys + correctly on windows/cygwin + v1.4 ODDISH * added SSH-agent support (for posix) from john rochester * added chdir() and getcwd() to SFTPClient, to emulate a "working directory" @@ -248,18 +262,19 @@ v0.9 FEAROW *** MISSING LINKS -* keyboard-interactive auth +* add comments to demo & demo_simple about how they don't work on windows * host-based auth (yuck!) -* move auth_transport to be a side plugin like in jaramiko * support compression * SFTP pipelining + - basically, just don't wait synchronously for server responses. queue + up "expected" responses and wait for them on close(). * SFTP implicit file locking? +* server-side keyboard-interactive * ctr forms of ciphers are missing (blowfish-ctr, aes128-ctr, aes256-ctr) * SFTP url parsing function to return (user, pass, host, port, path) components * sftp protocol 6 support (ugh....) -- once it settles down more -* why are big files so slow to transfer? profiling needed... * what is psyco? -* make a simple example demonstrating use of SocketServer +* make a simple example demonstrating use of SocketServer (besides forward.py?) diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py index 3a65a68..9efad31 100644 --- a/paramiko/sftp_client.py +++ b/paramiko/sftp_client.py @@ -56,6 +56,7 @@ class SFTPClient (BaseSFTP): self.ultra_debug = False self.request_number = 1 self._cwd = None + self._expecting = [] if type(sock) is Channel: # override default logger transport = self.sock.get_transport() @@ -449,6 +450,8 @@ class SFTPClient (BaseSFTP): Any exception raised by operations will be passed through. This method is primarily provided as a convenience. + The SFTP operations use pipelining for speed. + @param localpath: the local file to copy @type localpath: str @param remotepath: the destination path on the SFTP server @@ -458,6 +461,7 @@ class SFTPClient (BaseSFTP): """ fl = file(localpath, 'rb') fr = self.file(remotepath, 'wb') + fr.set_pipelined(True) size = 0 while True: data = fl.read(32768) @@ -504,6 +508,10 @@ class SFTPClient (BaseSFTP): def _request(self, t, *arg): + num = self._async_request(t, *arg) + return self._read_response(num) + + def _async_request(self, t, *arg): msg = Message() msg.add_int(self.request_number) for item in arg: @@ -516,18 +524,32 @@ class SFTPClient (BaseSFTP): elif type(item) is SFTPAttributes: item._pack(msg) else: - raise Exception('unknown type for ' + repr(item) + ' type ' + repr(type(item))) + raise Exception('unknown type for %r type %r' % (item, type(item))) self._send_packet(t, str(msg)) - t, data = self._read_packet() - msg = Message(data) - num = msg.get_int() - if num != self.request_number: - raise SFTPError('Expected response #%d, got response #%d' % (self.request_number, num)) + num = self.request_number self.request_number += 1 - if t == CMD_STATUS: - self._convert_status(msg) + self._expecting.append(num) + return num + + def _read_response(self, waitfor=None): + while True: + t, data = self._read_packet() + msg = Message(data) + num = msg.get_int() + if num not in self._expecting: + raise SFTPError('Expected response from %r, got response #%d' % + (self._expected, num)) + self._expecting.remove(num) + if t == CMD_STATUS: + self._convert_status(msg) + if (waitfor is None) or (num == waitfor): + break return t, msg + def _finish_responses(self): + while len(self._expecting) > 0: + self._read_response() + def _convert_status(self, msg): """ Raises EOFError or IOError on error status; otherwise does nothing. diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index 854ccf4..3ccfcd2 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -40,6 +40,7 @@ class SFTPFile (BufferedFile): self.sftp = sftp self.handle = handle BufferedFile._set_mode(self, mode, bufsize) + self.pipelined = False def __del__(self): self.close() @@ -54,6 +55,8 @@ class SFTPFile (BufferedFile): # __del__.) if self._closed: return + if self.pipelined: + self.sftp._finish_responses() BufferedFile.close(self) try: self.sftp._request(CMD_CLOSE, self.handle) @@ -74,11 +77,12 @@ class SFTPFile (BufferedFile): def _write(self, data): # may write less than requested if it would exceed max packet size chunk = min(len(data), self.MAX_REQUEST_SIZE) - t, msg = self.sftp._request(CMD_WRITE, self.handle, long(self._realpos), - str(data[:chunk])) - if t != CMD_STATUS: - raise SFTPError('Expected status') - self.sftp._convert_status(msg) + self.sftp._async_request(CMD_WRITE, self.handle, long(self._realpos), str(data[:chunk])) + if not self.pipelined or self.sftp.sock.recv_ready(): + t, msg = self.sftp._read_response() + if t != CMD_STATUS: + raise SFTPError('Expected status') + self.sftp._convert_status(msg) return chunk def settimeout(self, timeout): @@ -193,6 +197,26 @@ class SFTPFile (BufferedFile): alg = msg.get_string() data = msg.get_remainder() return data + + def set_pipelined(self, pipelined=True): + """ + Turn on/off the pipelining of write operations to this file. When + pipelining is on, paramiko won't wait for the server response after + each write operation. Instead, they're collected as they come in. + At the first non-write operation (including L{close}), all remaining + server responses are collected. This means that if there was an error + with one of your later writes, an exception might be thrown from + within L{close} instead of L{write}. + + By default, files are I{not} pipelined. + + @param pipelined: C{True} if pipelining should be turned on for this + file; C{False} otherwise + #type pipelined: bool + + @since: 1.5 + """ + self.pipelined = pipelined ### internals... diff --git a/tests/test_sftp.py b/tests/test_sftp.py index fdbbe1a..4981936 100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -451,7 +451,29 @@ class SFTPTest (unittest.TestCase): finally: sftp.remove('%s/hongry.txt' % FOLDER) - def test_F_big_file_big_buffer(self): + def test_F_big_file_pipelined(self): + """ + write a 1MB file, with no linefeeds, using pipelining. + """ + global g_big_file_test + if not g_big_file_test: + return + kblob = (1024 * 'x') + try: + f = sftp.open('%s/hongry.txt' % FOLDER, 'w') + f.set_pipelined(True) + for n in range(1024): + f.write(kblob) + if n % 128 == 0: + sys.stderr.write('.') + f.close() + sys.stderr.write(' ') + + self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + finally: + sftp.remove('%s/hongry.txt' % FOLDER) + + def test_G_big_file_big_buffer(self): """ write a 1MB file, with no linefeeds, and a big buffer. """ @@ -468,7 +490,7 @@ class SFTPTest (unittest.TestCase): finally: sftp.remove('%s/hongry.txt' % FOLDER) - def test_G_realpath(self): + def test_H_realpath(self): """ test that realpath is returning something non-empty and not an error. @@ -479,7 +501,7 @@ class SFTPTest (unittest.TestCase): self.assert_(len(f) > 0) self.assertEquals(os.path.join(pwd, FOLDER), f) - def test_H_mkdir(self): + def test_I_mkdir(self): """ verify that mkdir/rmdir work. """ @@ -502,7 +524,7 @@ class SFTPTest (unittest.TestCase): except IOError: pass - def test_I_chdir(self): + def test_J_chdir(self): """ verify that chdir/getcwd work. """ @@ -539,7 +561,7 @@ class SFTPTest (unittest.TestCase): except: pass - def test_J_get_put(self): + def test_K_get_put(self): """ verify that get/put work. """ @@ -568,7 +590,7 @@ class SFTPTest (unittest.TestCase): os.unlink(localname) sftp.unlink(FOLDER + '/bunny.txt') - def test_K_check(self): + def test_L_check(self): """ verify that file.check() works against our own server. (it's an sftp extension that we support, and may be the only ones who