[project @ Arch-1:robey@lag.net--2005-master-shake%paramiko--dev--1--patch-72]

don't attempt to start a rekey negotiation from within send_message -- always do it from the feeder thread.  this prevents a situation where more than one thread may decide spontaneously to rekey, sending multiple kexinit messages, which confuses the hell out of the remote host :)  also, do some locking around the clear-to-send event, to avoid a race when we first go into rekeying.  add some tests for these things too
This commit is contained in:
Robey Pointer 2005-10-29 20:47:04 +00:00
parent 3c67e35b5f
commit 66919286b6
5 changed files with 111 additions and 20 deletions

View File

@ -33,6 +33,10 @@ from paramiko.ssh_exception import SSHException
from paramiko.message import Message
class NeedRekeyException (Exception):
pass
class Packetizer (object):
"""
Implementation of the base SSH packet protocol.
@ -187,6 +191,8 @@ class Packetizer (object):
except socket.timeout:
if self.__closed:
raise EOFError()
if self.__need_rekey:
raise NeedRekeyException()
self._check_keepalive()
return out
@ -269,6 +275,7 @@ class Packetizer (object):
done).
@raise SSHException: if the packet is mangled
@raise NeedRekeyException: if the transport should rekey
"""
header = self.read_all(self.__block_size_in)
if self.__block_engine_in != None:

View File

@ -35,7 +35,7 @@ from paramiko.ssh_exception import SSHException, BadAuthenticationType
from paramiko.message import Message
from paramiko.channel import Channel
from paramiko.sftp_client import SFTPClient
from paramiko.packet import Packetizer
from paramiko.packet import Packetizer, NeedRekeyException
from paramiko.rsakey import RSAKey
from paramiko.dsskey import DSSKey
from paramiko.kex_group1 import KexGroup1
@ -247,6 +247,7 @@ class Transport (threading.Thread):
self.max_packet_size = 32768
self.saved_exception = None
self.clear_to_send = threading.Event()
self.clear_to_send_lock = threading.Lock()
self.log_name = 'paramiko.transport'
self.logger = util.get_logger(self.log_name)
self.packetizer.set_log(self.logger)
@ -592,9 +593,9 @@ class Transport (threading.Thread):
self.channels_seen[chanid] = True
chan._set_transport(self)
chan._set_window(self.window_size, self.max_packet_size)
self._send_user_message(m)
finally:
self.lock.release()
self._send_user_message(m)
while 1:
event.wait(0.1);
if not self.active:
@ -1166,8 +1167,6 @@ class Transport (threading.Thread):
def _send_message(self, data):
self.packetizer.send_message(data)
if self.packetizer.need_rekey() and not self.in_kex:
self._send_kex_init()
def _send_user_message(self, data):
"""
@ -1179,9 +1178,14 @@ class Transport (threading.Thread):
if not self.active:
self._log(DEBUG, 'Dropping user packet because connection is dead.')
return
self.clear_to_send_lock.acquire()
if self.clear_to_send.isSet():
break
self._send_message(data)
self.clear_to_send_lock.release()
try:
self._send_message(data)
finally:
self.clear_to_send_lock.release()
def _set_K_H(self, k, h):
"used by a kex object to set the K (root key) and H (exchange hash)"
@ -1246,7 +1250,10 @@ class Transport (threading.Thread):
while self.active:
if self.packetizer.need_rekey() and not self.in_kex:
self._send_kex_init()
ptype, m = self.packetizer.read_message()
try:
ptype, m = self.packetizer.read_message()
except NeedRekeyException:
continue
if ptype == MSG_IGNORE:
continue
elif ptype == MSG_DISCONNECT:
@ -1324,7 +1331,11 @@ class Transport (threading.Thread):
def _negotiate_keys(self, m):
# throws SSHException on anything unusual
self.clear_to_send.clear()
self.clear_to_send_lock.acquire()
try:
self.clear_to_send.clear()
finally:
self.clear_to_send_lock.release()
if self.local_kex_init == None:
# remote side wants to renegotiate
self._send_kex_init()
@ -1371,7 +1382,11 @@ class Transport (threading.Thread):
announce to the other side that we'd like to negotiate keys, and what
kind of key negotiation we support.
"""
self.clear_to_send.clear()
self.clear_to_send_lock.acquire()
try:
self.clear_to_send.clear()
finally:
self.clear_to_send_lock.release()
self.in_kex = True
if self.server_mode:
if (self._modulus_pack is None) and ('diffie-hellman-group-exchange-sha1' in self._preferred_kex):
@ -1559,7 +1574,12 @@ class Transport (threading.Thread):
# it's now okay to send data again (if this was a re-key)
if not self.packetizer.need_rekey():
self.in_kex = False
self.clear_to_send.set()
self._log(DEBUG, 'clear to send')
self.clear_to_send_lock.acquire()
try:
self.clear_to_send.set()
finally:
self.clear_to_send_lock.release()
return
def _parse_disconnect(self, m):

View File

@ -257,7 +257,7 @@ def log_to_file(filename, level=DEBUG):
l.setLevel(level)
f = open(filename, 'w')
lh = logging.StreamHandler(f)
lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] thr=%(_threadid)-3d %(name)s: %(message)s',
lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s.%(msecs)03d] thr=%(_threadid)-3d %(name)s: %(message)s',
'%Y%m%d-%H:%M:%S'))
l.addHandler(lh)

View File

@ -62,7 +62,7 @@ liver insulin receptors. Their sensitivity to insulin is, however, similarly
decreased compared with chicken.
'''
FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing')
FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
sftp = None
tc = None
@ -121,7 +121,6 @@ class SFTPTest (unittest.TestCase):
ts.start_server(event, server)
tc.connect(username='slowdive', password='pygmalion')
event.wait(1.0)
# self.assert_(self.ts.is_active())
sftp = paramiko.SFTP.from_transport(tc)
init_loopback = staticmethod(init_loopback)
@ -132,7 +131,14 @@ class SFTPTest (unittest.TestCase):
set_big_file_test = staticmethod(set_big_file_test)
def setUp(self):
sftp.mkdir(FOLDER)
global FOLDER
for i in xrange(1000):
FOLDER = FOLDER[:-3] + '%03d' % i
try:
sftp.mkdir(FOLDER)
break
except (IOError, OSError):
pass
def tearDown(self):
sftp.rmdir(FOLDER)
@ -511,17 +517,39 @@ class SFTPTest (unittest.TestCase):
global g_big_file_test
if not g_big_file_test:
return
kblob = (1024 * 1024 * 'x')
mblob = (1024 * 1024 * 'x')
try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024)
f.write(kblob)
f.write(mblob)
f.close()
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
finally:
sftp.remove('%s/hongry.txt' % FOLDER)
def test_H_realpath(self):
def test_H_big_file_renegotiate(self):
"""
write a 1MB file, forcing key renegotiation in the middle.
"""
global g_big_file_test
if not g_big_file_test:
return
t = sftp.sock.get_transport()
t.packetizer.REKEY_BYTES = 512 * 1024
k32blob = (32 * 1024 * 'x')
try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024)
for i in xrange(32):
f.write(k32blob)
f.close()
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
self.assertNotEquals(t.H, t.session_id)
finally:
sftp.remove('%s/hongry.txt' % FOLDER)
t.packetizer.REKEY_BYTES = pow(2, 30)
def test_I_realpath(self):
"""
test that realpath is returning something non-empty and not an
error.
@ -532,7 +560,7 @@ class SFTPTest (unittest.TestCase):
self.assert_(len(f) > 0)
self.assertEquals(os.path.join(pwd, FOLDER), f)
def test_I_mkdir(self):
def test_J_mkdir(self):
"""
verify that mkdir/rmdir work.
"""
@ -555,7 +583,7 @@ class SFTPTest (unittest.TestCase):
except IOError:
pass
def test_J_chdir(self):
def test_K_chdir(self):
"""
verify that chdir/getcwd work.
"""
@ -592,7 +620,7 @@ class SFTPTest (unittest.TestCase):
except:
pass
def test_K_get_put(self):
def test_L_get_put(self):
"""
verify that get/put work.
"""
@ -621,7 +649,7 @@ class SFTPTest (unittest.TestCase):
os.unlink(localname)
sftp.unlink(FOLDER + '/bunny.txt')
def test_L_check(self):
def test_M_check(self):
"""
verify that file.check() works against our own server.
(it's an sftp extension that we support, and may be the only ones who

View File

@ -504,3 +504,39 @@ class TransportTest (unittest.TestCase):
self.assertEquals('', chan.recv(16))
chan.close()
def test_F_renegotiate(self):
"""
verify that a transport can correctly renegotiate mid-stream.
"""
host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
public_host_key = RSAKey(data=str(host_key))
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
self.ts.start_server(event, server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
self.tc.packetizer.REKEY_BYTES = 16384
chan = self.tc.open_session()
self.assert_(chan.exec_command('yes'))
schan = self.ts.accept(1.0)
self.assertEquals(self.tc.H, self.tc.session_id)
for i in range(20):
chan.send('x' * 1024)
chan.close()
# allow a few seconds for the rekeying to complete
for i in xrange(50):
if self.tc.H != self.tc.session_id:
break
time.sleep(0.1)
self.assertNotEquals(self.tc.H, self.tc.session_id)
schan.close()