From 727cab9672d5951ee8d11667ba760b29b58b7527 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Tue, 28 Jun 2005 06:02:44 +0000 Subject: [PATCH] [project @ Arch-1:robey@lag.net--2005-master-shake%paramiko--dev--1--patch-17] more unit tests --- tests/test_pkey.py | 40 ++++++++++++++++----- tests/test_sftp.py | 3 +- tests/test_transport.py | 79 ++++++++++++++++++++++++++++++++++------- 3 files changed, 101 insertions(+), 21 deletions(-) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 338ed20..3cd051f 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -39,7 +39,13 @@ class KeyTest (unittest.TestCase): def tearDown(self): pass - def test_1_load_rsa(self): + def test_1_generate_key_bytes(self): + from Crypto.Hash import MD5 + key = util.generate_key_bytes(MD5, '\x01\x02\x03\x04', 'happy birthday', 30) + exp = util.unhexify('61E1F272F4C1C4561586BD322498C0E924672780F47BB37DDA7D54019E64') + self.assertEquals(exp, key) + + def test_2_load_rsa(self): key = RSAKey.from_private_key_file('tests/test_rsa.key') self.assertEquals('ssh-rsa', key.get_name()) exp_rsa = FINGER_RSA.split()[1].replace(':', '') @@ -48,7 +54,16 @@ class KeyTest (unittest.TestCase): self.assertEquals(PUB_RSA.split()[1], key.get_base64()) self.assertEquals(1024, key.get_bits()) - def test_2_load_dss(self): + def test_3_load_rsa_password(self): + key = RSAKey.from_private_key_file('tests/test_rsa_password.key', 'television') + self.assertEquals('ssh-rsa', key.get_name()) + exp_rsa = FINGER_RSA.split()[1].replace(':', '') + my_rsa = util.hexify(key.get_fingerprint()).lower() + self.assertEquals(exp_rsa, my_rsa) + self.assertEquals(PUB_RSA.split()[1], key.get_base64()) + self.assertEquals(1024, key.get_bits()) + + def test_4_load_dss(self): key = DSSKey.from_private_key_file('tests/test_dss.key') self.assertEquals('ssh-dss', key.get_name()) exp_dss = FINGER_DSS.split()[1].replace(':', '') @@ -57,7 +72,16 @@ class KeyTest (unittest.TestCase): self.assertEquals(PUB_DSS.split()[1], key.get_base64()) self.assertEquals(1024, key.get_bits()) - def test_3_compare_rsa(self): + def test_5_load_dss_password(self): + key = DSSKey.from_private_key_file('tests/test_dss_password.key', 'television') + self.assertEquals('ssh-dss', key.get_name()) + exp_dss = FINGER_DSS.split()[1].replace(':', '') + my_dss = util.hexify(key.get_fingerprint()).lower() + self.assertEquals(exp_dss, my_dss) + self.assertEquals(PUB_DSS.split()[1], key.get_base64()) + self.assertEquals(1024, key.get_bits()) + + def test_6_compare_rsa(self): # verify that the private & public keys compare equal key = RSAKey.from_private_key_file('tests/test_rsa.key') self.assertEquals(key, key) @@ -66,7 +90,7 @@ class KeyTest (unittest.TestCase): self.assert_(not pub.can_sign()) self.assertEquals(key, pub) - def test_4_compare_dss(self): + def test_7_compare_dss(self): # verify that the private & public keys compare equal key = DSSKey.from_private_key_file('tests/test_dss.key') self.assertEquals(key, key) @@ -75,7 +99,7 @@ class KeyTest (unittest.TestCase): self.assert_(not pub.can_sign()) self.assertEquals(key, pub) - def test_5_sign_rsa(self): + def test_8_sign_rsa(self): # verify that the rsa private key can sign and verify key = RSAKey.from_private_key_file('tests/test_rsa.key') msg = key.sign_ssh_data(randpool, 'ice weasels') @@ -87,7 +111,7 @@ class KeyTest (unittest.TestCase): pub = RSAKey(data=str(key)) self.assert_(pub.verify_ssh_sig('ice weasels', msg)) - def test_6_sign_dss(self): + def test_9_sign_dss(self): # verify that the dss private key can sign and verify key = DSSKey.from_private_key_file('tests/test_dss.key') msg = key.sign_ssh_data(randpool, 'ice weasels') @@ -101,12 +125,12 @@ class KeyTest (unittest.TestCase): pub = DSSKey(data=str(key)) self.assert_(pub.verify_ssh_sig('ice weasels', msg)) - def test_7_generate_rsa(self): + def test_A_generate_rsa(self): key = RSAKey.generate(1024) msg = key.sign_ssh_data(randpool, 'jerri blank') self.assert_(key.verify_ssh_sig('jerri blank', msg)) - def test_8_generate_dss(self): + def test_B_generate_dss(self): key = DSSKey.generate(1024) msg = key.sign_ssh_data(randpool, 'jerri blank') self.assert_(key.verify_ssh_sig('jerri blank', msg)) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index 5031f02..280f82d 100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -68,9 +68,10 @@ g_big_file_test = True class SFTPTest (unittest.TestCase): def init(hostname, username, keyfile, passwd): - global sftp + global sftp, tc t = paramiko.Transport(hostname) + tc = t try: key = paramiko.RSAKey.from_private_key_file(keyfile, passwd) except paramiko.PasswordRequiredException: diff --git a/tests/test_transport.py b/tests/test_transport.py index 5afc2e1..d25db52 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -1,5 +1,3 @@ -#!/usr/bin/python - # Copyright (C) 2003-2005 Robey Pointer # # This file is part of paramiko. @@ -23,8 +21,9 @@ Some unit tests for the ssh2 protocol in Transport. """ import sys, time, threading, unittest +import select from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \ - SSHException, BadAuthenticationType + SSHException, BadAuthenticationType, util from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL from paramiko import OPEN_SUCCEEDED from loop import LoopSocket @@ -114,8 +113,16 @@ class TransportTest (unittest.TestCase): self.assert_(False) except TypeError: pass + + def test_2_compute_key(self): + self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929L + self.tc.H = util.unhexify('0C8307CDE6856FF30BA93684EB0F04C2520E9ED3') + self.tc.session_id = self.tc.H + key = self.tc._compute_key('C', 32) + self.assertEquals('207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995', + util.hexify(key)) - def test_2_simple(self): + def test_3_simple(self): """ verify that we can establish an ssh link with ourselves across the loopback sockets. this is hardly "simple" but it's simpler than the @@ -142,7 +149,7 @@ class TransportTest (unittest.TestCase): self.assertEquals(True, self.tc.is_authenticated()) self.assertEquals(True, self.ts.is_authenticated()) - def test_3_special(self): + def test_4_special(self): """ verify that the client can demand odd handshake settings, and can renegotiate keys in mid-stream. @@ -171,7 +178,7 @@ class TransportTest (unittest.TestCase): self.assert_(self.tc.renegotiate_keys()) self.ts.send_ignore(1024) - def test_4_keepalive(self): + def test_5_keepalive(self): """ verify that the keepalive will be sent. """ @@ -195,7 +202,7 @@ class TransportTest (unittest.TestCase): time.sleep(2) self.assertEquals('keepalive@lag.net', server._global_request) - def test_5_bad_auth_type(self): + def test_6_bad_auth_type(self): """ verify that we get the right exception when an unsupported auth type is requested. @@ -216,7 +223,7 @@ class TransportTest (unittest.TestCase): self.assertEquals(BadAuthenticationType, etype) self.assertEquals(['publickey'], evalue.allowed_types) - def test_6_bad_password(self): + def test_7_bad_password(self): """ verify that a bad password gets the right exception, and that a retry with the right password works. @@ -241,7 +248,7 @@ class TransportTest (unittest.TestCase): self.assert_(event.isSet()) self.assert_(self.ts.is_active()) - def test_7_multipart_auth(self): + def test_8_multipart_auth(self): """ verify that multipart auth works. """ @@ -263,7 +270,7 @@ class TransportTest (unittest.TestCase): self.assert_(event.isSet()) self.assert_(self.ts.is_active()) - def test_8_exec_command(self): + def test_9_exec_command(self): """ verify that exec_command() does something reasonable. """ @@ -313,7 +320,7 @@ class TransportTest (unittest.TestCase): self.assertEquals('This is on stderr.\n', f.readline()) self.assertEquals('', f.readline()) - def test_9_invoke_shell(self): + def test_A_invoke_shell(self): """ verify that invoke_shell() does something reasonable. """ @@ -340,7 +347,7 @@ class TransportTest (unittest.TestCase): chan.close() self.assertEquals('', f.readline()) - def test_A_exit_status(self): + def test_B_exit_status(self): """ verify that get_exit_status() works. """ @@ -373,3 +380,51 @@ class TransportTest (unittest.TestCase): self.assertEquals('', f.readline()) self.assertEquals(23, chan.recv_exit_status()) chan.close() + + def test_C_select(self): + """ + verify that select() on a channel works. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + self.tc.auth_password(username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + chan = self.tc.open_session() + self.assert_(chan.invoke_shell()) + schan = self.ts.accept(1.0) + + # nothing should be ready + r, w, e = select.select([chan], [], [], 0.1) + self.assertEquals([], r) + self.assertEquals([], w) + self.assertEquals([], e) + + schan.send('hello\n') + + # something should be ready now + r, w, e = select.select([chan], [], [], 0.1) + self.assertEquals([chan], r) + self.assertEquals([], w) + self.assertEquals([], e) + + self.assertEquals('hello\n', chan.recv(6)) + + # and, should be dead again now + r, w, e = select.select([chan], [], [], 0.1) + self.assertEquals([], r) + self.assertEquals([], w) + self.assertEquals([], e) + + chan.close() + schan.close() +