From 66919286b6a9865d667e28654c42c78d43e876ca Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Sat, 29 Oct 2005 20:47:04 +0000 Subject: [PATCH] [project @ Arch-1:robey@lag.net--2005-master-shake%paramiko--dev--1--patch-72] don't attempt to start a rekey negotiation from within send_message -- always do it from the feeder thread. this prevents a situation where more than one thread may decide spontaneously to rekey, sending multiple kexinit messages, which confuses the hell out of the remote host :) also, do some locking around the clear-to-send event, to avoid a race when we first go into rekeying. add some tests for these things too --- paramiko/packet.py | 7 ++++++ paramiko/transport.py | 38 ++++++++++++++++++++++++-------- paramiko/util.py | 2 +- tests/test_sftp.py | 48 ++++++++++++++++++++++++++++++++--------- tests/test_transport.py | 36 +++++++++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 20 deletions(-) diff --git a/paramiko/packet.py b/paramiko/packet.py index 6de9971..cc389ce 100644 --- a/paramiko/packet.py +++ b/paramiko/packet.py @@ -33,6 +33,10 @@ from paramiko.ssh_exception import SSHException from paramiko.message import Message +class NeedRekeyException (Exception): + pass + + class Packetizer (object): """ Implementation of the base SSH packet protocol. @@ -187,6 +191,8 @@ class Packetizer (object): except socket.timeout: if self.__closed: raise EOFError() + if self.__need_rekey: + raise NeedRekeyException() self._check_keepalive() return out @@ -269,6 +275,7 @@ class Packetizer (object): done). @raise SSHException: if the packet is mangled + @raise NeedRekeyException: if the transport should rekey """ header = self.read_all(self.__block_size_in) if self.__block_engine_in != None: diff --git a/paramiko/transport.py b/paramiko/transport.py index 58ec46f..232fdaa 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -35,7 +35,7 @@ from paramiko.ssh_exception import SSHException, BadAuthenticationType from paramiko.message import Message from paramiko.channel import Channel from paramiko.sftp_client import SFTPClient -from paramiko.packet import Packetizer +from paramiko.packet import Packetizer, NeedRekeyException from paramiko.rsakey import RSAKey from paramiko.dsskey import DSSKey from paramiko.kex_group1 import KexGroup1 @@ -247,6 +247,7 @@ class Transport (threading.Thread): self.max_packet_size = 32768 self.saved_exception = None self.clear_to_send = threading.Event() + self.clear_to_send_lock = threading.Lock() self.log_name = 'paramiko.transport' self.logger = util.get_logger(self.log_name) self.packetizer.set_log(self.logger) @@ -592,9 +593,9 @@ class Transport (threading.Thread): self.channels_seen[chanid] = True chan._set_transport(self) chan._set_window(self.window_size, self.max_packet_size) - self._send_user_message(m) finally: self.lock.release() + self._send_user_message(m) while 1: event.wait(0.1); if not self.active: @@ -1166,8 +1167,6 @@ class Transport (threading.Thread): def _send_message(self, data): self.packetizer.send_message(data) - if self.packetizer.need_rekey() and not self.in_kex: - self._send_kex_init() def _send_user_message(self, data): """ @@ -1179,9 +1178,14 @@ class Transport (threading.Thread): if not self.active: self._log(DEBUG, 'Dropping user packet because connection is dead.') return + self.clear_to_send_lock.acquire() if self.clear_to_send.isSet(): break - self._send_message(data) + self.clear_to_send_lock.release() + try: + self._send_message(data) + finally: + self.clear_to_send_lock.release() def _set_K_H(self, k, h): "used by a kex object to set the K (root key) and H (exchange hash)" @@ -1246,7 +1250,10 @@ class Transport (threading.Thread): while self.active: if self.packetizer.need_rekey() and not self.in_kex: self._send_kex_init() - ptype, m = self.packetizer.read_message() + try: + ptype, m = self.packetizer.read_message() + except NeedRekeyException: + continue if ptype == MSG_IGNORE: continue elif ptype == MSG_DISCONNECT: @@ -1324,7 +1331,11 @@ class Transport (threading.Thread): def _negotiate_keys(self, m): # throws SSHException on anything unusual - self.clear_to_send.clear() + self.clear_to_send_lock.acquire() + try: + self.clear_to_send.clear() + finally: + self.clear_to_send_lock.release() if self.local_kex_init == None: # remote side wants to renegotiate self._send_kex_init() @@ -1371,7 +1382,11 @@ class Transport (threading.Thread): announce to the other side that we'd like to negotiate keys, and what kind of key negotiation we support. """ - self.clear_to_send.clear() + self.clear_to_send_lock.acquire() + try: + self.clear_to_send.clear() + finally: + self.clear_to_send_lock.release() self.in_kex = True if self.server_mode: if (self._modulus_pack is None) and ('diffie-hellman-group-exchange-sha1' in self._preferred_kex): @@ -1559,7 +1574,12 @@ class Transport (threading.Thread): # it's now okay to send data again (if this was a re-key) if not self.packetizer.need_rekey(): self.in_kex = False - self.clear_to_send.set() + self._log(DEBUG, 'clear to send') + self.clear_to_send_lock.acquire() + try: + self.clear_to_send.set() + finally: + self.clear_to_send_lock.release() return def _parse_disconnect(self, m): diff --git a/paramiko/util.py b/paramiko/util.py index cf90033..3956863 100644 --- a/paramiko/util.py +++ b/paramiko/util.py @@ -257,7 +257,7 @@ def log_to_file(filename, level=DEBUG): l.setLevel(level) f = open(filename, 'w') lh = logging.StreamHandler(f) - lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] thr=%(_threadid)-3d %(name)s: %(message)s', + lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s.%(msecs)03d] thr=%(_threadid)-3d %(name)s: %(message)s', '%Y%m%d-%H:%M:%S')) l.addHandler(lh) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index 4c5065e..94e9dea 100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -62,7 +62,7 @@ liver insulin receptors. Their sensitivity to insulin is, however, similarly decreased compared with chicken. ''' -FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing') +FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000') sftp = None tc = None @@ -121,7 +121,6 @@ class SFTPTest (unittest.TestCase): ts.start_server(event, server) tc.connect(username='slowdive', password='pygmalion') event.wait(1.0) -# self.assert_(self.ts.is_active()) sftp = paramiko.SFTP.from_transport(tc) init_loopback = staticmethod(init_loopback) @@ -132,7 +131,14 @@ class SFTPTest (unittest.TestCase): set_big_file_test = staticmethod(set_big_file_test) def setUp(self): - sftp.mkdir(FOLDER) + global FOLDER + for i in xrange(1000): + FOLDER = FOLDER[:-3] + '%03d' % i + try: + sftp.mkdir(FOLDER) + break + except (IOError, OSError): + pass def tearDown(self): sftp.rmdir(FOLDER) @@ -511,17 +517,39 @@ class SFTPTest (unittest.TestCase): global g_big_file_test if not g_big_file_test: return - kblob = (1024 * 1024 * 'x') + mblob = (1024 * 1024 * 'x') try: f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) - f.write(kblob) + f.write(mblob) f.close() self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) finally: sftp.remove('%s/hongry.txt' % FOLDER) + + def test_H_big_file_renegotiate(self): + """ + write a 1MB file, forcing key renegotiation in the middle. + """ + global g_big_file_test + if not g_big_file_test: + return + t = sftp.sock.get_transport() + t.packetizer.REKEY_BYTES = 512 * 1024 + k32blob = (32 * 1024 * 'x') + try: + f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) + for i in xrange(32): + f.write(k32blob) + f.close() - def test_H_realpath(self): + self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + self.assertNotEquals(t.H, t.session_id) + finally: + sftp.remove('%s/hongry.txt' % FOLDER) + t.packetizer.REKEY_BYTES = pow(2, 30) + + def test_I_realpath(self): """ test that realpath is returning something non-empty and not an error. @@ -532,7 +560,7 @@ class SFTPTest (unittest.TestCase): self.assert_(len(f) > 0) self.assertEquals(os.path.join(pwd, FOLDER), f) - def test_I_mkdir(self): + def test_J_mkdir(self): """ verify that mkdir/rmdir work. """ @@ -555,7 +583,7 @@ class SFTPTest (unittest.TestCase): except IOError: pass - def test_J_chdir(self): + def test_K_chdir(self): """ verify that chdir/getcwd work. """ @@ -592,7 +620,7 @@ class SFTPTest (unittest.TestCase): except: pass - def test_K_get_put(self): + def test_L_get_put(self): """ verify that get/put work. """ @@ -621,7 +649,7 @@ class SFTPTest (unittest.TestCase): os.unlink(localname) sftp.unlink(FOLDER + '/bunny.txt') - def test_L_check(self): + def test_M_check(self): """ verify that file.check() works against our own server. (it's an sftp extension that we support, and may be the only ones who diff --git a/tests/test_transport.py b/tests/test_transport.py index 81254b4..c97fb16 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -504,3 +504,39 @@ class TransportTest (unittest.TestCase): self.assertEquals('', chan.recv(16)) chan.close() + + def test_F_renegotiate(self): + """ + verify that a transport can correctly renegotiate mid-stream. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.ts.start_server(event, server) + self.tc.connect(hostkey=public_host_key, + username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + self.tc.packetizer.REKEY_BYTES = 16384 + + chan = self.tc.open_session() + self.assert_(chan.exec_command('yes')) + schan = self.ts.accept(1.0) + + self.assertEquals(self.tc.H, self.tc.session_id) + for i in range(20): + chan.send('x' * 1024) + chan.close() + + # allow a few seconds for the rekeying to complete + for i in xrange(50): + if self.tc.H != self.tc.session_id: + break + time.sleep(0.1) + self.assertNotEquals(self.tc.H, self.tc.session_id) + + schan.close()