Fix new test for Py3 and start server in tests instead of in setUp so we can skip starting server for test 5
This commit is contained in:
parent
b9e62182e5
commit
ae078f51d6
|
@ -21,16 +21,14 @@ Some unit tests for SSHClient.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import socket
|
import socket
|
||||||
|
from tempfile import mkstemp
|
||||||
import threading
|
import threading
|
||||||
import time
|
|
||||||
import unittest
|
import unittest
|
||||||
import weakref
|
import weakref
|
||||||
import warnings
|
import warnings
|
||||||
import os
|
import os
|
||||||
from binascii import hexlify
|
|
||||||
from tests.util import test_path
|
from tests.util import test_path
|
||||||
import paramiko
|
import paramiko
|
||||||
from paramiko.py3compat import b
|
|
||||||
|
|
||||||
|
|
||||||
class NullServer (paramiko.ServerInterface):
|
class NullServer (paramiko.ServerInterface):
|
||||||
|
@ -67,8 +65,6 @@ class SSHClientTest (unittest.TestCase):
|
||||||
self.sockl.listen(1)
|
self.sockl.listen(1)
|
||||||
self.addr, self.port = self.sockl.getsockname()
|
self.addr, self.port = self.sockl.getsockname()
|
||||||
self.event = threading.Event()
|
self.event = threading.Event()
|
||||||
thread = threading.Thread(target=self._run)
|
|
||||||
thread.start()
|
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
for attr in "tc ts socks sockl".split():
|
for attr in "tc ts socks sockl".split():
|
||||||
|
@ -83,11 +79,11 @@ class SSHClientTest (unittest.TestCase):
|
||||||
server = NullServer()
|
server = NullServer()
|
||||||
self.ts.start_server(self.event, server)
|
self.ts.start_server(self.event, server)
|
||||||
|
|
||||||
|
|
||||||
def test_1_client(self):
|
def test_1_client(self):
|
||||||
"""
|
"""
|
||||||
verify that the SSHClient stuff works too.
|
verify that the SSHClient stuff works too.
|
||||||
"""
|
"""
|
||||||
|
threading.Thread(target=self._run).start()
|
||||||
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
||||||
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
|
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
|
||||||
|
|
||||||
|
@ -121,6 +117,7 @@ class SSHClientTest (unittest.TestCase):
|
||||||
"""
|
"""
|
||||||
verify that SSHClient works with a DSA key.
|
verify that SSHClient works with a DSA key.
|
||||||
"""
|
"""
|
||||||
|
threading.Thread(target=self._run).start()
|
||||||
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
||||||
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
|
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
|
||||||
|
|
||||||
|
@ -154,6 +151,7 @@ class SSHClientTest (unittest.TestCase):
|
||||||
"""
|
"""
|
||||||
verify that SSHClient accepts and tries multiple key files.
|
verify that SSHClient accepts and tries multiple key files.
|
||||||
"""
|
"""
|
||||||
|
threading.Thread(target=self._run).start()
|
||||||
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
||||||
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
|
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
|
||||||
|
|
||||||
|
@ -171,6 +169,7 @@ class SSHClientTest (unittest.TestCase):
|
||||||
"""
|
"""
|
||||||
verify that SSHClient's AutoAddPolicy works.
|
verify that SSHClient's AutoAddPolicy works.
|
||||||
"""
|
"""
|
||||||
|
threading.Thread(target=self._run).start()
|
||||||
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
||||||
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
|
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
|
||||||
|
|
||||||
|
@ -193,9 +192,10 @@ class SSHClientTest (unittest.TestCase):
|
||||||
"""
|
"""
|
||||||
warnings.filterwarnings('ignore', 'tempnam.*')
|
warnings.filterwarnings('ignore', 'tempnam.*')
|
||||||
|
|
||||||
host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
|
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
||||||
public_host_key = paramiko.RSAKey(data=str(host_key))
|
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
|
||||||
localname = os.tempnam()
|
fd, localname = mkstemp()
|
||||||
|
os.close(fd)
|
||||||
|
|
||||||
client = paramiko.SSHClient()
|
client = paramiko.SSHClient()
|
||||||
self.assertEquals(0, len(client.get_host_keys()))
|
self.assertEquals(0, len(client.get_host_keys()))
|
||||||
|
@ -218,6 +218,7 @@ class SSHClientTest (unittest.TestCase):
|
||||||
verify that when an SSHClient is collected, its transport (and the
|
verify that when an SSHClient is collected, its transport (and the
|
||||||
transport's packetizer) is closed.
|
transport's packetizer) is closed.
|
||||||
"""
|
"""
|
||||||
|
threading.Thread(target=self._run).start()
|
||||||
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
||||||
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
|
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue