[project @ Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--patch-128]

more unit tests
added unit tests for multi-part auth, exec_command, and invoke_shell.
This commit is contained in:
Robey Pointer 2004-12-12 09:38:24 +00:00
parent 1a32d2b4ef
commit 97eca767a2
1 changed files with 136 additions and 5 deletions

View File

@ -25,21 +25,61 @@ Some unit tests for the ssh2 protocol in Transport.
import sys, unittest, threading import sys, unittest, threading
from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \ from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
SSHException, BadAuthenticationType SSHException, BadAuthenticationType
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED
from loop import LoopSocket from loop import LoopSocket
class NullServer (ServerInterface): class NullServer (ServerInterface):
paranoid_did_password = False
paranoid_did_public_key = False
paranoid_key = DSSKey.from_private_key_file('tests/test_dss.key')
def get_allowed_auths(self, username): def get_allowed_auths(self, username):
if username == 'slowdive': if username == 'slowdive':
return 'publickey,password' return 'publickey,password'
if username == 'paranoid':
if not self.paranoid_did_password and not self.paranoid_did_public_key:
return 'publickey,password'
elif self.paranoid_did_password:
return 'publickey'
else:
return 'password'
return 'publickey' return 'publickey'
def check_auth_password(self, username, password): def check_auth_password(self, username, password):
if (username == 'slowdive') and (password == 'pygmalion'): if (username == 'slowdive') and (password == 'pygmalion'):
return AUTH_SUCCESSFUL return AUTH_SUCCESSFUL
if (username == 'paranoid') and (password == 'paranoid'):
# 2-part auth (even openssh doesn't support this)
self.paranoid_did_password = True
if self.paranoid_did_public_key:
return AUTH_SUCCESSFUL
return AUTH_PARTIALLY_SUCCESSFUL
return AUTH_FAILED return AUTH_FAILED
def check_auth_publickey(self, username, key):
if (username == 'paranoid') and (key == self.paranoid_key):
# 2-part auth
self.paranoid_did_public_key = True
if self.paranoid_did_password:
return AUTH_SUCCESSFUL
return AUTH_PARTIALLY_SUCCESSFUL
return AUTH_FAILED
def check_channel_request(self, kind, chanid):
return OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
if command != 'yes':
return False
self.exec_channel = channel
return True
def check_channel_shell_request(self, channel):
self.shell_channel = channel
return True
class TransportTest (unittest.TestCase): class TransportTest (unittest.TestCase):
@ -86,7 +126,6 @@ class TransportTest (unittest.TestCase):
server = NullServer() server = NullServer()
self.assert_(not event.isSet()) self.assert_(not event.isSet())
self.ts.start_server(event, server) self.ts.start_server(event, server)
self.tc.ultra_debug = True
self.tc.connect(hostkey=public_host_key, self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion') username='slowdive', password='pygmalion')
event.wait(1.0) event.wait(1.0)
@ -105,7 +144,6 @@ class TransportTest (unittest.TestCase):
server = NullServer() server = NullServer()
self.assert_(not event.isSet()) self.assert_(not event.isSet())
self.ts.start_server(event, server) self.ts.start_server(event, server)
self.tc.ultra_debug = True
try: try:
self.tc.connect(hostkey=public_host_key, self.tc.connect(hostkey=public_host_key,
username='unknown', password='error') username='unknown', password='error')
@ -139,5 +177,98 @@ class TransportTest (unittest.TestCase):
event.wait(1.0) event.wait(1.0)
self.assert_(event.isSet()) self.assert_(event.isSet())
self.assert_(self.ts.is_active()) self.assert_(self.ts.is_active())
def test_5_multipart_auth(self):
"""
verify that multipart auth works.
"""
host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
public_host_key = RSAKey(data=str(host_key))
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
self.assert_(not event.isSet())
self.ts.start_server(event, server)
self.tc.ultra_debug = True
self.tc.connect(hostkey=public_host_key)
remain = self.tc.auth_password(username='paranoid', password='paranoid')
self.assertEquals(['publickey'], remain)
key = DSSKey.from_private_key_file('tests/test_dss.key')
remain = self.tc.auth_publickey(username='paranoid', key=key)
self.assertEquals([], remain)
event.wait(1.0)
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
def test_6_exec_command(self):
"""
verify that exec_command() does something reasonable.
"""
host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
public_host_key = RSAKey(data=str(host_key))
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
self.assert_(not event.isSet())
self.ts.start_server(event, server)
self.tc.ultra_debug = True
self.tc.connect(hostkey=public_host_key)
self.tc.auth_password(username='slowdive', password='pygmalion')
event.wait(1.0)
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
chan = self.tc.open_session()
self.assert_(not chan.exec_command('no'))
chan = self.tc.open_session()
self.assert_(chan.exec_command('yes'))
server.exec_channel.send('Hello there.\n')
server.exec_channel.send_stderr('This is on stderr.\n')
server.exec_channel.close()
f = chan.makefile()
self.assertEquals('Hello there.\n', f.readline())
self.assertEquals('', f.readline())
f = chan.makefile_stderr()
self.assertEquals('This is on stderr.\n', f.readline())
self.assertEquals('', f.readline())
# now try it with combined stdout/stderr
chan = self.tc.open_session()
chan.exec_command('yes')
server.exec_channel.send('Hello there.\n')
server.exec_channel.send_stderr('This is on stderr.\n')
server.exec_channel.close()
chan.set_combine_stderr(True)
f = chan.makefile()
self.assertEquals('Hello there.\n', f.readline())
self.assertEquals('This is on stderr.\n', f.readline())
self.assertEquals('', f.readline())
def test_7_invoke_shell(self):
"""
verify that invoke_shell() does something reasonable.
"""
host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
public_host_key = RSAKey(data=str(host_key))
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
self.assert_(not event.isSet())
self.ts.start_server(event, server)
self.tc.ultra_debug = True
self.tc.connect(hostkey=public_host_key)
self.tc.auth_password(username='slowdive', password='pygmalion')
event.wait(1.0)
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
chan = self.tc.open_session()
self.assert_(chan.invoke_shell())
chan.send('communist j. cat\n')
f = server.shell_channel.makefile()
self.assertEquals('communist j. cat\n', f.readline())
chan.close()
self.assertEquals('', f.readline())