diff --git a/tests/test_transport.py b/tests/test_transport.py index 7147b43..3e51485 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -73,11 +73,9 @@ class NullServer (ServerInterface): def check_channel_exec_request(self, channel, command): if command != 'yes': return False - self.exec_channel = channel return True def check_channel_shell_request(self, channel): - self.shell_channel = channel return True @@ -125,14 +123,51 @@ class TransportTest (unittest.TestCase): event = threading.Event() server = NullServer() self.assert_(not event.isSet()) + self.assertEquals(None, self.tc.get_username()) + self.assertEquals(None, self.ts.get_username()) + self.assertEquals(False, self.tc.is_authenticated()) + self.assertEquals(False, self.ts.is_authenticated()) self.ts.start_server(event, server) self.tc.connect(hostkey=public_host_key, username='slowdive', password='pygmalion') event.wait(1.0) self.assert_(event.isSet()) self.assert_(self.ts.is_active()) + self.assertEquals('slowdive', self.tc.get_username()) + self.assertEquals('slowdive', self.ts.get_username()) + self.assertEquals(True, self.tc.is_authenticated()) + self.assertEquals(True, self.ts.is_authenticated()) - def test_3_bad_auth_type(self): + def test_3_special(self): + """ + verify that the client can demand odd handshake settings, and can + renegotiate keys in mid-stream. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + options = self.tc.get_security_options() + options.ciphers = ('aes256-cbc',) + options.digests = ('hmac-md5-96',) + self.tc.connect(hostkey=public_host_key, + username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + self.assertEquals('aes256-cbc', self.tc.local_cipher) + self.assertEquals('aes256-cbc', self.tc.remote_cipher) + self.assertEquals(12, self.tc.local_mac_len) + self.assertEquals(12, self.tc.remote_mac_len) + + self.tc.send_ignore(1024) + self.assert_(self.tc.renegotiate_keys()) + self.ts.send_ignore(1024) + + def test_4_bad_auth_type(self): """ verify that we get the right exception when an unsupported auth type is requested. @@ -153,7 +188,7 @@ class TransportTest (unittest.TestCase): self.assertEquals(BadAuthenticationType, etype) self.assertEquals(['publickey'], evalue.allowed_types) - def test_4_bad_password(self): + def test_5_bad_password(self): """ verify that a bad password gets the right exception, and that a retry with the right password works. @@ -178,7 +213,7 @@ class TransportTest (unittest.TestCase): self.assert_(event.isSet()) self.assert_(self.ts.is_active()) - def test_5_multipart_auth(self): + def test_6_multipart_auth(self): """ verify that multipart auth works. """ @@ -200,7 +235,7 @@ class TransportTest (unittest.TestCase): self.assert_(event.isSet()) self.assert_(self.ts.is_active()) - def test_6_exec_command(self): + def test_7_exec_command(self): """ verify that exec_command() does something reasonable. """ @@ -219,13 +254,15 @@ class TransportTest (unittest.TestCase): self.assert_(self.ts.is_active()) chan = self.tc.open_session() + schan = self.ts.accept(1.0) self.assert_(not chan.exec_command('no')) chan = self.tc.open_session() self.assert_(chan.exec_command('yes')) - server.exec_channel.send('Hello there.\n') - server.exec_channel.send_stderr('This is on stderr.\n') - server.exec_channel.close() + schan = self.ts.accept(1.0) + schan.send('Hello there.\n') + schan.send_stderr('This is on stderr.\n') + schan.close() f = chan.makefile() self.assertEquals('Hello there.\n', f.readline()) @@ -236,10 +273,11 @@ class TransportTest (unittest.TestCase): # now try it with combined stdout/stderr chan = self.tc.open_session() - chan.exec_command('yes') - server.exec_channel.send('Hello there.\n') - server.exec_channel.send_stderr('This is on stderr.\n') - server.exec_channel.close() + self.assert_(chan.exec_command('yes')) + schan = self.ts.accept(1.0) + schan.send('Hello there.\n') + schan.send_stderr('This is on stderr.\n') + schan.close() chan.set_combine_stderr(True) f = chan.makefile() @@ -247,7 +285,7 @@ class TransportTest (unittest.TestCase): self.assertEquals('This is on stderr.\n', f.readline()) self.assertEquals('', f.readline()) - def test_7_invoke_shell(self): + def test_8_invoke_shell(self): """ verify that invoke_shell() does something reasonable. """ @@ -267,8 +305,9 @@ class TransportTest (unittest.TestCase): chan = self.tc.open_session() self.assert_(chan.invoke_shell()) + schan = self.ts.accept(1.0) chan.send('communist j. cat\n') - f = server.shell_channel.makefile() + f = schan.makefile() self.assertEquals('communist j. cat\n', f.readline()) chan.close() self.assertEquals('', f.readline())