[project @ Arch-1:robey@lag.net--2005-master-shake%paramiko--dev--1--patch-17]

more unit tests
This commit is contained in:
Robey Pointer 2005-06-28 06:02:44 +00:00
parent 1fb38470aa
commit 727cab9672
3 changed files with 101 additions and 21 deletions

View File

@ -39,7 +39,13 @@ class KeyTest (unittest.TestCase):
def tearDown(self): def tearDown(self):
pass pass
def test_1_load_rsa(self): def test_1_generate_key_bytes(self):
from Crypto.Hash import MD5
key = util.generate_key_bytes(MD5, '\x01\x02\x03\x04', 'happy birthday', 30)
exp = util.unhexify('61E1F272F4C1C4561586BD322498C0E924672780F47BB37DDA7D54019E64')
self.assertEquals(exp, key)
def test_2_load_rsa(self):
key = RSAKey.from_private_key_file('tests/test_rsa.key') key = RSAKey.from_private_key_file('tests/test_rsa.key')
self.assertEquals('ssh-rsa', key.get_name()) self.assertEquals('ssh-rsa', key.get_name())
exp_rsa = FINGER_RSA.split()[1].replace(':', '') exp_rsa = FINGER_RSA.split()[1].replace(':', '')
@ -48,7 +54,16 @@ class KeyTest (unittest.TestCase):
self.assertEquals(PUB_RSA.split()[1], key.get_base64()) self.assertEquals(PUB_RSA.split()[1], key.get_base64())
self.assertEquals(1024, key.get_bits()) self.assertEquals(1024, key.get_bits())
def test_2_load_dss(self): def test_3_load_rsa_password(self):
key = RSAKey.from_private_key_file('tests/test_rsa_password.key', 'television')
self.assertEquals('ssh-rsa', key.get_name())
exp_rsa = FINGER_RSA.split()[1].replace(':', '')
my_rsa = util.hexify(key.get_fingerprint()).lower()
self.assertEquals(exp_rsa, my_rsa)
self.assertEquals(PUB_RSA.split()[1], key.get_base64())
self.assertEquals(1024, key.get_bits())
def test_4_load_dss(self):
key = DSSKey.from_private_key_file('tests/test_dss.key') key = DSSKey.from_private_key_file('tests/test_dss.key')
self.assertEquals('ssh-dss', key.get_name()) self.assertEquals('ssh-dss', key.get_name())
exp_dss = FINGER_DSS.split()[1].replace(':', '') exp_dss = FINGER_DSS.split()[1].replace(':', '')
@ -57,7 +72,16 @@ class KeyTest (unittest.TestCase):
self.assertEquals(PUB_DSS.split()[1], key.get_base64()) self.assertEquals(PUB_DSS.split()[1], key.get_base64())
self.assertEquals(1024, key.get_bits()) self.assertEquals(1024, key.get_bits())
def test_3_compare_rsa(self): def test_5_load_dss_password(self):
key = DSSKey.from_private_key_file('tests/test_dss_password.key', 'television')
self.assertEquals('ssh-dss', key.get_name())
exp_dss = FINGER_DSS.split()[1].replace(':', '')
my_dss = util.hexify(key.get_fingerprint()).lower()
self.assertEquals(exp_dss, my_dss)
self.assertEquals(PUB_DSS.split()[1], key.get_base64())
self.assertEquals(1024, key.get_bits())
def test_6_compare_rsa(self):
# verify that the private & public keys compare equal # verify that the private & public keys compare equal
key = RSAKey.from_private_key_file('tests/test_rsa.key') key = RSAKey.from_private_key_file('tests/test_rsa.key')
self.assertEquals(key, key) self.assertEquals(key, key)
@ -66,7 +90,7 @@ class KeyTest (unittest.TestCase):
self.assert_(not pub.can_sign()) self.assert_(not pub.can_sign())
self.assertEquals(key, pub) self.assertEquals(key, pub)
def test_4_compare_dss(self): def test_7_compare_dss(self):
# verify that the private & public keys compare equal # verify that the private & public keys compare equal
key = DSSKey.from_private_key_file('tests/test_dss.key') key = DSSKey.from_private_key_file('tests/test_dss.key')
self.assertEquals(key, key) self.assertEquals(key, key)
@ -75,7 +99,7 @@ class KeyTest (unittest.TestCase):
self.assert_(not pub.can_sign()) self.assert_(not pub.can_sign())
self.assertEquals(key, pub) self.assertEquals(key, pub)
def test_5_sign_rsa(self): def test_8_sign_rsa(self):
# verify that the rsa private key can sign and verify # verify that the rsa private key can sign and verify
key = RSAKey.from_private_key_file('tests/test_rsa.key') key = RSAKey.from_private_key_file('tests/test_rsa.key')
msg = key.sign_ssh_data(randpool, 'ice weasels') msg = key.sign_ssh_data(randpool, 'ice weasels')
@ -87,7 +111,7 @@ class KeyTest (unittest.TestCase):
pub = RSAKey(data=str(key)) pub = RSAKey(data=str(key))
self.assert_(pub.verify_ssh_sig('ice weasels', msg)) self.assert_(pub.verify_ssh_sig('ice weasels', msg))
def test_6_sign_dss(self): def test_9_sign_dss(self):
# verify that the dss private key can sign and verify # verify that the dss private key can sign and verify
key = DSSKey.from_private_key_file('tests/test_dss.key') key = DSSKey.from_private_key_file('tests/test_dss.key')
msg = key.sign_ssh_data(randpool, 'ice weasels') msg = key.sign_ssh_data(randpool, 'ice weasels')
@ -101,12 +125,12 @@ class KeyTest (unittest.TestCase):
pub = DSSKey(data=str(key)) pub = DSSKey(data=str(key))
self.assert_(pub.verify_ssh_sig('ice weasels', msg)) self.assert_(pub.verify_ssh_sig('ice weasels', msg))
def test_7_generate_rsa(self): def test_A_generate_rsa(self):
key = RSAKey.generate(1024) key = RSAKey.generate(1024)
msg = key.sign_ssh_data(randpool, 'jerri blank') msg = key.sign_ssh_data(randpool, 'jerri blank')
self.assert_(key.verify_ssh_sig('jerri blank', msg)) self.assert_(key.verify_ssh_sig('jerri blank', msg))
def test_8_generate_dss(self): def test_B_generate_dss(self):
key = DSSKey.generate(1024) key = DSSKey.generate(1024)
msg = key.sign_ssh_data(randpool, 'jerri blank') msg = key.sign_ssh_data(randpool, 'jerri blank')
self.assert_(key.verify_ssh_sig('jerri blank', msg)) self.assert_(key.verify_ssh_sig('jerri blank', msg))

View File

@ -68,9 +68,10 @@ g_big_file_test = True
class SFTPTest (unittest.TestCase): class SFTPTest (unittest.TestCase):
def init(hostname, username, keyfile, passwd): def init(hostname, username, keyfile, passwd):
global sftp global sftp, tc
t = paramiko.Transport(hostname) t = paramiko.Transport(hostname)
tc = t
try: try:
key = paramiko.RSAKey.from_private_key_file(keyfile, passwd) key = paramiko.RSAKey.from_private_key_file(keyfile, passwd)
except paramiko.PasswordRequiredException: except paramiko.PasswordRequiredException:

View File

@ -1,5 +1,3 @@
#!/usr/bin/python
# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> # Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
# #
# This file is part of paramiko. # This file is part of paramiko.
@ -23,8 +21,9 @@ Some unit tests for the ssh2 protocol in Transport.
""" """
import sys, time, threading, unittest import sys, time, threading, unittest
import select
from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \ from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
SSHException, BadAuthenticationType SSHException, BadAuthenticationType, util
from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED from paramiko import OPEN_SUCCEEDED
from loop import LoopSocket from loop import LoopSocket
@ -114,8 +113,16 @@ class TransportTest (unittest.TestCase):
self.assert_(False) self.assert_(False)
except TypeError: except TypeError:
pass pass
def test_2_compute_key(self):
self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929L
self.tc.H = util.unhexify('0C8307CDE6856FF30BA93684EB0F04C2520E9ED3')
self.tc.session_id = self.tc.H
key = self.tc._compute_key('C', 32)
self.assertEquals('207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995',
util.hexify(key))
def test_2_simple(self): def test_3_simple(self):
""" """
verify that we can establish an ssh link with ourselves across the verify that we can establish an ssh link with ourselves across the
loopback sockets. this is hardly "simple" but it's simpler than the loopback sockets. this is hardly "simple" but it's simpler than the
@ -142,7 +149,7 @@ class TransportTest (unittest.TestCase):
self.assertEquals(True, self.tc.is_authenticated()) self.assertEquals(True, self.tc.is_authenticated())
self.assertEquals(True, self.ts.is_authenticated()) self.assertEquals(True, self.ts.is_authenticated())
def test_3_special(self): def test_4_special(self):
""" """
verify that the client can demand odd handshake settings, and can verify that the client can demand odd handshake settings, and can
renegotiate keys in mid-stream. renegotiate keys in mid-stream.
@ -171,7 +178,7 @@ class TransportTest (unittest.TestCase):
self.assert_(self.tc.renegotiate_keys()) self.assert_(self.tc.renegotiate_keys())
self.ts.send_ignore(1024) self.ts.send_ignore(1024)
def test_4_keepalive(self): def test_5_keepalive(self):
""" """
verify that the keepalive will be sent. verify that the keepalive will be sent.
""" """
@ -195,7 +202,7 @@ class TransportTest (unittest.TestCase):
time.sleep(2) time.sleep(2)
self.assertEquals('keepalive@lag.net', server._global_request) self.assertEquals('keepalive@lag.net', server._global_request)
def test_5_bad_auth_type(self): def test_6_bad_auth_type(self):
""" """
verify that we get the right exception when an unsupported auth verify that we get the right exception when an unsupported auth
type is requested. type is requested.
@ -216,7 +223,7 @@ class TransportTest (unittest.TestCase):
self.assertEquals(BadAuthenticationType, etype) self.assertEquals(BadAuthenticationType, etype)
self.assertEquals(['publickey'], evalue.allowed_types) self.assertEquals(['publickey'], evalue.allowed_types)
def test_6_bad_password(self): def test_7_bad_password(self):
""" """
verify that a bad password gets the right exception, and that a retry verify that a bad password gets the right exception, and that a retry
with the right password works. with the right password works.
@ -241,7 +248,7 @@ class TransportTest (unittest.TestCase):
self.assert_(event.isSet()) self.assert_(event.isSet())
self.assert_(self.ts.is_active()) self.assert_(self.ts.is_active())
def test_7_multipart_auth(self): def test_8_multipart_auth(self):
""" """
verify that multipart auth works. verify that multipart auth works.
""" """
@ -263,7 +270,7 @@ class TransportTest (unittest.TestCase):
self.assert_(event.isSet()) self.assert_(event.isSet())
self.assert_(self.ts.is_active()) self.assert_(self.ts.is_active())
def test_8_exec_command(self): def test_9_exec_command(self):
""" """
verify that exec_command() does something reasonable. verify that exec_command() does something reasonable.
""" """
@ -313,7 +320,7 @@ class TransportTest (unittest.TestCase):
self.assertEquals('This is on stderr.\n', f.readline()) self.assertEquals('This is on stderr.\n', f.readline())
self.assertEquals('', f.readline()) self.assertEquals('', f.readline())
def test_9_invoke_shell(self): def test_A_invoke_shell(self):
""" """
verify that invoke_shell() does something reasonable. verify that invoke_shell() does something reasonable.
""" """
@ -340,7 +347,7 @@ class TransportTest (unittest.TestCase):
chan.close() chan.close()
self.assertEquals('', f.readline()) self.assertEquals('', f.readline())
def test_A_exit_status(self): def test_B_exit_status(self):
""" """
verify that get_exit_status() works. verify that get_exit_status() works.
""" """
@ -373,3 +380,51 @@ class TransportTest (unittest.TestCase):
self.assertEquals('', f.readline()) self.assertEquals('', f.readline())
self.assertEquals(23, chan.recv_exit_status()) self.assertEquals(23, chan.recv_exit_status())
chan.close() chan.close()
def test_C_select(self):
"""
verify that select() on a channel works.
"""
host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
public_host_key = RSAKey(data=str(host_key))
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
self.assert_(not event.isSet())
self.ts.start_server(event, server)
self.tc.ultra_debug = True
self.tc.connect(hostkey=public_host_key)
self.tc.auth_password(username='slowdive', password='pygmalion')
event.wait(1.0)
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
chan = self.tc.open_session()
self.assert_(chan.invoke_shell())
schan = self.ts.accept(1.0)
# nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
self.assertEquals([], r)
self.assertEquals([], w)
self.assertEquals([], e)
schan.send('hello\n')
# something should be ready now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEquals([chan], r)
self.assertEquals([], w)
self.assertEquals([], e)
self.assertEquals('hello\n', chan.recv(6))
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEquals([], r)
self.assertEquals([], w)
self.assertEquals([], e)
chan.close()
schan.close()