[project @ Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--patch-133]

unit test madness
add some more testy bits and fix up some other bits.
This commit is contained in:
Robey Pointer 2004-12-13 07:32:14 +00:00
parent 1cf0d33f1d
commit b2eb38483c
1 changed files with 54 additions and 15 deletions

View File

@ -73,11 +73,9 @@ class NullServer (ServerInterface):
def check_channel_exec_request(self, channel, command): def check_channel_exec_request(self, channel, command):
if command != 'yes': if command != 'yes':
return False return False
self.exec_channel = channel
return True return True
def check_channel_shell_request(self, channel): def check_channel_shell_request(self, channel):
self.shell_channel = channel
return True return True
@ -125,14 +123,51 @@ class TransportTest (unittest.TestCase):
event = threading.Event() event = threading.Event()
server = NullServer() server = NullServer()
self.assert_(not event.isSet()) self.assert_(not event.isSet())
self.assertEquals(None, self.tc.get_username())
self.assertEquals(None, self.ts.get_username())
self.assertEquals(False, self.tc.is_authenticated())
self.assertEquals(False, self.ts.is_authenticated())
self.ts.start_server(event, server) self.ts.start_server(event, server)
self.tc.connect(hostkey=public_host_key, self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion') username='slowdive', password='pygmalion')
event.wait(1.0) event.wait(1.0)
self.assert_(event.isSet()) self.assert_(event.isSet())
self.assert_(self.ts.is_active()) self.assert_(self.ts.is_active())
self.assertEquals('slowdive', self.tc.get_username())
self.assertEquals('slowdive', self.ts.get_username())
self.assertEquals(True, self.tc.is_authenticated())
self.assertEquals(True, self.ts.is_authenticated())
def test_3_bad_auth_type(self): def test_3_special(self):
"""
verify that the client can demand odd handshake settings, and can
renegotiate keys in mid-stream.
"""
host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
public_host_key = RSAKey(data=str(host_key))
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
self.assert_(not event.isSet())
self.ts.start_server(event, server)
options = self.tc.get_security_options()
options.ciphers = ('aes256-cbc',)
options.digests = ('hmac-md5-96',)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
self.assertEquals('aes256-cbc', self.tc.local_cipher)
self.assertEquals('aes256-cbc', self.tc.remote_cipher)
self.assertEquals(12, self.tc.local_mac_len)
self.assertEquals(12, self.tc.remote_mac_len)
self.tc.send_ignore(1024)
self.assert_(self.tc.renegotiate_keys())
self.ts.send_ignore(1024)
def test_4_bad_auth_type(self):
""" """
verify that we get the right exception when an unsupported auth verify that we get the right exception when an unsupported auth
type is requested. type is requested.
@ -153,7 +188,7 @@ class TransportTest (unittest.TestCase):
self.assertEquals(BadAuthenticationType, etype) self.assertEquals(BadAuthenticationType, etype)
self.assertEquals(['publickey'], evalue.allowed_types) self.assertEquals(['publickey'], evalue.allowed_types)
def test_4_bad_password(self): def test_5_bad_password(self):
""" """
verify that a bad password gets the right exception, and that a retry verify that a bad password gets the right exception, and that a retry
with the right password works. with the right password works.
@ -178,7 +213,7 @@ class TransportTest (unittest.TestCase):
self.assert_(event.isSet()) self.assert_(event.isSet())
self.assert_(self.ts.is_active()) self.assert_(self.ts.is_active())
def test_5_multipart_auth(self): def test_6_multipart_auth(self):
""" """
verify that multipart auth works. verify that multipart auth works.
""" """
@ -200,7 +235,7 @@ class TransportTest (unittest.TestCase):
self.assert_(event.isSet()) self.assert_(event.isSet())
self.assert_(self.ts.is_active()) self.assert_(self.ts.is_active())
def test_6_exec_command(self): def test_7_exec_command(self):
""" """
verify that exec_command() does something reasonable. verify that exec_command() does something reasonable.
""" """
@ -219,13 +254,15 @@ class TransportTest (unittest.TestCase):
self.assert_(self.ts.is_active()) self.assert_(self.ts.is_active())
chan = self.tc.open_session() chan = self.tc.open_session()
schan = self.ts.accept(1.0)
self.assert_(not chan.exec_command('no')) self.assert_(not chan.exec_command('no'))
chan = self.tc.open_session() chan = self.tc.open_session()
self.assert_(chan.exec_command('yes')) self.assert_(chan.exec_command('yes'))
server.exec_channel.send('Hello there.\n') schan = self.ts.accept(1.0)
server.exec_channel.send_stderr('This is on stderr.\n') schan.send('Hello there.\n')
server.exec_channel.close() schan.send_stderr('This is on stderr.\n')
schan.close()
f = chan.makefile() f = chan.makefile()
self.assertEquals('Hello there.\n', f.readline()) self.assertEquals('Hello there.\n', f.readline())
@ -236,10 +273,11 @@ class TransportTest (unittest.TestCase):
# now try it with combined stdout/stderr # now try it with combined stdout/stderr
chan = self.tc.open_session() chan = self.tc.open_session()
chan.exec_command('yes') self.assert_(chan.exec_command('yes'))
server.exec_channel.send('Hello there.\n') schan = self.ts.accept(1.0)
server.exec_channel.send_stderr('This is on stderr.\n') schan.send('Hello there.\n')
server.exec_channel.close() schan.send_stderr('This is on stderr.\n')
schan.close()
chan.set_combine_stderr(True) chan.set_combine_stderr(True)
f = chan.makefile() f = chan.makefile()
@ -247,7 +285,7 @@ class TransportTest (unittest.TestCase):
self.assertEquals('This is on stderr.\n', f.readline()) self.assertEquals('This is on stderr.\n', f.readline())
self.assertEquals('', f.readline()) self.assertEquals('', f.readline())
def test_7_invoke_shell(self): def test_8_invoke_shell(self):
""" """
verify that invoke_shell() does something reasonable. verify that invoke_shell() does something reasonable.
""" """
@ -267,8 +305,9 @@ class TransportTest (unittest.TestCase):
chan = self.tc.open_session() chan = self.tc.open_session()
self.assert_(chan.invoke_shell()) self.assert_(chan.invoke_shell())
schan = self.ts.accept(1.0)
chan.send('communist j. cat\n') chan.send('communist j. cat\n')
f = server.shell_channel.makefile() f = schan.makefile()
self.assertEquals('communist j. cat\n', f.readline()) self.assertEquals('communist j. cat\n', f.readline())
chan.close() chan.close()
self.assertEquals('', f.readline()) self.assertEquals('', f.readline())