diff --git a/tests/test_client.py b/tests/test_client.py index a8f8bc8..a801e3b 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -25,6 +25,7 @@ import threading import time import unittest import weakref +from binascii import hexlify import paramiko @@ -41,6 +42,11 @@ class NullServer (paramiko.ServerInterface): return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED + def check_auth_publickey(self, username, key): + if (key.get_name() == 'ssh-dss') and (hexlify(key.get_fingerprint()) == '4478f0b9a23cc5182009ff755bc1d26c'): + return paramiko.AUTH_SUCCESSFUL + return paramiko.AUTH_FAILED + def check_channel_request(self, kind, chanid): return paramiko.OPEN_SUCCEEDED @@ -109,8 +115,41 @@ class SSHClientTest (unittest.TestCase): stdin.close() stdout.close() stderr.close() + + def test_2_client_dsa(self): + """ + verify that SSHClient works with a DSA key. + """ + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = paramiko.RSAKey(data=str(host_key)) + + self.tc = paramiko.SSHClient() + self.tc.get_host_keys().add(self.addr, 'ssh-rsa', public_host_key) + self.tc.connect(self.addr, self.port, username='slowdive', key_filename='tests/test_dss.key') - def test_2_auto_add_policy(self): + self.event.wait(1.0) + self.assert_(self.event.isSet()) + self.assert_(self.ts.is_active()) + self.assertEquals('slowdive', self.ts.get_username()) + self.assertEquals(True, self.ts.is_authenticated()) + + stdin, stdout, stderr = self.tc.exec_command('yes') + schan = self.ts.accept(1.0) + + schan.send('Hello there.\n') + schan.send_stderr('This is on stderr.\n') + schan.close() + + self.assertEquals('Hello there.\n', stdout.readline()) + self.assertEquals('', stdout.readline()) + self.assertEquals('This is on stderr.\n', stderr.readline()) + self.assertEquals('', stderr.readline()) + + stdin.close() + stdout.close() + stderr.close() + + def test_3_auto_add_policy(self): """ verify that SSHClient's AutoAddPolicy works. """ @@ -130,7 +169,7 @@ class SSHClientTest (unittest.TestCase): self.assertEquals(1, len(self.tc.get_host_keys())) self.assertEquals(public_host_key, self.tc.get_host_keys()[self.addr]['ssh-rsa']) - def test_3_cleanup(self): + def test_4_cleanup(self): """ verify that when an SSHClient is collected, its transport (and the transport's packetizer) is closed.