From 7444a999931cddc1e61bb35270468aa45da2687e Mon Sep 17 00:00:00 2001 From: Scott Maxwell Date: Sat, 2 Nov 2013 20:18:18 -0700 Subject: [PATCH] Fix some deprecation and resource warnings --- demos/demo_server.py | 4 +- demos/demo_sftp.py | 8 +- paramiko/hostkeys.py | 50 +++++----- paramiko/pkey.py | 22 +++-- paramiko/primes.py | 20 ++-- paramiko/py3compat.py | 12 ++- tests/test_auth.py | 34 +++---- tests/test_buffered_pipe.py | 28 +++--- tests/test_client.py | 60 ++++++------ tests/test_file.py | 12 +-- tests/test_hostkeys.py | 48 +++++----- tests/test_kex.py | 82 ++++++++--------- tests/test_message.py | 54 +++++------ tests/test_packetizer.py | 12 +-- tests/test_pkey.py | 112 +++++++++++------------ tests/test_sftp.py | 4 +- tests/test_sftp_big.py | 1 + tests/test_transport.py | 176 ++++++++++++++++++------------------ tests/test_util.py | 34 +++---- 19 files changed, 398 insertions(+), 375 deletions(-) diff --git a/demos/demo_server.py b/demos/demo_server.py index deb2138..5a41a71 100644 --- a/demos/demo_server.py +++ b/demos/demo_server.py @@ -27,7 +27,7 @@ import threading import traceback import paramiko -from paramiko.py3compat import b, u +from paramiko.py3compat import b, u, decodebytes # setup logging @@ -46,7 +46,7 @@ class Server (paramiko.ServerInterface): 'fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC' + \ 'KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT' + \ 'UWT10hcuO4Ks8=') - good_pub_key = paramiko.RSAKey(data=base64.decodestring(data)) + good_pub_key = paramiko.RSAKey(data=decodebytes(data)) def __init__(self): self.event = threading.Event() diff --git a/demos/demo_sftp.py b/demos/demo_sftp.py index 2dba172..d7f2808 100755 --- a/demos/demo_sftp.py +++ b/demos/demo_sftp.py @@ -20,6 +20,8 @@ # based on code provided by raymond mosteller (thanks!) +from __future__ import with_statement + import base64 import getpass import os @@ -95,13 +97,15 @@ try: except IOError: print('(assuming demo_sftp_folder/ already exists)') sftp.open('demo_sftp_folder/README', 'w').write('This was created by demo_sftp.py.\n') - data = open('demo_sftp.py', 'r').read() + with open('demo_sftp.py', 'r') as f: + data = f.read() sftp.open('demo_sftp_folder/demo_sftp.py', 'w').write(data) print('created demo_sftp_folder/ on the server') # copy the README back here data = sftp.open('demo_sftp_folder/README', 'r').read() - open('README_demo_sftp', 'w').write(data) + with open('README_demo_sftp', 'w') as f: + f.write(data) print('copied README back here') # BETTER: use the get() and put() methods diff --git a/paramiko/hostkeys.py b/paramiko/hostkeys.py index a7f9b43..9da883e 100644 --- a/paramiko/hostkeys.py +++ b/paramiko/hostkeys.py @@ -83,11 +83,11 @@ class HostKeyEntry: try: key = b(key) if keytype == 'ssh-rsa': - key = RSAKey(data=base64.decodestring(key)) + key = RSAKey(data=decodebytes(key)) elif keytype == 'ssh-dss': - key = DSSKey(data=base64.decodestring(key)) + key = DSSKey(data=decodebytes(key)) elif keytype == 'ecdsa-sha2-nistp256': - key = ECDSAKey(data=base64.decodestring(key)) + key = ECDSAKey(data=decodebytes(key)) else: log.info("Unable to handle key of type %s" % (keytype,)) return None @@ -173,19 +173,21 @@ class HostKeys (MutableMapping): @raise IOError: if there was an error reading the file """ f = open(filename, 'r') - for lineno, line in enumerate(f): - line = line.strip() - if (len(line) == 0) or (line[0] == '#'): - continue - e = HostKeyEntry.from_line(line, lineno) - if e is not None: - _hostnames = e.hostnames - for h in _hostnames: - if self.check(h, e.key): - e.hostnames.remove(h) - if len(e.hostnames): - self._entries.append(e) - f.close() + try: + for lineno, line in enumerate(f): + line = line.strip() + if (len(line) == 0) or (line[0] == '#'): + continue + e = HostKeyEntry.from_line(line, lineno) + if e is not None: + _hostnames = e.hostnames + for h in _hostnames: + if self.check(h, e.key): + e.hostnames.remove(h) + if len(e.hostnames): + self._entries.append(e) + finally: + f.close() def save(self, filename): """ @@ -202,11 +204,13 @@ class HostKeys (MutableMapping): @since: 1.6.1 """ f = open(filename, 'w') - for e in self._entries: - line = e.to_line() - if line: - f.write(line) - f.close() + try: + for e in self._entries: + line = e.to_line() + if line: + f.write(line) + finally: + f.close() def lookup(self, hostname): """ @@ -362,10 +366,10 @@ class HostKeys (MutableMapping): else: if salt.startswith('|1|'): salt = salt.split('|')[2] - salt = base64.decodestring(b(salt)) + salt = decodebytes(b(salt)) assert len(salt) == SHA.digest_size hmac = HMAC.HMAC(salt, b(hostname), SHA).digest() - hostkey = '|1|%s|%s' % (u(base64.encodestring(salt)), u(base64.encodestring(hmac))) + hostkey = '|1|%s|%s' % (u(encodebytes(salt)), u(encodebytes(hmac))) return hostkey.replace('\n', '') hash_host = staticmethod(hash_host) diff --git a/paramiko/pkey.py b/paramiko/pkey.py index 53361d0..3d786ae 100644 --- a/paramiko/pkey.py +++ b/paramiko/pkey.py @@ -148,7 +148,7 @@ class PKey (object): @return: a base64 string containing the public part of the key. @rtype: str """ - return u(base64.encodestring(self.asbytes())).replace('\n', '') + return u(encodebytes(self.asbytes())).replace('\n', '') def sign_ssh_data(self, rng, data): """ @@ -283,8 +283,10 @@ class PKey (object): @raise SSHException: if the key file is invalid. """ f = open(filename, 'r') - data = self._read_private_key(tag, f, password) - f.close() + try: + data = self._read_private_key(tag, f, password) + finally: + f.close() return data def _read_private_key(self, tag, f, password=None): @@ -309,7 +311,7 @@ class PKey (object): end += 1 # if we trudged to the end of the file, just try to cope. try: - data = base64.decodestring(b(''.join(lines[start:end]))) + data = decodebytes(b(''.join(lines[start:end]))) except base64.binascii.Error: raise SSHException('base64 decoding error: ' + str(sys.exc_info()[1])) if 'proc-type' not in headers: @@ -353,10 +355,12 @@ class PKey (object): @raise IOError: if there was an error writing the file. """ f = open(filename, 'w', o600) - # grrr... the mode doesn't always take hold - os.chmod(filename, o600) - self._write_private_key(tag, f, data, password) - f.close() + try: + # grrr... the mode doesn't always take hold + os.chmod(filename, o600) + self._write_private_key(tag, f, data, password) + finally: + f.close() def _write_private_key(self, tag, f, data, password=None): f.write('-----BEGIN %s PRIVATE KEY-----\n' % tag) @@ -378,7 +382,7 @@ class PKey (object): f.write('Proc-Type: 4,ENCRYPTED\n') f.write('DEK-Info: %s,%s\n' % (cipher_name, u(hexlify(salt)).upper())) f.write('\n') - s = u(base64.encodestring(data)) + s = u(encodebytes(data)) # re-wrap to 64-char lines s = ''.join(s.split('\n')) s = '\n'.join([s[i : i+64] for i in range(0, len(s), 64)]) diff --git a/paramiko/primes.py b/paramiko/primes.py index 4db6d52..13ec52d 100644 --- a/paramiko/primes.py +++ b/paramiko/primes.py @@ -114,15 +114,17 @@ class ModulusPack (object): """ self.pack = {} f = open(filename, 'r') - for line in f: - line = line.strip() - if (len(line) == 0) or (line[0] == '#'): - continue - try: - self._parse_modulus(line) - except: - continue - f.close() + try: + for line in f: + line = line.strip() + if (len(line) == 0) or (line[0] == '#'): + continue + try: + self._parse_modulus(line) + except: + continue + finally: + f.close() def get_modulus(self, min, prefer, max): bitsizes = sorted(self.pack.keys()) diff --git a/paramiko/py3compat.py b/paramiko/py3compat.py index 8a01ba0..0aad361 100644 --- a/paramiko/py3compat.py +++ b/paramiko/py3compat.py @@ -1,6 +1,9 @@ import sys +import base64 -__all__ = ['PY3', 'string_types', 'integer_types', 'text_type', 'bytes_types', 'bytes', 'long', 'input', 'bytestring', 'byte_ord', 'byte_chr', 'byte_mask', 'b', 'u', 'b2s', 'StringIO', 'BytesIO', 'is_callable', 'MAXSIZE', 'next'] +__all__ = ['PY3', 'string_types', 'integer_types', 'text_type', 'bytes_types', 'bytes', 'long', 'input', + 'decodebytes', 'encodebytes', 'bytestring', 'byte_ord', 'byte_chr', 'byte_mask', + 'b', 'u', 'b2s', 'StringIO', 'BytesIO', 'is_callable', 'MAXSIZE', 'next'] PY3 = sys.version_info[0] >= 3 @@ -12,8 +15,11 @@ if PY3: bytes = bytes bytes_types = bytes integer_types = int - long = int + class long(int): + pass input = input + decodebytes = base64.decodebytes + encodebytes = base64.encodebytes def bytestring(s): return s @@ -72,6 +78,8 @@ else: integer_types = (int, long) long = long input = raw_input + decodebytes = base64.decodestring + encodebytes = base64.encodestring def bytestring(s): # NOQA if isinstance(s, unicode): diff --git a/tests/test_auth.py b/tests/test_auth.py index 586289b..d26b180 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -119,13 +119,13 @@ class AuthTest (unittest.TestCase): self.ts.add_server_key(host_key) self.event = threading.Event() self.server = NullServer() - self.assert_(not self.event.isSet()) + self.assertTrue(not self.event.isSet()) self.ts.start_server(self.event, self.server) def verify_finished(self): self.event.wait(1.0) - self.assert_(self.event.isSet()) - self.assert_(self.ts.is_active()) + self.assertTrue(self.event.isSet()) + self.assertTrue(self.ts.is_active()) def test_1_bad_auth_type(self): """ @@ -136,11 +136,11 @@ class AuthTest (unittest.TestCase): try: self.tc.connect(hostkey=self.public_host_key, username='unknown', password='error') - self.assert_(False) + self.assertTrue(False) except: etype, evalue, etb = sys.exc_info() - self.assertEquals(BadAuthenticationType, etype) - self.assertEquals(['publickey'], evalue.allowed_types) + self.assertEqual(BadAuthenticationType, etype) + self.assertEqual(['publickey'], evalue.allowed_types) def test_2_bad_password(self): """ @@ -151,10 +151,10 @@ class AuthTest (unittest.TestCase): self.tc.connect(hostkey=self.public_host_key) try: self.tc.auth_password(username='slowdive', password='error') - self.assert_(False) + self.assertTrue(False) except: etype, evalue, etb = sys.exc_info() - self.assert_(issubclass(etype, AuthenticationException)) + self.assertTrue(issubclass(etype, AuthenticationException)) self.tc.auth_password(username='slowdive', password='pygmalion') self.verify_finished() @@ -165,10 +165,10 @@ class AuthTest (unittest.TestCase): self.start_server() self.tc.connect(hostkey=self.public_host_key) remain = self.tc.auth_password(username='paranoid', password='paranoid') - self.assertEquals(['publickey'], remain) + self.assertEqual(['publickey'], remain) key = DSSKey.from_private_key_file(test_path('test_dss.key')) remain = self.tc.auth_publickey(username='paranoid', key=key) - self.assertEquals([], remain) + self.assertEqual([], remain) self.verify_finished() def test_4_interactive_auth(self): @@ -184,9 +184,9 @@ class AuthTest (unittest.TestCase): self.got_prompts = prompts return ['cat'] remain = self.tc.auth_interactive('commie', handler) - self.assertEquals(self.got_title, 'password') - self.assertEquals(self.got_prompts, [('Password', False)]) - self.assertEquals([], remain) + self.assertEqual(self.got_title, 'password') + self.assertEqual(self.got_prompts, [('Password', False)]) + self.assertEqual([], remain) self.verify_finished() def test_5_interactive_auth_fallback(self): @@ -197,7 +197,7 @@ class AuthTest (unittest.TestCase): self.start_server() self.tc.connect(hostkey=self.public_host_key) remain = self.tc.auth_password('commie', 'cat') - self.assertEquals([], remain) + self.assertEqual([], remain) self.verify_finished() def test_6_auth_utf8(self): @@ -207,7 +207,7 @@ class AuthTest (unittest.TestCase): self.start_server() self.tc.connect(hostkey=self.public_host_key) remain = self.tc.auth_password('utf8', _pwd) - self.assertEquals([], remain) + self.assertEqual([], remain) self.verify_finished() def test_7_auth_non_utf8(self): @@ -218,7 +218,7 @@ class AuthTest (unittest.TestCase): self.start_server() self.tc.connect(hostkey=self.public_host_key) remain = self.tc.auth_password('non-utf8', '\xff') - self.assertEquals([], remain) + self.assertEqual([], remain) self.verify_finished() def test_8_auth_gets_disconnected(self): @@ -232,4 +232,4 @@ class AuthTest (unittest.TestCase): remain = self.tc.auth_password('bad-server', 'hello') except: etype, evalue, etb = sys.exc_info() - self.assert_(issubclass(etype, AuthenticationException)) + self.assertTrue(issubclass(etype, AuthenticationException)) diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py index 7f48b70..5a088d8 100644 --- a/tests/test_buffered_pipe.py +++ b/tests/test_buffered_pipe.py @@ -45,39 +45,39 @@ def close_thread(pipe): class BufferedPipeTest(ParamikoTest): def test_1_buffered_pipe(self): p = BufferedPipe() - self.assert_(not p.read_ready()) + self.assertTrue(not p.read_ready()) p.feed('hello.') - self.assert_(p.read_ready()) + self.assertTrue(p.read_ready()) data = p.read(6) - self.assertEquals(b('hello.'), data) + self.assertEqual(b('hello.'), data) p.feed('plus/minus') - self.assertEquals(b('plu'), p.read(3)) - self.assertEquals(b('s/m'), p.read(3)) - self.assertEquals(b('inus'), p.read(4)) + self.assertEqual(b('plu'), p.read(3)) + self.assertEqual(b('s/m'), p.read(3)) + self.assertEqual(b('inus'), p.read(4)) p.close() - self.assert_(not p.read_ready()) - self.assertEquals(b(''), p.read(1)) + self.assertTrue(not p.read_ready()) + self.assertEqual(b(''), p.read(1)) def test_2_delay(self): p = BufferedPipe() - self.assert_(not p.read_ready()) + self.assertTrue(not p.read_ready()) threading.Thread(target=delay_thread, args=(p,)).start() - self.assertEquals(b('a'), p.read(1, 0.1)) + self.assertEqual(b('a'), p.read(1, 0.1)) try: p.read(1, 0.1) - self.assert_(False) + self.assertTrue(False) except PipeTimeout: pass - self.assertEquals(b('b'), p.read(1, 1.0)) - self.assertEquals(b(''), p.read(1)) + self.assertEqual(b('b'), p.read(1, 1.0)) + self.assertEqual(b(''), p.read(1)) def test_3_close_while_reading(self): p = BufferedPipe() threading.Thread(target=close_thread, args=(p,)).start() data = p.read(1, 1.0) - self.assertEquals(b(''), data) + self.assertEqual(b(''), data) def test_4_or_pipe(self): p = pipe.make_pipe() diff --git a/tests/test_client.py b/tests/test_client.py index b77b90d..e96a426 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -95,10 +95,10 @@ class SSHClientTest (unittest.TestCase): self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') self.event.wait(1.0) - self.assert_(self.event.isSet()) - self.assert_(self.ts.is_active()) - self.assertEquals('slowdive', self.ts.get_username()) - self.assertEquals(True, self.ts.is_authenticated()) + self.assertTrue(self.event.isSet()) + self.assertTrue(self.ts.is_active()) + self.assertEqual('slowdive', self.ts.get_username()) + self.assertEqual(True, self.ts.is_authenticated()) stdin, stdout, stderr = self.tc.exec_command('yes') schan = self.ts.accept(1.0) @@ -107,10 +107,10 @@ class SSHClientTest (unittest.TestCase): schan.send_stderr('This is on stderr.\n') schan.close() - self.assertEquals('Hello there.\n', stdout.readline()) - self.assertEquals('', stdout.readline()) - self.assertEquals('This is on stderr.\n', stderr.readline()) - self.assertEquals('', stderr.readline()) + self.assertEqual('Hello there.\n', stdout.readline()) + self.assertEqual('', stdout.readline()) + self.assertEqual('This is on stderr.\n', stderr.readline()) + self.assertEqual('', stderr.readline()) stdin.close() stdout.close() @@ -128,10 +128,10 @@ class SSHClientTest (unittest.TestCase): self.tc.connect(self.addr, self.port, username='slowdive', key_filename=test_path('test_dss.key')) self.event.wait(1.0) - self.assert_(self.event.isSet()) - self.assert_(self.ts.is_active()) - self.assertEquals('slowdive', self.ts.get_username()) - self.assertEquals(True, self.ts.is_authenticated()) + self.assertTrue(self.event.isSet()) + self.assertTrue(self.ts.is_active()) + self.assertEqual('slowdive', self.ts.get_username()) + self.assertEqual(True, self.ts.is_authenticated()) stdin, stdout, stderr = self.tc.exec_command('yes') schan = self.ts.accept(1.0) @@ -140,10 +140,10 @@ class SSHClientTest (unittest.TestCase): schan.send_stderr('This is on stderr.\n') schan.close() - self.assertEquals('Hello there.\n', stdout.readline()) - self.assertEquals('', stdout.readline()) - self.assertEquals('This is on stderr.\n', stderr.readline()) - self.assertEquals('', stderr.readline()) + self.assertEqual('Hello there.\n', stdout.readline()) + self.assertEqual('', stdout.readline()) + self.assertEqual('This is on stderr.\n', stderr.readline()) + self.assertEqual('', stderr.readline()) stdin.close() stdout.close() @@ -161,10 +161,10 @@ class SSHClientTest (unittest.TestCase): self.tc.connect(self.addr, self.port, username='slowdive', key_filename=[ test_path('test_rsa.key'), test_path('test_dss.key') ]) self.event.wait(1.0) - self.assert_(self.event.isSet()) - self.assert_(self.ts.is_active()) - self.assertEquals('slowdive', self.ts.get_username()) - self.assertEquals(True, self.ts.is_authenticated()) + self.assertTrue(self.event.isSet()) + self.assertTrue(self.ts.is_active()) + self.assertEqual('slowdive', self.ts.get_username()) + self.assertEqual(True, self.ts.is_authenticated()) def test_4_auto_add_policy(self): """ @@ -175,16 +175,16 @@ class SSHClientTest (unittest.TestCase): self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.assertEquals(0, len(self.tc.get_host_keys())) + self.assertEqual(0, len(self.tc.get_host_keys())) self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') self.event.wait(1.0) - self.assert_(self.event.isSet()) - self.assert_(self.ts.is_active()) - self.assertEquals('slowdive', self.ts.get_username()) - self.assertEquals(True, self.ts.is_authenticated()) - self.assertEquals(1, len(self.tc.get_host_keys())) - self.assertEquals(public_host_key, self.tc.get_host_keys()['[%s]:%d' % (self.addr, self.port)]['ssh-rsa']) + self.assertTrue(self.event.isSet()) + self.assertTrue(self.ts.is_active()) + self.assertEqual('slowdive', self.ts.get_username()) + self.assertEqual(True, self.ts.is_authenticated()) + self.assertEqual(1, len(self.tc.get_host_keys())) + self.assertEqual(public_host_key, self.tc.get_host_keys()['[%s]:%d' % (self.addr, self.port)]['ssh-rsa']) def test_5_cleanup(self): """ @@ -196,12 +196,12 @@ class SSHClientTest (unittest.TestCase): self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.assertEquals(0, len(self.tc.get_host_keys())) + self.assertEqual(0, len(self.tc.get_host_keys())) self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') self.event.wait(1.0) - self.assert_(self.event.isSet()) - self.assert_(self.ts.is_active()) + self.assertTrue(self.event.isSet()) + self.assertTrue(self.ts.is_active()) p = weakref.ref(self.tc._transport.packetizer) self.assertTrue(p() is not None) diff --git a/tests/test_file.py b/tests/test_file.py index 0430040..33a4913 100755 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -54,7 +54,7 @@ class BufferedFileTest (unittest.TestCase): f = LoopbackFile('r') try: f.write('hi') - self.assert_(False, 'no exception on write to read-only file') + self.assertTrue(False, 'no exception on write to read-only file') except: pass f.close() @@ -62,7 +62,7 @@ class BufferedFileTest (unittest.TestCase): f = LoopbackFile('w') try: f.read(1) - self.assert_(False, 'no exception to read from write-only file') + self.assertTrue(False, 'no exception to read from write-only file') except: pass f.close() @@ -81,12 +81,12 @@ class BufferedFileTest (unittest.TestCase): f.close() try: f.readline() - self.assert_(False, 'no exception on readline of closed file') + self.assertTrue(False, 'no exception on readline of closed file') except IOError: pass - self.assert_(linefeed_byte in f.newlines) - self.assert_(crlf in f.newlines) - self.assert_(cr_byte not in f.newlines) + self.assertTrue(linefeed_byte in f.newlines) + self.assertTrue(crlf in f.newlines) + self.assertTrue(cr_byte not in f.newlines) def test_3_lf(self): """ diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py index 66b41b6..a7621d8 100644 --- a/tests/test_hostkeys.py +++ b/tests/test_hostkeys.py @@ -25,7 +25,7 @@ from binascii import hexlify import os import unittest import paramiko -from paramiko.py3compat import b +from paramiko.py3compat import b, decodebytes test_hosts_file = """\ @@ -65,42 +65,42 @@ class HostKeysTest (unittest.TestCase): def test_1_load(self): hostdict = paramiko.HostKeys('hostfile.temp') - self.assertEquals(2, len(hostdict)) - self.assertEquals(1, len(list(hostdict.values())[0])) - self.assertEquals(1, len(list(hostdict.values())[1])) + self.assertEqual(2, len(hostdict)) + self.assertEqual(1, len(list(hostdict.values())[0])) + self.assertEqual(1, len(list(hostdict.values())[1])) fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper() - self.assertEquals(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp) + self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp) def test_2_add(self): hostdict = paramiko.HostKeys('hostfile.temp') hh = '|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c=' - key = paramiko.RSAKey(data=base64.decodestring(keyblob)) + key = paramiko.RSAKey(data=decodebytes(keyblob)) hostdict.add(hh, 'ssh-rsa', key) - self.assertEquals(3, len(list(hostdict))) + self.assertEqual(3, len(list(hostdict))) x = hostdict['foo.example.com'] fp = hexlify(x['ssh-rsa'].get_fingerprint()).upper() - self.assertEquals(b('7EC91BB336CB6D810B124B1353C32396'), fp) - self.assert_(hostdict.check('foo.example.com', key)) + self.assertEqual(b('7EC91BB336CB6D810B124B1353C32396'), fp) + self.assertTrue(hostdict.check('foo.example.com', key)) def test_3_dict(self): hostdict = paramiko.HostKeys('hostfile.temp') - self.assert_('secure.example.com' in hostdict) - self.assert_('not.example.com' not in hostdict) - self.assert_('secure.example.com' in hostdict) - self.assert_('not.example.com' not in hostdict) + self.assertTrue('secure.example.com' in hostdict) + self.assertTrue('not.example.com' not in hostdict) + self.assertTrue('secure.example.com' in hostdict) + self.assertTrue('not.example.com' not in hostdict) x = hostdict.get('secure.example.com', None) - self.assert_(x is not None) + self.assertTrue(x is not None) fp = hexlify(x['ssh-rsa'].get_fingerprint()).upper() - self.assertEquals(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp) + self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp) i = 0 for key in hostdict: i += 1 - self.assertEquals(2, i) + self.assertEqual(2, i) def test_4_dict_set(self): hostdict = paramiko.HostKeys('hostfile.temp') - key = paramiko.RSAKey(data=base64.decodestring(keyblob)) - key_dss = paramiko.DSSKey(data=base64.decodestring(keyblob_dss)) + key = paramiko.RSAKey(data=decodebytes(keyblob)) + key_dss = paramiko.DSSKey(data=decodebytes(keyblob_dss)) hostdict['secure.example.com'] = { 'ssh-rsa': key, 'ssh-dss': key_dss @@ -108,11 +108,11 @@ class HostKeysTest (unittest.TestCase): hostdict['fake.example.com'] = {} hostdict['fake.example.com']['ssh-rsa'] = key - self.assertEquals(3, len(hostdict)) - self.assertEquals(2, len(list(hostdict.values())[0])) - self.assertEquals(1, len(list(hostdict.values())[1])) - self.assertEquals(1, len(list(hostdict.values())[2])) + self.assertEqual(3, len(hostdict)) + self.assertEqual(2, len(list(hostdict.values())[0])) + self.assertEqual(1, len(list(hostdict.values())[1])) + self.assertEqual(1, len(list(hostdict.values())[2])) fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper() - self.assertEquals(b('7EC91BB336CB6D810B124B1353C32396'), fp) + self.assertEqual(b('7EC91BB336CB6D810B124B1353C32396'), fp) fp = hexlify(hostdict['secure.example.com']['ssh-dss'].get_fingerprint()).upper() - self.assertEquals(b('4478F0B9A23CC5182009FF755BC1D26C'), fp) + self.assertEqual(b('4478F0B9A23CC5182009FF755BC1D26C'), fp) diff --git a/tests/test_kex.py b/tests/test_kex.py index f7cb064..dbe377a 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -92,8 +92,8 @@ class KexTest (unittest.TestCase): kex = KexGroup1(transport) kex.start_kex() x = b('1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4') - self.assertEquals(x, hexlify(transport._message.asbytes()).upper()) - self.assertEquals((paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect) # fake "reply" msg = Message() @@ -103,17 +103,17 @@ class KexTest (unittest.TestCase): msg.rewind() kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg) H = b('03079780F3D3AD0B3C6DB30C8D21685F367A86D2') - self.assertEquals(self.K, transport._K) - self.assertEquals(H, hexlify(transport._H).upper()) - self.assertEquals((b('fake-host-key'), b('fake-sig')), transport._verify) - self.assert_(transport._activated) + self.assertEqual(self.K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify) + self.assertTrue(transport._activated) def test_2_group1_server(self): transport = FakeTransport() transport.server_mode = True kex = KexGroup1(transport) kex.start_kex() - self.assertEquals((paramiko.kex_group1._MSG_KEXDH_INIT,), transport._expect) + self.assertEqual((paramiko.kex_group1._MSG_KEXDH_INIT,), transport._expect) msg = Message() msg.add_mpint(69) @@ -121,10 +121,10 @@ class KexTest (unittest.TestCase): kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg) H = b('B16BF34DD10945EDE84E9C1EF24A14BFDC843389') x = b('1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967') - self.assertEquals(self.K, transport._K) - self.assertEquals(H, hexlify(transport._H).upper()) - self.assertEquals(x, hexlify(transport._message.asbytes()).upper()) - self.assert_(transport._activated) + self.assertEqual(self.K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertTrue(transport._activated) def test_3_gex_client(self): transport = FakeTransport() @@ -132,8 +132,8 @@ class KexTest (unittest.TestCase): kex = KexGex(transport) kex.start_kex() x = b('22000004000000080000002000') - self.assertEquals(x, hexlify(transport._message.asbytes()).upper()) - self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect) msg = Message() msg.add_mpint(FakeModulusPack.P) @@ -141,8 +141,8 @@ class KexTest (unittest.TestCase): msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) x = b('20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4') - self.assertEquals(x, hexlify(transport._message.asbytes()).upper()) - self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect) msg = Message() msg.add_string('fake-host-key') @@ -151,10 +151,10 @@ class KexTest (unittest.TestCase): msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg) H = b('A265563F2FA87F1A89BF007EE90D58BE2E4A4BD0') - self.assertEquals(self.K, transport._K) - self.assertEquals(H, hexlify(transport._H).upper()) - self.assertEquals((b('fake-host-key'), b('fake-sig')), transport._verify) - self.assert_(transport._activated) + self.assertEqual(self.K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify) + self.assertTrue(transport._activated) def test_4_gex_old_client(self): transport = FakeTransport() @@ -162,8 +162,8 @@ class KexTest (unittest.TestCase): kex = KexGex(transport) kex.start_kex(_test_old_style=True) x = b('1E00000800') - self.assertEquals(x, hexlify(transport._message.asbytes()).upper()) - self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect) msg = Message() msg.add_mpint(FakeModulusPack.P) @@ -171,8 +171,8 @@ class KexTest (unittest.TestCase): msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) x = b('20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4') - self.assertEquals(x, hexlify(transport._message.asbytes()).upper()) - self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect) msg = Message() msg.add_string('fake-host-key') @@ -181,17 +181,17 @@ class KexTest (unittest.TestCase): msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg) H = b('807F87B269EF7AC5EC7E75676808776A27D5864C') - self.assertEquals(self.K, transport._K) - self.assertEquals(H, hexlify(transport._H).upper()) - self.assertEquals((b('fake-host-key'), b('fake-sig')), transport._verify) - self.assert_(transport._activated) + self.assertEqual(self.K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify) + self.assertTrue(transport._activated) def test_5_gex_server(self): transport = FakeTransport() transport.server_mode = True kex = KexGex(transport) kex.start_kex() - self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect) msg = Message() msg.add_int(1024) @@ -200,8 +200,8 @@ class KexTest (unittest.TestCase): msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg) x = b('1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102') - self.assertEquals(x, hexlify(transport._message.asbytes()).upper()) - self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect) msg = Message() msg.add_mpint(12345) @@ -210,25 +210,25 @@ class KexTest (unittest.TestCase): K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 H = b('CE754197C21BF3452863B4F44D0B3951F12516EF') x = b('210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967') - self.assertEquals(K, transport._K) - self.assertEquals(H, hexlify(transport._H).upper()) - self.assertEquals(x, hexlify(transport._message.asbytes()).upper()) - self.assert_(transport._activated) + self.assertEqual(K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertTrue(transport._activated) def test_6_gex_server_with_old_client(self): transport = FakeTransport() transport.server_mode = True kex = KexGex(transport) kex.start_kex() - self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect) msg = Message() msg.add_int(2048) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg) x = b('1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102') - self.assertEquals(x, hexlify(transport._message.asbytes()).upper()) - self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect) msg = Message() msg.add_mpint(12345) @@ -237,7 +237,7 @@ class KexTest (unittest.TestCase): K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 H = b('B41A06B2E59043CEFC1AE16EC31F1E2D12EC455B') x = b('210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967') - self.assertEquals(K, transport._K) - self.assertEquals(H, hexlify(transport._H).upper()) - self.assertEquals(x, hexlify(transport._message.asbytes()).upper()) - self.assert_(transport._activated) + self.assertEqual(K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertTrue(transport._activated) diff --git a/tests/test_message.py b/tests/test_message.py index 0aed88c..f983b4d 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -40,7 +40,7 @@ class MessageTest (unittest.TestCase): msg.add_string('q') msg.add_string('hello') msg.add_string('x' * 1000) - self.assertEquals(msg.asbytes(), self.__a) + self.assertEqual(msg.asbytes(), self.__a) msg = Message() msg.add_boolean(True) @@ -49,7 +49,7 @@ class MessageTest (unittest.TestCase): msg.add_bytes(zero_byte + byte_chr(0x3f)) msg.add_list(['huey', 'dewey', 'louie']) - self.assertEquals(msg.asbytes(), self.__b) + self.assertEqual(msg.asbytes(), self.__b) msg = Message() msg.add_int64(5) @@ -57,29 +57,29 @@ class MessageTest (unittest.TestCase): msg.add_mpint(17) msg.add_mpint(0xf5e4d3c2b109) msg.add_mpint(-0x65e4d3c2b109) - self.assertEquals(msg.asbytes(), self.__c) + self.assertEqual(msg.asbytes(), self.__c) def test_2_decode(self): msg = Message(self.__a) - self.assertEquals(msg.get_int(), 23) - self.assertEquals(msg.get_int(), 123789456) - self.assertEquals(msg.get_text(), 'q') - self.assertEquals(msg.get_text(), 'hello') - self.assertEquals(msg.get_text(), 'x' * 1000) + self.assertEqual(msg.get_int(), 23) + self.assertEqual(msg.get_int(), 123789456) + self.assertEqual(msg.get_text(), 'q') + self.assertEqual(msg.get_text(), 'hello') + self.assertEqual(msg.get_text(), 'x' * 1000) msg = Message(self.__b) - self.assertEquals(msg.get_boolean(), True) - self.assertEquals(msg.get_boolean(), False) - self.assertEquals(msg.get_byte(), byte_chr(0xf3)) - self.assertEquals(msg.get_bytes(2), zero_byte + byte_chr(0x3f)) - self.assertEquals(msg.get_list(), ['huey', 'dewey', 'louie']) + self.assertEqual(msg.get_boolean(), True) + self.assertEqual(msg.get_boolean(), False) + self.assertEqual(msg.get_byte(), byte_chr(0xf3)) + self.assertEqual(msg.get_bytes(2), zero_byte + byte_chr(0x3f)) + self.assertEqual(msg.get_list(), ['huey', 'dewey', 'louie']) msg = Message(self.__c) - self.assertEquals(msg.get_int64(), 5) - self.assertEquals(msg.get_int64(), 0xf5e4d3c2b109) - self.assertEquals(msg.get_mpint(), 17) - self.assertEquals(msg.get_mpint(), 0xf5e4d3c2b109) - self.assertEquals(msg.get_mpint(), -0x65e4d3c2b109) + self.assertEqual(msg.get_int64(), 5) + self.assertEqual(msg.get_int64(), 0xf5e4d3c2b109) + self.assertEqual(msg.get_mpint(), 17) + self.assertEqual(msg.get_mpint(), 0xf5e4d3c2b109) + self.assertEqual(msg.get_mpint(), -0x65e4d3c2b109) def test_3_add(self): msg = Message() @@ -89,16 +89,16 @@ class MessageTest (unittest.TestCase): msg.add(True) msg.add('cat') msg.add(['a', 'b']) - self.assertEquals(msg.asbytes(), self.__d) + self.assertEqual(msg.asbytes(), self.__d) def test_4_misc(self): msg = Message(self.__d) - self.assertEquals(msg.get_int(), 5) - self.assertEquals(msg.get_int(), 0x1122334455) - self.assertEquals(msg.get_int(), 0xf00000000000000000) - self.assertEquals(msg.get_so_far(), self.__d[:29]) - self.assertEquals(msg.get_remainder(), self.__d[29:]) + self.assertEqual(msg.get_int(), 5) + self.assertEqual(msg.get_int(), 0x1122334455) + self.assertEqual(msg.get_int(), 0xf00000000000000000) + self.assertEqual(msg.get_so_far(), self.__d[:29]) + self.assertEqual(msg.get_remainder(), self.__d[29:]) msg.rewind() - self.assertEquals(msg.get_int(), 5) - self.assertEquals(msg.get_so_far(), self.__d[:4]) - self.assertEquals(msg.get_remainder(), self.__d[4:]) + self.assertEqual(msg.get_int(), 5) + self.assertEqual(msg.get_so_far(), self.__d[:4]) + self.assertEqual(msg.get_remainder(), self.__d[4:]) diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index a4ada72..43dea38 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -54,8 +54,8 @@ class PacketizerTest (unittest.TestCase): p.send_message(m) data = rsock.recv(100) # 32 + 12 bytes of MAC = 44 - self.assertEquals(44, len(data)) - self.assertEquals(unhexlify(b('439197bd5b50ac2587c2c46bc7e938c0')), data[:16]) + self.assertEqual(44, len(data)) + self.assertEqual(unhexlify(b('439197bd5b50ac2587c2c46bc7e938c0')), data[:16]) def test_2_read (self): rsock = LoopSocket() @@ -68,7 +68,7 @@ class PacketizerTest (unittest.TestCase): p.set_inbound_cipher(cipher, 16, SHA, 12, x1f * 20) wsock.send(unhexlify(b('439197bd5b50ac2587c2c46bc7e938c090d216560d717361387c4c3dfb977de26e03b1a0c21cd641414cb459'))) cmd, m = p.read_message() - self.assertEquals(100, cmd) - self.assertEquals(100, m.get_int()) - self.assertEquals(1, m.get_int()) - self.assertEquals(900, m.get_int()) + self.assertEqual(100, cmd) + self.assertEqual(100, m.get_int()) + self.assertEqual(1, m.get_int()) + self.assertEqual(900, m.get_int()) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 8ab21a3..f854946 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -92,157 +92,157 @@ class KeyTest (unittest.TestCase): from Crypto.Hash import MD5 key = util.generate_key_bytes(MD5, x1234, 'happy birthday', 30) exp = unhexlify(b('61E1F272F4C1C4561586BD322498C0E924672780F47BB37DDA7D54019E64')) - self.assertEquals(exp, key) + self.assertEqual(exp, key) def test_2_load_rsa(self): key = RSAKey.from_private_key_file(test_path('test_rsa.key')) - self.assertEquals('ssh-rsa', key.get_name()) + self.assertEqual('ssh-rsa', key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(':', '')) my_rsa = hexlify(key.get_fingerprint()) - self.assertEquals(exp_rsa, my_rsa) - self.assertEquals(PUB_RSA.split()[1], key.get_base64()) - self.assertEquals(1024, key.get_bits()) + self.assertEqual(exp_rsa, my_rsa) + self.assertEqual(PUB_RSA.split()[1], key.get_base64()) + self.assertEqual(1024, key.get_bits()) s = StringIO() key.write_private_key(s) - self.assertEquals(RSA_PRIVATE_OUT, s.getvalue()) + self.assertEqual(RSA_PRIVATE_OUT, s.getvalue()) s.seek(0) key2 = RSAKey.from_private_key(s) - self.assertEquals(key, key2) + self.assertEqual(key, key2) def test_3_load_rsa_password(self): key = RSAKey.from_private_key_file(test_path('test_rsa_password.key'), 'television') - self.assertEquals('ssh-rsa', key.get_name()) + self.assertEqual('ssh-rsa', key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(':', '')) my_rsa = hexlify(key.get_fingerprint()) - self.assertEquals(exp_rsa, my_rsa) - self.assertEquals(PUB_RSA.split()[1], key.get_base64()) - self.assertEquals(1024, key.get_bits()) + self.assertEqual(exp_rsa, my_rsa) + self.assertEqual(PUB_RSA.split()[1], key.get_base64()) + self.assertEqual(1024, key.get_bits()) def test_4_load_dss(self): key = DSSKey.from_private_key_file(test_path('test_dss.key')) - self.assertEquals('ssh-dss', key.get_name()) + self.assertEqual('ssh-dss', key.get_name()) exp_dss = b(FINGER_DSS.split()[1].replace(':', '')) my_dss = hexlify(key.get_fingerprint()) - self.assertEquals(exp_dss, my_dss) - self.assertEquals(PUB_DSS.split()[1], key.get_base64()) - self.assertEquals(1024, key.get_bits()) + self.assertEqual(exp_dss, my_dss) + self.assertEqual(PUB_DSS.split()[1], key.get_base64()) + self.assertEqual(1024, key.get_bits()) s = StringIO() key.write_private_key(s) - self.assertEquals(DSS_PRIVATE_OUT, s.getvalue()) + self.assertEqual(DSS_PRIVATE_OUT, s.getvalue()) s.seek(0) key2 = DSSKey.from_private_key(s) - self.assertEquals(key, key2) + self.assertEqual(key, key2) def test_5_load_dss_password(self): key = DSSKey.from_private_key_file(test_path('test_dss_password.key'), 'television') - self.assertEquals('ssh-dss', key.get_name()) + self.assertEqual('ssh-dss', key.get_name()) exp_dss = b(FINGER_DSS.split()[1].replace(':', '')) my_dss = hexlify(key.get_fingerprint()) - self.assertEquals(exp_dss, my_dss) - self.assertEquals(PUB_DSS.split()[1], key.get_base64()) - self.assertEquals(1024, key.get_bits()) + self.assertEqual(exp_dss, my_dss) + self.assertEqual(PUB_DSS.split()[1], key.get_base64()) + self.assertEqual(1024, key.get_bits()) def test_6_compare_rsa(self): # verify that the private & public keys compare equal key = RSAKey.from_private_key_file(test_path('test_rsa.key')) - self.assertEquals(key, key) + self.assertEqual(key, key) pub = RSAKey(data=key.asbytes()) - self.assert_(key.can_sign()) - self.assert_(not pub.can_sign()) - self.assertEquals(key, pub) + self.assertTrue(key.can_sign()) + self.assertTrue(not pub.can_sign()) + self.assertEqual(key, pub) def test_7_compare_dss(self): # verify that the private & public keys compare equal key = DSSKey.from_private_key_file(test_path('test_dss.key')) - self.assertEquals(key, key) + self.assertEqual(key, key) pub = DSSKey(data=key.asbytes()) - self.assert_(key.can_sign()) - self.assert_(not pub.can_sign()) - self.assertEquals(key, pub) + self.assertTrue(key.can_sign()) + self.assertTrue(not pub.can_sign()) + self.assertEqual(key, pub) def test_8_sign_rsa(self): # verify that the rsa private key can sign and verify key = RSAKey.from_private_key_file(test_path('test_rsa.key')) msg = key.sign_ssh_data(rng, b('ice weasels')) - self.assert_(type(msg) is Message) + self.assertTrue(type(msg) is Message) msg.rewind() - self.assertEquals('ssh-rsa', msg.get_text()) + self.assertEqual('ssh-rsa', msg.get_text()) sig = bytes().join([byte_chr(int(x, 16)) for x in SIGNED_RSA.split(':')]) - self.assertEquals(sig, msg.get_binary()) + self.assertEqual(sig, msg.get_binary()) msg.rewind() pub = RSAKey(data=key.asbytes()) - self.assert_(pub.verify_ssh_sig(b('ice weasels'), msg)) + self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg)) def test_9_sign_dss(self): # verify that the dss private key can sign and verify key = DSSKey.from_private_key_file(test_path('test_dss.key')) msg = key.sign_ssh_data(rng, b('ice weasels')) - self.assert_(type(msg) is Message) + self.assertTrue(type(msg) is Message) msg.rewind() - self.assertEquals('ssh-dss', msg.get_text()) + self.assertEqual('ssh-dss', msg.get_text()) # can't do the same test as we do for RSA, because DSS signatures # are usually different each time. but we can test verification # anyway so it's ok. - self.assertEquals(40, len(msg.get_binary())) + self.assertEqual(40, len(msg.get_binary())) msg.rewind() pub = DSSKey(data=key.asbytes()) - self.assert_(pub.verify_ssh_sig(b('ice weasels'), msg)) + self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg)) def test_A_generate_rsa(self): key = RSAKey.generate(1024) msg = key.sign_ssh_data(rng, b('jerri blank')) msg.rewind() - self.assert_(key.verify_ssh_sig(b('jerri blank'), msg)) + self.assertTrue(key.verify_ssh_sig(b('jerri blank'), msg)) def test_B_generate_dss(self): key = DSSKey.generate(1024) msg = key.sign_ssh_data(rng, b('jerri blank')) msg.rewind() - self.assert_(key.verify_ssh_sig(b('jerri blank'), msg)) + self.assertTrue(key.verify_ssh_sig(b('jerri blank'), msg)) def test_10_load_ecdsa(self): key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key')) - self.assertEquals('ecdsa-sha2-nistp256', key.get_name()) + self.assertEqual('ecdsa-sha2-nistp256', key.get_name()) exp_ecdsa = b(FINGER_ECDSA.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) - self.assertEquals(exp_ecdsa, my_ecdsa) - self.assertEquals(PUB_ECDSA.split()[1], key.get_base64()) - self.assertEquals(256, key.get_bits()) + self.assertEqual(exp_ecdsa, my_ecdsa) + self.assertEqual(PUB_ECDSA.split()[1], key.get_base64()) + self.assertEqual(256, key.get_bits()) s = StringIO() key.write_private_key(s) - self.assertEquals(ECDSA_PRIVATE_OUT, s.getvalue()) + self.assertEqual(ECDSA_PRIVATE_OUT, s.getvalue()) s.seek(0) key2 = ECDSAKey.from_private_key(s) - self.assertEquals(key, key2) + self.assertEqual(key, key2) def test_11_load_ecdsa_password(self): key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password.key'), b('television')) - self.assertEquals('ecdsa-sha2-nistp256', key.get_name()) + self.assertEqual('ecdsa-sha2-nistp256', key.get_name()) exp_ecdsa = b(FINGER_ECDSA.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) - self.assertEquals(exp_ecdsa, my_ecdsa) - self.assertEquals(PUB_ECDSA.split()[1], key.get_base64()) - self.assertEquals(256, key.get_bits()) + self.assertEqual(exp_ecdsa, my_ecdsa) + self.assertEqual(PUB_ECDSA.split()[1], key.get_base64()) + self.assertEqual(256, key.get_bits()) def test_12_compare_ecdsa(self): # verify that the private & public keys compare equal key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key')) - self.assertEquals(key, key) + self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) - self.assert_(key.can_sign()) - self.assert_(not pub.can_sign()) - self.assertEquals(key, pub) + self.assertTrue(key.can_sign()) + self.assertTrue(not pub.can_sign()) + self.assertEqual(key, pub) def test_13_sign_ecdsa(self): # verify that the rsa private key can sign and verify key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key')) msg = key.sign_ssh_data(rng, b('ice weasels')) - self.assert_(type(msg) is Message) + self.assertTrue(type(msg) is Message) msg.rewind() - self.assertEquals('ecdsa-sha2-nistp256', msg.get_text()) + self.assertEqual('ecdsa-sha2-nistp256', msg.get_text()) # ECDSA signatures, like DSS signatures, tend to be different # each time, so we can't compare against a "known correct" # signature. @@ -250,4 +250,4 @@ class KeyTest (unittest.TestCase): msg.rewind() pub = ECDSAKey(data=key.asbytes()) - self.assert_(pub.verify_ssh_sig(b('ice weasels'), msg)) + self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg)) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index c17defa..b84b3fd 100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -162,8 +162,8 @@ class SFTPTest (unittest.TestCase): f = sftp.open(FOLDER + '/test', 'w') try: self.assertEqual(f.stat().st_size, 0) - f.close() finally: + f.close() sftp.remove(FOLDER + '/test') def test_2_close(self): @@ -219,8 +219,8 @@ class SFTPTest (unittest.TestCase): self.assertEqual(f.stat().st_size, 37) f.seek(-26, f.SEEK_CUR) self.assertEqual(f.readline(), 'second line\n') - f.close() finally: + f.close() sftp.remove(FOLDER + '/append.txt') def test_5_rename(self): diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index a53a6c3..dc9cba9 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -262,6 +262,7 @@ class BigSFTPTest (unittest.TestCase): for i in range(10): f = sftp.open('%s/hongry.txt' % FOLDER, 'r') f.prefetch() + f.close() f = sftp.open('%s/hongry.txt' % FOLDER, 'r') f.prefetch() for n in range(1024): diff --git a/tests/test_transport.py b/tests/test_transport.py index 22a02a8..397b00c 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -132,28 +132,28 @@ class TransportTest(ParamikoTest): event = threading.Event() self.server = NullServer() - self.assert_(not event.isSet()) + self.assertTrue(not event.isSet()) self.ts.start_server(event, self.server) self.tc.connect(hostkey=public_host_key, username='slowdive', password='pygmalion') event.wait(1.0) - self.assert_(event.isSet()) - self.assert_(self.ts.is_active()) + self.assertTrue(event.isSet()) + self.assertTrue(self.ts.is_active()) def test_1_security_options(self): o = self.tc.get_security_options() - self.assertEquals(type(o), SecurityOptions) - self.assert_(('aes256-cbc', 'blowfish-cbc') != o.ciphers) + self.assertEqual(type(o), SecurityOptions) + self.assertTrue(('aes256-cbc', 'blowfish-cbc') != o.ciphers) o.ciphers = ('aes256-cbc', 'blowfish-cbc') - self.assertEquals(('aes256-cbc', 'blowfish-cbc'), o.ciphers) + self.assertEqual(('aes256-cbc', 'blowfish-cbc'), o.ciphers) try: o.ciphers = ('aes256-cbc', 'made-up-cipher') - self.assert_(False) + self.assertTrue(False) except ValueError: pass try: o.ciphers = 23 - self.assert_(False) + self.assertTrue(False) except TypeError: pass @@ -162,7 +162,7 @@ class TransportTest(ParamikoTest): self.tc.H = unhexlify(b('0C8307CDE6856FF30BA93684EB0F04C2520E9ED3')) self.tc.session_id = self.tc.H key = self.tc._compute_key('C', 32) - self.assertEquals(b('207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995'), + self.assertEqual(b('207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995'), hexlify(key).upper()) def test_3_simple(self): @@ -176,21 +176,21 @@ class TransportTest(ParamikoTest): self.ts.add_server_key(host_key) event = threading.Event() server = NullServer() - self.assert_(not event.isSet()) - self.assertEquals(None, self.tc.get_username()) - self.assertEquals(None, self.ts.get_username()) - self.assertEquals(False, self.tc.is_authenticated()) - self.assertEquals(False, self.ts.is_authenticated()) + self.assertTrue(not event.isSet()) + self.assertEqual(None, self.tc.get_username()) + self.assertEqual(None, self.ts.get_username()) + self.assertEqual(False, self.tc.is_authenticated()) + self.assertEqual(False, self.ts.is_authenticated()) self.ts.start_server(event, server) self.tc.connect(hostkey=public_host_key, username='slowdive', password='pygmalion') event.wait(1.0) - self.assert_(event.isSet()) - self.assert_(self.ts.is_active()) - self.assertEquals('slowdive', self.tc.get_username()) - self.assertEquals('slowdive', self.ts.get_username()) - self.assertEquals(True, self.tc.is_authenticated()) - self.assertEquals(True, self.ts.is_authenticated()) + self.assertTrue(event.isSet()) + self.assertTrue(self.ts.is_active()) + self.assertEqual('slowdive', self.tc.get_username()) + self.assertEqual('slowdive', self.ts.get_username()) + self.assertEqual(True, self.tc.is_authenticated()) + self.assertEqual(True, self.ts.is_authenticated()) def test_3a_long_banner(self): """ @@ -201,14 +201,14 @@ class TransportTest(ParamikoTest): self.ts.add_server_key(host_key) event = threading.Event() server = NullServer() - self.assert_(not event.isSet()) + self.assertTrue(not event.isSet()) self.socks.send(LONG_BANNER) self.ts.start_server(event, server) self.tc.connect(hostkey=public_host_key, username='slowdive', password='pygmalion') event.wait(1.0) - self.assert_(event.isSet()) - self.assert_(self.ts.is_active()) + self.assertTrue(event.isSet()) + self.assertTrue(self.ts.is_active()) def test_4_special(self): """ @@ -219,10 +219,10 @@ class TransportTest(ParamikoTest): options.ciphers = ('aes256-cbc',) options.digests = ('hmac-md5-96',) self.setup_test_server(client_options=force_algorithms) - self.assertEquals('aes256-cbc', self.tc.local_cipher) - self.assertEquals('aes256-cbc', self.tc.remote_cipher) - self.assertEquals(12, self.tc.packetizer.get_mac_size_out()) - self.assertEquals(12, self.tc.packetizer.get_mac_size_in()) + self.assertEqual('aes256-cbc', self.tc.local_cipher) + self.assertEqual('aes256-cbc', self.tc.remote_cipher) + self.assertEqual(12, self.tc.packetizer.get_mac_size_out()) + self.assertEqual(12, self.tc.packetizer.get_mac_size_in()) self.tc.send_ignore(1024) self.tc.renegotiate_keys() @@ -233,10 +233,10 @@ class TransportTest(ParamikoTest): verify that the keepalive will be sent. """ self.setup_test_server() - self.assertEquals(None, getattr(self.server, '_global_request', None)) + self.assertEqual(None, getattr(self.server, '_global_request', None)) self.tc.set_keepalive(1) time.sleep(2) - self.assertEquals('keepalive@lag.net', self.server._global_request) + self.assertEqual('keepalive@lag.net', self.server._global_request) def test_6_exec_command(self): """ @@ -248,7 +248,7 @@ class TransportTest(ParamikoTest): schan = self.ts.accept(1.0) try: chan.exec_command('no') - self.assert_(False) + self.assertTrue(False) except SSHException: pass @@ -260,11 +260,11 @@ class TransportTest(ParamikoTest): schan.close() f = chan.makefile() - self.assertEquals('Hello there.\n', f.readline()) - self.assertEquals('', f.readline()) + self.assertEqual('Hello there.\n', f.readline()) + self.assertEqual('', f.readline()) f = chan.makefile_stderr() - self.assertEquals('This is on stderr.\n', f.readline()) - self.assertEquals('', f.readline()) + self.assertEqual('This is on stderr.\n', f.readline()) + self.assertEqual('', f.readline()) # now try it with combined stdout/stderr chan = self.tc.open_session() @@ -276,9 +276,9 @@ class TransportTest(ParamikoTest): chan.set_combine_stderr(True) f = chan.makefile() - self.assertEquals('Hello there.\n', f.readline()) - self.assertEquals('This is on stderr.\n', f.readline()) - self.assertEquals('', f.readline()) + self.assertEqual('Hello there.\n', f.readline()) + self.assertEqual('This is on stderr.\n', f.readline()) + self.assertEqual('', f.readline()) def test_7_invoke_shell(self): """ @@ -290,9 +290,9 @@ class TransportTest(ParamikoTest): schan = self.ts.accept(1.0) chan.send('communist j. cat\n') f = schan.makefile() - self.assertEquals('communist j. cat\n', f.readline()) + self.assertEqual('communist j. cat\n', f.readline()) chan.close() - self.assertEquals('', f.readline()) + self.assertEqual('', f.readline()) def test_8_channel_exception(self): """ @@ -304,7 +304,7 @@ class TransportTest(ParamikoTest): self.fail('expected exception') except ChannelException: x = sys.exc_info()[1] - self.assert_(x.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED) + self.assertTrue(x.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED) def test_9_exit_status(self): """ @@ -316,7 +316,7 @@ class TransportTest(ParamikoTest): schan = self.ts.accept(1.0) chan.exec_command('yes') schan.send('Hello there.\n') - self.assert_(not chan.exit_status_ready()) + self.assertTrue(not chan.exit_status_ready()) # trigger an EOF schan.shutdown_read() schan.shutdown_write() @@ -324,15 +324,15 @@ class TransportTest(ParamikoTest): schan.close() f = chan.makefile() - self.assertEquals('Hello there.\n', f.readline()) - self.assertEquals('', f.readline()) + self.assertEqual('Hello there.\n', f.readline()) + self.assertEqual('', f.readline()) count = 0 while not chan.exit_status_ready(): time.sleep(0.1) count += 1 if count > 50: raise Exception("timeout") - self.assertEquals(23, chan.recv_exit_status()) + self.assertEqual(23, chan.recv_exit_status()) chan.close() def test_A_select(self): @@ -346,9 +346,9 @@ class TransportTest(ParamikoTest): # nothing should be ready r, w, e = select.select([chan], [], [], 0.1) - self.assertEquals([], r) - self.assertEquals([], w) - self.assertEquals([], e) + self.assertEqual([], r) + self.assertEqual([], w) + self.assertEqual([], e) schan.send('hello\n') @@ -358,17 +358,17 @@ class TransportTest(ParamikoTest): if chan in r: break time.sleep(0.1) - self.assertEquals([chan], r) - self.assertEquals([], w) - self.assertEquals([], e) + self.assertEqual([chan], r) + self.assertEqual([], w) + self.assertEqual([], e) - self.assertEquals(b('hello\n'), chan.recv(6)) + self.assertEqual(b('hello\n'), chan.recv(6)) # and, should be dead again now r, w, e = select.select([chan], [], [], 0.1) - self.assertEquals([], r) - self.assertEquals([], w) - self.assertEquals([], e) + self.assertEqual([], r) + self.assertEqual([], w) + self.assertEqual([], e) schan.close() @@ -378,17 +378,17 @@ class TransportTest(ParamikoTest): if chan in r: break time.sleep(0.1) - self.assertEquals([chan], r) - self.assertEquals([], w) - self.assertEquals([], e) - self.assertEquals(bytes(), chan.recv(16)) + self.assertEqual([chan], r) + self.assertEqual([], w) + self.assertEqual([], e) + self.assertEqual(bytes(), chan.recv(16)) # make sure the pipe is still open for now... p = chan._pipe - self.assertEquals(False, p._closed) + self.assertEqual(False, p._closed) chan.close() # ...and now is closed. - self.assertEquals(True, p._closed) + self.assertEqual(True, p._closed) def test_B_renegotiate(self): """ @@ -400,7 +400,7 @@ class TransportTest(ParamikoTest): chan.exec_command('yes') schan = self.ts.accept(1.0) - self.assertEquals(self.tc.H, self.tc.session_id) + self.assertEqual(self.tc.H, self.tc.session_id) for i in range(20): chan.send('x' * 1024) chan.close() @@ -410,7 +410,7 @@ class TransportTest(ParamikoTest): if self.tc.H != self.tc.session_id: break time.sleep(0.1) - self.assertNotEquals(self.tc.H, self.tc.session_id) + self.assertNotEqual(self.tc.H, self.tc.session_id) schan.close() @@ -429,8 +429,8 @@ class TransportTest(ParamikoTest): chan.send('x' * 1024) bytes2 = self.tc.packetizer._Packetizer__sent_bytes # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :) - self.assert_(bytes2 - bytes < 1024) - self.assertEquals(52, bytes2 - bytes) + self.assertTrue(bytes2 - bytes < 1024) + self.assertEqual(52, bytes2 - bytes) chan.close() schan.close() @@ -450,20 +450,20 @@ class TransportTest(ParamikoTest): requested.append((addr, port)) self.tc._queue_incoming_channel(c) - self.assertEquals(None, getattr(self.server, '_x11_screen_number', None)) + self.assertEqual(None, getattr(self.server, '_x11_screen_number', None)) cookie = chan.request_x11(0, single_connection=True, handler=handler) - self.assertEquals(0, self.server._x11_screen_number) - self.assertEquals('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol) - self.assertEquals(cookie, self.server._x11_auth_cookie) - self.assertEquals(True, self.server._x11_single_connection) + self.assertEqual(0, self.server._x11_screen_number) + self.assertEqual('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol) + self.assertEqual(cookie, self.server._x11_auth_cookie) + self.assertEqual(True, self.server._x11_single_connection) x11_server = self.ts.open_x11_channel(('localhost', 6093)) x11_client = self.tc.accept() - self.assertEquals('localhost', requested[0][0]) - self.assertEquals(6093, requested[0][1]) + self.assertEqual('localhost', requested[0][0]) + self.assertEqual(6093, requested[0][1]) x11_server.send('hello') - self.assertEquals(b('hello'), x11_client.recv(5)) + self.assertEqual(b('hello'), x11_client.recv(5)) x11_server.close() x11_client.close() @@ -487,7 +487,7 @@ class TransportTest(ParamikoTest): self.tc._queue_incoming_channel(c) port = self.tc.request_port_forward('127.0.0.1', 0, handler) - self.assertEquals(port, self.server._listen.getsockname()[1]) + self.assertEqual(port, self.server._listen.getsockname()[1]) cs = socket.socket() cs.connect(('127.0.0.1', port)) @@ -496,7 +496,7 @@ class TransportTest(ParamikoTest): cch = self.tc.accept() sch.send('hello') - self.assertEquals(b('hello'), cch.recv(5)) + self.assertEqual(b('hello'), cch.recv(5)) sch.close() cch.close() ss.close() @@ -533,7 +533,7 @@ class TransportTest(ParamikoTest): sch.send(cch.recv(8192)) sch.close() - self.assertEquals(b('Hello!\n'), cs.recv(7)) + self.assertEqual(b('Hello!\n'), cs.recv(7)) cs.close() def test_G_stderr_select(self): @@ -548,9 +548,9 @@ class TransportTest(ParamikoTest): # nothing should be ready r, w, e = select.select([chan], [], [], 0.1) - self.assertEquals([], r) - self.assertEquals([], w) - self.assertEquals([], e) + self.assertEqual([], r) + self.assertEqual([], w) + self.assertEqual([], e) schan.send_stderr('hello\n') @@ -560,17 +560,17 @@ class TransportTest(ParamikoTest): if chan in r: break time.sleep(0.1) - self.assertEquals([chan], r) - self.assertEquals([], w) - self.assertEquals([], e) + self.assertEqual([chan], r) + self.assertEqual([], w) + self.assertEqual([], e) - self.assertEquals(b('hello\n'), chan.recv_stderr(6)) + self.assertEqual(b('hello\n'), chan.recv_stderr(6)) # and, should be dead again now r, w, e = select.select([chan], [], [], 0.1) - self.assertEquals([], r) - self.assertEquals([], w) - self.assertEquals([], e) + self.assertEqual([], r) + self.assertEqual([], w) + self.assertEqual([], e) schan.close() chan.close() @@ -584,7 +584,7 @@ class TransportTest(ParamikoTest): chan.invoke_shell() schan = self.ts.accept(1.0) - self.assertEquals(chan.send_ready(), True) + self.assertEqual(chan.send_ready(), True) total = 0 K = '*' * 1024 while total < 1024 * 1024: @@ -592,11 +592,11 @@ class TransportTest(ParamikoTest): total += len(K) if not chan.send_ready(): break - self.assert_(total < 1024 * 1024) + self.assertTrue(total < 1024 * 1024) schan.close() chan.close() - self.assertEquals(chan.send_ready(), True) + self.assertEqual(chan.send_ready(), True) def test_I_rekey_deadlock(self): """ diff --git a/tests/test_util.py b/tests/test_util.py index 84d5e88..dba5223 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -103,7 +103,7 @@ class UtilTest(ParamikoTest): global test_config_file f = StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) - self.assertEquals(config._config, + self.assertEqual(config._config, [{'host': ['*'], 'config': {}}, {'host': ['*'], 'config': {'identityfile': ['~/.ssh/id_rsa'], 'user': 'robey'}}, {'host': ['*.example.com'], 'config': {'user': 'bjork', 'port': '3333'}}, {'host': ['*'], 'config': {'crazy': 'something dumb '}}, @@ -131,7 +131,7 @@ class UtilTest(ParamikoTest): hostname=host, identityfile=[os.path.expanduser("~/.ssh/id_rsa")] ) - self.assertEquals( + self.assertEqual( paramiko.util.lookup_ssh_host_config(host, config), values ) @@ -139,7 +139,7 @@ class UtilTest(ParamikoTest): def test_4_generate_key_bytes(self): x = paramiko.util.generate_key_bytes(SHA, b('ABCDEFGH'), 'This is my secret passphrase.', 64) hex = ''.join(['%02x' % byte_ord(c) for c in x]) - self.assertEquals(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b') + self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b') def test_5_host_keys(self): f = open('hostfile.temp', 'w') @@ -147,11 +147,11 @@ class UtilTest(ParamikoTest): f.close() try: hostdict = paramiko.util.load_host_keys('hostfile.temp') - self.assertEquals(2, len(hostdict)) - self.assertEquals(1, len(list(hostdict.values())[0])) - self.assertEquals(1, len(list(hostdict.values())[1])) + self.assertEqual(2, len(hostdict)) + self.assertEqual(1, len(list(hostdict.values())[0])) + self.assertEqual(1, len(list(hostdict.values())[1])) fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper() - self.assertEquals(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp) + self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp) finally: os.unlink('hostfile.temp') @@ -159,7 +159,7 @@ class UtilTest(ParamikoTest): from paramiko.common import rng # just verify that we can pull out 32 bytes and not get an exception. x = rng.read(32) - self.assertEquals(len(x), 32) + self.assertEqual(len(x), 32) def test_7_host_config_expose_issue_33(self): test_config_file = """ @@ -175,13 +175,13 @@ Host * f = StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) host = 'www13.example.com' - self.assertEquals( + self.assertEqual( paramiko.util.lookup_ssh_host_config(host, config), {'hostname': host, 'port': '22'} ) def test_8_eintr_retry(self): - self.assertEquals('foo', paramiko.util.retry_on_signal(lambda: 'foo')) + self.assertEqual('foo', paramiko.util.retry_on_signal(lambda: 'foo')) # Variables that are set by raises_intr intr_errors_remaining = [3] @@ -192,8 +192,8 @@ Host * intr_errors_remaining[0] -= 1 raise IOError(errno.EINTR, 'file', 'interrupted system call') self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None) - self.assertEquals(0, intr_errors_remaining[0]) - self.assertEquals(4, call_count[0]) + self.assertEqual(0, intr_errors_remaining[0]) + self.assertEqual(4, call_count[0]) def raises_ioerror_not_eintr(): raise IOError(errno.ENOENT, 'file', 'file not found') @@ -219,7 +219,7 @@ Host equals-delimited f = StringIO(conf) config = paramiko.util.parse_ssh_config(f) for host in ('space-delimited', 'equals-delimited'): - self.assertEquals( + self.assertEqual( host_config(host, config)['proxycommand'], 'foo bar=biz baz' ) @@ -245,7 +245,7 @@ Host * ('specific', "host specific port 37 lol"), ('portonly', "host portonly port 155"), ): - self.assertEquals( + self.assertEqual( host_config(host, config)['proxycommand'], val ) @@ -267,7 +267,7 @@ Host * f = StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) host = 'www13.example.com' - self.assertEquals( + self.assertEqual( paramiko.util.lookup_ssh_host_config(host, config), {'hostname': host, 'port': '8080'} ) @@ -295,7 +295,7 @@ ProxyCommand foo=bar:%h-%p f = StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) - self.assertEquals( + self.assertEqual( paramiko.util.lookup_ssh_host_config(host, config), values ) @@ -325,7 +325,7 @@ IdentityFile id_dsa22 f = StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) - self.assertEquals( + self.assertEqual( paramiko.util.lookup_ssh_host_config(host, config), values )