add another test to check out private key auth.
This commit is contained in:
parent
f384749a8c
commit
ecb8ffe373
|
@ -25,6 +25,7 @@ import threading
|
||||||
import time
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
import weakref
|
import weakref
|
||||||
|
from binascii import hexlify
|
||||||
|
|
||||||
import paramiko
|
import paramiko
|
||||||
|
|
||||||
|
@ -41,6 +42,11 @@ class NullServer (paramiko.ServerInterface):
|
||||||
return paramiko.AUTH_SUCCESSFUL
|
return paramiko.AUTH_SUCCESSFUL
|
||||||
return paramiko.AUTH_FAILED
|
return paramiko.AUTH_FAILED
|
||||||
|
|
||||||
|
def check_auth_publickey(self, username, key):
|
||||||
|
if (key.get_name() == 'ssh-dss') and (hexlify(key.get_fingerprint()) == '4478f0b9a23cc5182009ff755bc1d26c'):
|
||||||
|
return paramiko.AUTH_SUCCESSFUL
|
||||||
|
return paramiko.AUTH_FAILED
|
||||||
|
|
||||||
def check_channel_request(self, kind, chanid):
|
def check_channel_request(self, kind, chanid):
|
||||||
return paramiko.OPEN_SUCCEEDED
|
return paramiko.OPEN_SUCCEEDED
|
||||||
|
|
||||||
|
@ -109,8 +115,41 @@ class SSHClientTest (unittest.TestCase):
|
||||||
stdin.close()
|
stdin.close()
|
||||||
stdout.close()
|
stdout.close()
|
||||||
stderr.close()
|
stderr.close()
|
||||||
|
|
||||||
|
def test_2_client_dsa(self):
|
||||||
|
"""
|
||||||
|
verify that SSHClient works with a DSA key.
|
||||||
|
"""
|
||||||
|
host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
|
||||||
|
public_host_key = paramiko.RSAKey(data=str(host_key))
|
||||||
|
|
||||||
|
self.tc = paramiko.SSHClient()
|
||||||
|
self.tc.get_host_keys().add(self.addr, 'ssh-rsa', public_host_key)
|
||||||
|
self.tc.connect(self.addr, self.port, username='slowdive', key_filename='tests/test_dss.key')
|
||||||
|
|
||||||
def test_2_auto_add_policy(self):
|
self.event.wait(1.0)
|
||||||
|
self.assert_(self.event.isSet())
|
||||||
|
self.assert_(self.ts.is_active())
|
||||||
|
self.assertEquals('slowdive', self.ts.get_username())
|
||||||
|
self.assertEquals(True, self.ts.is_authenticated())
|
||||||
|
|
||||||
|
stdin, stdout, stderr = self.tc.exec_command('yes')
|
||||||
|
schan = self.ts.accept(1.0)
|
||||||
|
|
||||||
|
schan.send('Hello there.\n')
|
||||||
|
schan.send_stderr('This is on stderr.\n')
|
||||||
|
schan.close()
|
||||||
|
|
||||||
|
self.assertEquals('Hello there.\n', stdout.readline())
|
||||||
|
self.assertEquals('', stdout.readline())
|
||||||
|
self.assertEquals('This is on stderr.\n', stderr.readline())
|
||||||
|
self.assertEquals('', stderr.readline())
|
||||||
|
|
||||||
|
stdin.close()
|
||||||
|
stdout.close()
|
||||||
|
stderr.close()
|
||||||
|
|
||||||
|
def test_3_auto_add_policy(self):
|
||||||
"""
|
"""
|
||||||
verify that SSHClient's AutoAddPolicy works.
|
verify that SSHClient's AutoAddPolicy works.
|
||||||
"""
|
"""
|
||||||
|
@ -130,7 +169,7 @@ class SSHClientTest (unittest.TestCase):
|
||||||
self.assertEquals(1, len(self.tc.get_host_keys()))
|
self.assertEquals(1, len(self.tc.get_host_keys()))
|
||||||
self.assertEquals(public_host_key, self.tc.get_host_keys()[self.addr]['ssh-rsa'])
|
self.assertEquals(public_host_key, self.tc.get_host_keys()[self.addr]['ssh-rsa'])
|
||||||
|
|
||||||
def test_3_cleanup(self):
|
def test_4_cleanup(self):
|
||||||
"""
|
"""
|
||||||
verify that when an SSHClient is collected, its transport (and the
|
verify that when an SSHClient is collected, its transport (and the
|
||||||
transport's packetizer) is closed.
|
transport's packetizer) is closed.
|
||||||
|
|
Loading…
Reference in New Issue