Fix some deprecation and resource warnings

This commit is contained in:
Scott Maxwell 2013-11-02 20:18:18 -07:00
parent 45e65b6e1e
commit 7444a99993
19 changed files with 398 additions and 375 deletions

View File

@ -27,7 +27,7 @@ import threading
import traceback
import paramiko
from paramiko.py3compat import b, u
from paramiko.py3compat import b, u, decodebytes
# setup logging
@ -46,7 +46,7 @@ class Server (paramiko.ServerInterface):
'fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC' + \
'KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT' + \
'UWT10hcuO4Ks8=')
good_pub_key = paramiko.RSAKey(data=base64.decodestring(data))
good_pub_key = paramiko.RSAKey(data=decodebytes(data))
def __init__(self):
self.event = threading.Event()

View File

@ -20,6 +20,8 @@
# based on code provided by raymond mosteller (thanks!)
from __future__ import with_statement
import base64
import getpass
import os
@ -95,13 +97,15 @@ try:
except IOError:
print('(assuming demo_sftp_folder/ already exists)')
sftp.open('demo_sftp_folder/README', 'w').write('This was created by demo_sftp.py.\n')
data = open('demo_sftp.py', 'r').read()
with open('demo_sftp.py', 'r') as f:
data = f.read()
sftp.open('demo_sftp_folder/demo_sftp.py', 'w').write(data)
print('created demo_sftp_folder/ on the server')
# copy the README back here
data = sftp.open('demo_sftp_folder/README', 'r').read()
open('README_demo_sftp', 'w').write(data)
with open('README_demo_sftp', 'w') as f:
f.write(data)
print('copied README back here')
# BETTER: use the get() and put() methods

View File

@ -83,11 +83,11 @@ class HostKeyEntry:
try:
key = b(key)
if keytype == 'ssh-rsa':
key = RSAKey(data=base64.decodestring(key))
key = RSAKey(data=decodebytes(key))
elif keytype == 'ssh-dss':
key = DSSKey(data=base64.decodestring(key))
key = DSSKey(data=decodebytes(key))
elif keytype == 'ecdsa-sha2-nistp256':
key = ECDSAKey(data=base64.decodestring(key))
key = ECDSAKey(data=decodebytes(key))
else:
log.info("Unable to handle key of type %s" % (keytype,))
return None
@ -173,19 +173,21 @@ class HostKeys (MutableMapping):
@raise IOError: if there was an error reading the file
"""
f = open(filename, 'r')
for lineno, line in enumerate(f):
line = line.strip()
if (len(line) == 0) or (line[0] == '#'):
continue
e = HostKeyEntry.from_line(line, lineno)
if e is not None:
_hostnames = e.hostnames
for h in _hostnames:
if self.check(h, e.key):
e.hostnames.remove(h)
if len(e.hostnames):
self._entries.append(e)
f.close()
try:
for lineno, line in enumerate(f):
line = line.strip()
if (len(line) == 0) or (line[0] == '#'):
continue
e = HostKeyEntry.from_line(line, lineno)
if e is not None:
_hostnames = e.hostnames
for h in _hostnames:
if self.check(h, e.key):
e.hostnames.remove(h)
if len(e.hostnames):
self._entries.append(e)
finally:
f.close()
def save(self, filename):
"""
@ -202,11 +204,13 @@ class HostKeys (MutableMapping):
@since: 1.6.1
"""
f = open(filename, 'w')
for e in self._entries:
line = e.to_line()
if line:
f.write(line)
f.close()
try:
for e in self._entries:
line = e.to_line()
if line:
f.write(line)
finally:
f.close()
def lookup(self, hostname):
"""
@ -362,10 +366,10 @@ class HostKeys (MutableMapping):
else:
if salt.startswith('|1|'):
salt = salt.split('|')[2]
salt = base64.decodestring(b(salt))
salt = decodebytes(b(salt))
assert len(salt) == SHA.digest_size
hmac = HMAC.HMAC(salt, b(hostname), SHA).digest()
hostkey = '|1|%s|%s' % (u(base64.encodestring(salt)), u(base64.encodestring(hmac)))
hostkey = '|1|%s|%s' % (u(encodebytes(salt)), u(encodebytes(hmac)))
return hostkey.replace('\n', '')
hash_host = staticmethod(hash_host)

View File

@ -148,7 +148,7 @@ class PKey (object):
@return: a base64 string containing the public part of the key.
@rtype: str
"""
return u(base64.encodestring(self.asbytes())).replace('\n', '')
return u(encodebytes(self.asbytes())).replace('\n', '')
def sign_ssh_data(self, rng, data):
"""
@ -283,8 +283,10 @@ class PKey (object):
@raise SSHException: if the key file is invalid.
"""
f = open(filename, 'r')
data = self._read_private_key(tag, f, password)
f.close()
try:
data = self._read_private_key(tag, f, password)
finally:
f.close()
return data
def _read_private_key(self, tag, f, password=None):
@ -309,7 +311,7 @@ class PKey (object):
end += 1
# if we trudged to the end of the file, just try to cope.
try:
data = base64.decodestring(b(''.join(lines[start:end])))
data = decodebytes(b(''.join(lines[start:end])))
except base64.binascii.Error:
raise SSHException('base64 decoding error: ' + str(sys.exc_info()[1]))
if 'proc-type' not in headers:
@ -353,10 +355,12 @@ class PKey (object):
@raise IOError: if there was an error writing the file.
"""
f = open(filename, 'w', o600)
# grrr... the mode doesn't always take hold
os.chmod(filename, o600)
self._write_private_key(tag, f, data, password)
f.close()
try:
# grrr... the mode doesn't always take hold
os.chmod(filename, o600)
self._write_private_key(tag, f, data, password)
finally:
f.close()
def _write_private_key(self, tag, f, data, password=None):
f.write('-----BEGIN %s PRIVATE KEY-----\n' % tag)
@ -378,7 +382,7 @@ class PKey (object):
f.write('Proc-Type: 4,ENCRYPTED\n')
f.write('DEK-Info: %s,%s\n' % (cipher_name, u(hexlify(salt)).upper()))
f.write('\n')
s = u(base64.encodestring(data))
s = u(encodebytes(data))
# re-wrap to 64-char lines
s = ''.join(s.split('\n'))
s = '\n'.join([s[i : i+64] for i in range(0, len(s), 64)])

View File

@ -114,15 +114,17 @@ class ModulusPack (object):
"""
self.pack = {}
f = open(filename, 'r')
for line in f:
line = line.strip()
if (len(line) == 0) or (line[0] == '#'):
continue
try:
self._parse_modulus(line)
except:
continue
f.close()
try:
for line in f:
line = line.strip()
if (len(line) == 0) or (line[0] == '#'):
continue
try:
self._parse_modulus(line)
except:
continue
finally:
f.close()
def get_modulus(self, min, prefer, max):
bitsizes = sorted(self.pack.keys())

View File

@ -1,6 +1,9 @@
import sys
import base64
__all__ = ['PY3', 'string_types', 'integer_types', 'text_type', 'bytes_types', 'bytes', 'long', 'input', 'bytestring', 'byte_ord', 'byte_chr', 'byte_mask', 'b', 'u', 'b2s', 'StringIO', 'BytesIO', 'is_callable', 'MAXSIZE', 'next']
__all__ = ['PY3', 'string_types', 'integer_types', 'text_type', 'bytes_types', 'bytes', 'long', 'input',
'decodebytes', 'encodebytes', 'bytestring', 'byte_ord', 'byte_chr', 'byte_mask',
'b', 'u', 'b2s', 'StringIO', 'BytesIO', 'is_callable', 'MAXSIZE', 'next']
PY3 = sys.version_info[0] >= 3
@ -12,8 +15,11 @@ if PY3:
bytes = bytes
bytes_types = bytes
integer_types = int
long = int
class long(int):
pass
input = input
decodebytes = base64.decodebytes
encodebytes = base64.encodebytes
def bytestring(s):
return s
@ -72,6 +78,8 @@ else:
integer_types = (int, long)
long = long
input = raw_input
decodebytes = base64.decodestring
encodebytes = base64.encodestring
def bytestring(s): # NOQA
if isinstance(s, unicode):

View File

@ -119,13 +119,13 @@ class AuthTest (unittest.TestCase):
self.ts.add_server_key(host_key)
self.event = threading.Event()
self.server = NullServer()
self.assert_(not self.event.isSet())
self.assertTrue(not self.event.isSet())
self.ts.start_server(self.event, self.server)
def verify_finished(self):
self.event.wait(1.0)
self.assert_(self.event.isSet())
self.assert_(self.ts.is_active())
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
def test_1_bad_auth_type(self):
"""
@ -136,11 +136,11 @@ class AuthTest (unittest.TestCase):
try:
self.tc.connect(hostkey=self.public_host_key,
username='unknown', password='error')
self.assert_(False)
self.assertTrue(False)
except:
etype, evalue, etb = sys.exc_info()
self.assertEquals(BadAuthenticationType, etype)
self.assertEquals(['publickey'], evalue.allowed_types)
self.assertEqual(BadAuthenticationType, etype)
self.assertEqual(['publickey'], evalue.allowed_types)
def test_2_bad_password(self):
"""
@ -151,10 +151,10 @@ class AuthTest (unittest.TestCase):
self.tc.connect(hostkey=self.public_host_key)
try:
self.tc.auth_password(username='slowdive', password='error')
self.assert_(False)
self.assertTrue(False)
except:
etype, evalue, etb = sys.exc_info()
self.assert_(issubclass(etype, AuthenticationException))
self.assertTrue(issubclass(etype, AuthenticationException))
self.tc.auth_password(username='slowdive', password='pygmalion')
self.verify_finished()
@ -165,10 +165,10 @@ class AuthTest (unittest.TestCase):
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password(username='paranoid', password='paranoid')
self.assertEquals(['publickey'], remain)
self.assertEqual(['publickey'], remain)
key = DSSKey.from_private_key_file(test_path('test_dss.key'))
remain = self.tc.auth_publickey(username='paranoid', key=key)
self.assertEquals([], remain)
self.assertEqual([], remain)
self.verify_finished()
def test_4_interactive_auth(self):
@ -184,9 +184,9 @@ class AuthTest (unittest.TestCase):
self.got_prompts = prompts
return ['cat']
remain = self.tc.auth_interactive('commie', handler)
self.assertEquals(self.got_title, 'password')
self.assertEquals(self.got_prompts, [('Password', False)])
self.assertEquals([], remain)
self.assertEqual(self.got_title, 'password')
self.assertEqual(self.got_prompts, [('Password', False)])
self.assertEqual([], remain)
self.verify_finished()
def test_5_interactive_auth_fallback(self):
@ -197,7 +197,7 @@ class AuthTest (unittest.TestCase):
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password('commie', 'cat')
self.assertEquals([], remain)
self.assertEqual([], remain)
self.verify_finished()
def test_6_auth_utf8(self):
@ -207,7 +207,7 @@ class AuthTest (unittest.TestCase):
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password('utf8', _pwd)
self.assertEquals([], remain)
self.assertEqual([], remain)
self.verify_finished()
def test_7_auth_non_utf8(self):
@ -218,7 +218,7 @@ class AuthTest (unittest.TestCase):
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password('non-utf8', '\xff')
self.assertEquals([], remain)
self.assertEqual([], remain)
self.verify_finished()
def test_8_auth_gets_disconnected(self):
@ -232,4 +232,4 @@ class AuthTest (unittest.TestCase):
remain = self.tc.auth_password('bad-server', 'hello')
except:
etype, evalue, etb = sys.exc_info()
self.assert_(issubclass(etype, AuthenticationException))
self.assertTrue(issubclass(etype, AuthenticationException))

View File

@ -45,39 +45,39 @@ def close_thread(pipe):
class BufferedPipeTest(ParamikoTest):
def test_1_buffered_pipe(self):
p = BufferedPipe()
self.assert_(not p.read_ready())
self.assertTrue(not p.read_ready())
p.feed('hello.')
self.assert_(p.read_ready())
self.assertTrue(p.read_ready())
data = p.read(6)
self.assertEquals(b('hello.'), data)
self.assertEqual(b('hello.'), data)
p.feed('plus/minus')
self.assertEquals(b('plu'), p.read(3))
self.assertEquals(b('s/m'), p.read(3))
self.assertEquals(b('inus'), p.read(4))
self.assertEqual(b('plu'), p.read(3))
self.assertEqual(b('s/m'), p.read(3))
self.assertEqual(b('inus'), p.read(4))
p.close()
self.assert_(not p.read_ready())
self.assertEquals(b(''), p.read(1))
self.assertTrue(not p.read_ready())
self.assertEqual(b(''), p.read(1))
def test_2_delay(self):
p = BufferedPipe()
self.assert_(not p.read_ready())
self.assertTrue(not p.read_ready())
threading.Thread(target=delay_thread, args=(p,)).start()
self.assertEquals(b('a'), p.read(1, 0.1))
self.assertEqual(b('a'), p.read(1, 0.1))
try:
p.read(1, 0.1)
self.assert_(False)
self.assertTrue(False)
except PipeTimeout:
pass
self.assertEquals(b('b'), p.read(1, 1.0))
self.assertEquals(b(''), p.read(1))
self.assertEqual(b('b'), p.read(1, 1.0))
self.assertEqual(b(''), p.read(1))
def test_3_close_while_reading(self):
p = BufferedPipe()
threading.Thread(target=close_thread, args=(p,)).start()
data = p.read(1, 1.0)
self.assertEquals(b(''), data)
self.assertEqual(b(''), data)
def test_4_or_pipe(self):
p = pipe.make_pipe()

View File

@ -95,10 +95,10 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
self.assert_(self.event.isSet())
self.assert_(self.ts.is_active())
self.assertEquals('slowdive', self.ts.get_username())
self.assertEquals(True, self.ts.is_authenticated())
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
stdin, stdout, stderr = self.tc.exec_command('yes')
schan = self.ts.accept(1.0)
@ -107,10 +107,10 @@ class SSHClientTest (unittest.TestCase):
schan.send_stderr('This is on stderr.\n')
schan.close()
self.assertEquals('Hello there.\n', stdout.readline())
self.assertEquals('', stdout.readline())
self.assertEquals('This is on stderr.\n', stderr.readline())
self.assertEquals('', stderr.readline())
self.assertEqual('Hello there.\n', stdout.readline())
self.assertEqual('', stdout.readline())
self.assertEqual('This is on stderr.\n', stderr.readline())
self.assertEqual('', stderr.readline())
stdin.close()
stdout.close()
@ -128,10 +128,10 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', key_filename=test_path('test_dss.key'))
self.event.wait(1.0)
self.assert_(self.event.isSet())
self.assert_(self.ts.is_active())
self.assertEquals('slowdive', self.ts.get_username())
self.assertEquals(True, self.ts.is_authenticated())
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
stdin, stdout, stderr = self.tc.exec_command('yes')
schan = self.ts.accept(1.0)
@ -140,10 +140,10 @@ class SSHClientTest (unittest.TestCase):
schan.send_stderr('This is on stderr.\n')
schan.close()
self.assertEquals('Hello there.\n', stdout.readline())
self.assertEquals('', stdout.readline())
self.assertEquals('This is on stderr.\n', stderr.readline())
self.assertEquals('', stderr.readline())
self.assertEqual('Hello there.\n', stdout.readline())
self.assertEqual('', stdout.readline())
self.assertEqual('This is on stderr.\n', stderr.readline())
self.assertEqual('', stderr.readline())
stdin.close()
stdout.close()
@ -161,10 +161,10 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', key_filename=[ test_path('test_rsa.key'), test_path('test_dss.key') ])
self.event.wait(1.0)
self.assert_(self.event.isSet())
self.assert_(self.ts.is_active())
self.assertEquals('slowdive', self.ts.get_username())
self.assertEquals(True, self.ts.is_authenticated())
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
def test_4_auto_add_policy(self):
"""
@ -175,16 +175,16 @@ class SSHClientTest (unittest.TestCase):
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEquals(0, len(self.tc.get_host_keys()))
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
self.assert_(self.event.isSet())
self.assert_(self.ts.is_active())
self.assertEquals('slowdive', self.ts.get_username())
self.assertEquals(True, self.ts.is_authenticated())
self.assertEquals(1, len(self.tc.get_host_keys()))
self.assertEquals(public_host_key, self.tc.get_host_keys()['[%s]:%d' % (self.addr, self.port)]['ssh-rsa'])
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
self.assertEqual(1, len(self.tc.get_host_keys()))
self.assertEqual(public_host_key, self.tc.get_host_keys()['[%s]:%d' % (self.addr, self.port)]['ssh-rsa'])
def test_5_cleanup(self):
"""
@ -196,12 +196,12 @@ class SSHClientTest (unittest.TestCase):
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEquals(0, len(self.tc.get_host_keys()))
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
self.assert_(self.event.isSet())
self.assert_(self.ts.is_active())
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
p = weakref.ref(self.tc._transport.packetizer)
self.assertTrue(p() is not None)

View File

@ -54,7 +54,7 @@ class BufferedFileTest (unittest.TestCase):
f = LoopbackFile('r')
try:
f.write('hi')
self.assert_(False, 'no exception on write to read-only file')
self.assertTrue(False, 'no exception on write to read-only file')
except:
pass
f.close()
@ -62,7 +62,7 @@ class BufferedFileTest (unittest.TestCase):
f = LoopbackFile('w')
try:
f.read(1)
self.assert_(False, 'no exception to read from write-only file')
self.assertTrue(False, 'no exception to read from write-only file')
except:
pass
f.close()
@ -81,12 +81,12 @@ class BufferedFileTest (unittest.TestCase):
f.close()
try:
f.readline()
self.assert_(False, 'no exception on readline of closed file')
self.assertTrue(False, 'no exception on readline of closed file')
except IOError:
pass
self.assert_(linefeed_byte in f.newlines)
self.assert_(crlf in f.newlines)
self.assert_(cr_byte not in f.newlines)
self.assertTrue(linefeed_byte in f.newlines)
self.assertTrue(crlf in f.newlines)
self.assertTrue(cr_byte not in f.newlines)
def test_3_lf(self):
"""

View File

@ -25,7 +25,7 @@ from binascii import hexlify
import os
import unittest
import paramiko
from paramiko.py3compat import b
from paramiko.py3compat import b, decodebytes
test_hosts_file = """\
@ -65,42 +65,42 @@ class HostKeysTest (unittest.TestCase):
def test_1_load(self):
hostdict = paramiko.HostKeys('hostfile.temp')
self.assertEquals(2, len(hostdict))
self.assertEquals(1, len(list(hostdict.values())[0]))
self.assertEquals(1, len(list(hostdict.values())[1]))
self.assertEqual(2, len(hostdict))
self.assertEqual(1, len(list(hostdict.values())[0]))
self.assertEqual(1, len(list(hostdict.values())[1]))
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
self.assertEquals(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
def test_2_add(self):
hostdict = paramiko.HostKeys('hostfile.temp')
hh = '|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c='
key = paramiko.RSAKey(data=base64.decodestring(keyblob))
key = paramiko.RSAKey(data=decodebytes(keyblob))
hostdict.add(hh, 'ssh-rsa', key)
self.assertEquals(3, len(list(hostdict)))
self.assertEqual(3, len(list(hostdict)))
x = hostdict['foo.example.com']
fp = hexlify(x['ssh-rsa'].get_fingerprint()).upper()
self.assertEquals(b('7EC91BB336CB6D810B124B1353C32396'), fp)
self.assert_(hostdict.check('foo.example.com', key))
self.assertEqual(b('7EC91BB336CB6D810B124B1353C32396'), fp)
self.assertTrue(hostdict.check('foo.example.com', key))
def test_3_dict(self):
hostdict = paramiko.HostKeys('hostfile.temp')
self.assert_('secure.example.com' in hostdict)
self.assert_('not.example.com' not in hostdict)
self.assert_('secure.example.com' in hostdict)
self.assert_('not.example.com' not in hostdict)
self.assertTrue('secure.example.com' in hostdict)
self.assertTrue('not.example.com' not in hostdict)
self.assertTrue('secure.example.com' in hostdict)
self.assertTrue('not.example.com' not in hostdict)
x = hostdict.get('secure.example.com', None)
self.assert_(x is not None)
self.assertTrue(x is not None)
fp = hexlify(x['ssh-rsa'].get_fingerprint()).upper()
self.assertEquals(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
i = 0
for key in hostdict:
i += 1
self.assertEquals(2, i)
self.assertEqual(2, i)
def test_4_dict_set(self):
hostdict = paramiko.HostKeys('hostfile.temp')
key = paramiko.RSAKey(data=base64.decodestring(keyblob))
key_dss = paramiko.DSSKey(data=base64.decodestring(keyblob_dss))
key = paramiko.RSAKey(data=decodebytes(keyblob))
key_dss = paramiko.DSSKey(data=decodebytes(keyblob_dss))
hostdict['secure.example.com'] = {
'ssh-rsa': key,
'ssh-dss': key_dss
@ -108,11 +108,11 @@ class HostKeysTest (unittest.TestCase):
hostdict['fake.example.com'] = {}
hostdict['fake.example.com']['ssh-rsa'] = key
self.assertEquals(3, len(hostdict))
self.assertEquals(2, len(list(hostdict.values())[0]))
self.assertEquals(1, len(list(hostdict.values())[1]))
self.assertEquals(1, len(list(hostdict.values())[2]))
self.assertEqual(3, len(hostdict))
self.assertEqual(2, len(list(hostdict.values())[0]))
self.assertEqual(1, len(list(hostdict.values())[1]))
self.assertEqual(1, len(list(hostdict.values())[2]))
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
self.assertEquals(b('7EC91BB336CB6D810B124B1353C32396'), fp)
self.assertEqual(b('7EC91BB336CB6D810B124B1353C32396'), fp)
fp = hexlify(hostdict['secure.example.com']['ssh-dss'].get_fingerprint()).upper()
self.assertEquals(b('4478F0B9A23CC5182009FF755BC1D26C'), fp)
self.assertEqual(b('4478F0B9A23CC5182009FF755BC1D26C'), fp)

View File

@ -92,8 +92,8 @@ class KexTest (unittest.TestCase):
kex = KexGroup1(transport)
kex.start_kex()
x = b('1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4')
self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
self.assertEquals((paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect)
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual((paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect)
# fake "reply"
msg = Message()
@ -103,17 +103,17 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg)
H = b('03079780F3D3AD0B3C6DB30C8D21685F367A86D2')
self.assertEquals(self.K, transport._K)
self.assertEquals(H, hexlify(transport._H).upper())
self.assertEquals((b('fake-host-key'), b('fake-sig')), transport._verify)
self.assert_(transport._activated)
self.assertEqual(self.K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify)
self.assertTrue(transport._activated)
def test_2_group1_server(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGroup1(transport)
kex.start_kex()
self.assertEquals((paramiko.kex_group1._MSG_KEXDH_INIT,), transport._expect)
self.assertEqual((paramiko.kex_group1._MSG_KEXDH_INIT,), transport._expect)
msg = Message()
msg.add_mpint(69)
@ -121,10 +121,10 @@ class KexTest (unittest.TestCase):
kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg)
H = b('B16BF34DD10945EDE84E9C1EF24A14BFDC843389')
x = b('1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967')
self.assertEquals(self.K, transport._K)
self.assertEquals(H, hexlify(transport._H).upper())
self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
self.assert_(transport._activated)
self.assertEqual(self.K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
def test_3_gex_client(self):
transport = FakeTransport()
@ -132,8 +132,8 @@ class KexTest (unittest.TestCase):
kex = KexGex(transport)
kex.start_kex()
x = b('22000004000000080000002000')
self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
msg = Message()
msg.add_mpint(FakeModulusPack.P)
@ -141,8 +141,8 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
x = b('20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4')
self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
msg = Message()
msg.add_string('fake-host-key')
@ -151,10 +151,10 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
H = b('A265563F2FA87F1A89BF007EE90D58BE2E4A4BD0')
self.assertEquals(self.K, transport._K)
self.assertEquals(H, hexlify(transport._H).upper())
self.assertEquals((b('fake-host-key'), b('fake-sig')), transport._verify)
self.assert_(transport._activated)
self.assertEqual(self.K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify)
self.assertTrue(transport._activated)
def test_4_gex_old_client(self):
transport = FakeTransport()
@ -162,8 +162,8 @@ class KexTest (unittest.TestCase):
kex = KexGex(transport)
kex.start_kex(_test_old_style=True)
x = b('1E00000800')
self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
msg = Message()
msg.add_mpint(FakeModulusPack.P)
@ -171,8 +171,8 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
x = b('20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4')
self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
msg = Message()
msg.add_string('fake-host-key')
@ -181,17 +181,17 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
H = b('807F87B269EF7AC5EC7E75676808776A27D5864C')
self.assertEquals(self.K, transport._K)
self.assertEquals(H, hexlify(transport._H).upper())
self.assertEquals((b('fake-host-key'), b('fake-sig')), transport._verify)
self.assert_(transport._activated)
self.assertEqual(self.K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify)
self.assertTrue(transport._activated)
def test_5_gex_server(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGex(transport)
kex.start_kex()
self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
msg = Message()
msg.add_int(1024)
@ -200,8 +200,8 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
x = b('1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102')
self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
msg = Message()
msg.add_mpint(12345)
@ -210,25 +210,25 @@ class KexTest (unittest.TestCase):
K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
H = b('CE754197C21BF3452863B4F44D0B3951F12516EF')
x = b('210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967')
self.assertEquals(K, transport._K)
self.assertEquals(H, hexlify(transport._H).upper())
self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
self.assert_(transport._activated)
self.assertEqual(K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
def test_6_gex_server_with_old_client(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGex(transport)
kex.start_kex()
self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
msg = Message()
msg.add_int(2048)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg)
x = b('1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102')
self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
msg = Message()
msg.add_mpint(12345)
@ -237,7 +237,7 @@ class KexTest (unittest.TestCase):
K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
H = b('B41A06B2E59043CEFC1AE16EC31F1E2D12EC455B')
x = b('210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967')
self.assertEquals(K, transport._K)
self.assertEquals(H, hexlify(transport._H).upper())
self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
self.assert_(transport._activated)
self.assertEqual(K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)

View File

@ -40,7 +40,7 @@ class MessageTest (unittest.TestCase):
msg.add_string('q')
msg.add_string('hello')
msg.add_string('x' * 1000)
self.assertEquals(msg.asbytes(), self.__a)
self.assertEqual(msg.asbytes(), self.__a)
msg = Message()
msg.add_boolean(True)
@ -49,7 +49,7 @@ class MessageTest (unittest.TestCase):
msg.add_bytes(zero_byte + byte_chr(0x3f))
msg.add_list(['huey', 'dewey', 'louie'])
self.assertEquals(msg.asbytes(), self.__b)
self.assertEqual(msg.asbytes(), self.__b)
msg = Message()
msg.add_int64(5)
@ -57,29 +57,29 @@ class MessageTest (unittest.TestCase):
msg.add_mpint(17)
msg.add_mpint(0xf5e4d3c2b109)
msg.add_mpint(-0x65e4d3c2b109)
self.assertEquals(msg.asbytes(), self.__c)
self.assertEqual(msg.asbytes(), self.__c)
def test_2_decode(self):
msg = Message(self.__a)
self.assertEquals(msg.get_int(), 23)
self.assertEquals(msg.get_int(), 123789456)
self.assertEquals(msg.get_text(), 'q')
self.assertEquals(msg.get_text(), 'hello')
self.assertEquals(msg.get_text(), 'x' * 1000)
self.assertEqual(msg.get_int(), 23)
self.assertEqual(msg.get_int(), 123789456)
self.assertEqual(msg.get_text(), 'q')
self.assertEqual(msg.get_text(), 'hello')
self.assertEqual(msg.get_text(), 'x' * 1000)
msg = Message(self.__b)
self.assertEquals(msg.get_boolean(), True)
self.assertEquals(msg.get_boolean(), False)
self.assertEquals(msg.get_byte(), byte_chr(0xf3))
self.assertEquals(msg.get_bytes(2), zero_byte + byte_chr(0x3f))
self.assertEquals(msg.get_list(), ['huey', 'dewey', 'louie'])
self.assertEqual(msg.get_boolean(), True)
self.assertEqual(msg.get_boolean(), False)
self.assertEqual(msg.get_byte(), byte_chr(0xf3))
self.assertEqual(msg.get_bytes(2), zero_byte + byte_chr(0x3f))
self.assertEqual(msg.get_list(), ['huey', 'dewey', 'louie'])
msg = Message(self.__c)
self.assertEquals(msg.get_int64(), 5)
self.assertEquals(msg.get_int64(), 0xf5e4d3c2b109)
self.assertEquals(msg.get_mpint(), 17)
self.assertEquals(msg.get_mpint(), 0xf5e4d3c2b109)
self.assertEquals(msg.get_mpint(), -0x65e4d3c2b109)
self.assertEqual(msg.get_int64(), 5)
self.assertEqual(msg.get_int64(), 0xf5e4d3c2b109)
self.assertEqual(msg.get_mpint(), 17)
self.assertEqual(msg.get_mpint(), 0xf5e4d3c2b109)
self.assertEqual(msg.get_mpint(), -0x65e4d3c2b109)
def test_3_add(self):
msg = Message()
@ -89,16 +89,16 @@ class MessageTest (unittest.TestCase):
msg.add(True)
msg.add('cat')
msg.add(['a', 'b'])
self.assertEquals(msg.asbytes(), self.__d)
self.assertEqual(msg.asbytes(), self.__d)
def test_4_misc(self):
msg = Message(self.__d)
self.assertEquals(msg.get_int(), 5)
self.assertEquals(msg.get_int(), 0x1122334455)
self.assertEquals(msg.get_int(), 0xf00000000000000000)
self.assertEquals(msg.get_so_far(), self.__d[:29])
self.assertEquals(msg.get_remainder(), self.__d[29:])
self.assertEqual(msg.get_int(), 5)
self.assertEqual(msg.get_int(), 0x1122334455)
self.assertEqual(msg.get_int(), 0xf00000000000000000)
self.assertEqual(msg.get_so_far(), self.__d[:29])
self.assertEqual(msg.get_remainder(), self.__d[29:])
msg.rewind()
self.assertEquals(msg.get_int(), 5)
self.assertEquals(msg.get_so_far(), self.__d[:4])
self.assertEquals(msg.get_remainder(), self.__d[4:])
self.assertEqual(msg.get_int(), 5)
self.assertEqual(msg.get_so_far(), self.__d[:4])
self.assertEqual(msg.get_remainder(), self.__d[4:])

View File

@ -54,8 +54,8 @@ class PacketizerTest (unittest.TestCase):
p.send_message(m)
data = rsock.recv(100)
# 32 + 12 bytes of MAC = 44
self.assertEquals(44, len(data))
self.assertEquals(unhexlify(b('439197bd5b50ac2587c2c46bc7e938c0')), data[:16])
self.assertEqual(44, len(data))
self.assertEqual(unhexlify(b('439197bd5b50ac2587c2c46bc7e938c0')), data[:16])
def test_2_read (self):
rsock = LoopSocket()
@ -68,7 +68,7 @@ class PacketizerTest (unittest.TestCase):
p.set_inbound_cipher(cipher, 16, SHA, 12, x1f * 20)
wsock.send(unhexlify(b('439197bd5b50ac2587c2c46bc7e938c090d216560d717361387c4c3dfb977de26e03b1a0c21cd641414cb459')))
cmd, m = p.read_message()
self.assertEquals(100, cmd)
self.assertEquals(100, m.get_int())
self.assertEquals(1, m.get_int())
self.assertEquals(900, m.get_int())
self.assertEqual(100, cmd)
self.assertEqual(100, m.get_int())
self.assertEqual(1, m.get_int())
self.assertEqual(900, m.get_int())

View File

@ -92,157 +92,157 @@ class KeyTest (unittest.TestCase):
from Crypto.Hash import MD5
key = util.generate_key_bytes(MD5, x1234, 'happy birthday', 30)
exp = unhexlify(b('61E1F272F4C1C4561586BD322498C0E924672780F47BB37DDA7D54019E64'))
self.assertEquals(exp, key)
self.assertEqual(exp, key)
def test_2_load_rsa(self):
key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
self.assertEquals('ssh-rsa', key.get_name())
self.assertEqual('ssh-rsa', key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(':', ''))
my_rsa = hexlify(key.get_fingerprint())
self.assertEquals(exp_rsa, my_rsa)
self.assertEquals(PUB_RSA.split()[1], key.get_base64())
self.assertEquals(1024, key.get_bits())
self.assertEqual(exp_rsa, my_rsa)
self.assertEqual(PUB_RSA.split()[1], key.get_base64())
self.assertEqual(1024, key.get_bits())
s = StringIO()
key.write_private_key(s)
self.assertEquals(RSA_PRIVATE_OUT, s.getvalue())
self.assertEqual(RSA_PRIVATE_OUT, s.getvalue())
s.seek(0)
key2 = RSAKey.from_private_key(s)
self.assertEquals(key, key2)
self.assertEqual(key, key2)
def test_3_load_rsa_password(self):
key = RSAKey.from_private_key_file(test_path('test_rsa_password.key'), 'television')
self.assertEquals('ssh-rsa', key.get_name())
self.assertEqual('ssh-rsa', key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(':', ''))
my_rsa = hexlify(key.get_fingerprint())
self.assertEquals(exp_rsa, my_rsa)
self.assertEquals(PUB_RSA.split()[1], key.get_base64())
self.assertEquals(1024, key.get_bits())
self.assertEqual(exp_rsa, my_rsa)
self.assertEqual(PUB_RSA.split()[1], key.get_base64())
self.assertEqual(1024, key.get_bits())
def test_4_load_dss(self):
key = DSSKey.from_private_key_file(test_path('test_dss.key'))
self.assertEquals('ssh-dss', key.get_name())
self.assertEqual('ssh-dss', key.get_name())
exp_dss = b(FINGER_DSS.split()[1].replace(':', ''))
my_dss = hexlify(key.get_fingerprint())
self.assertEquals(exp_dss, my_dss)
self.assertEquals(PUB_DSS.split()[1], key.get_base64())
self.assertEquals(1024, key.get_bits())
self.assertEqual(exp_dss, my_dss)
self.assertEqual(PUB_DSS.split()[1], key.get_base64())
self.assertEqual(1024, key.get_bits())
s = StringIO()
key.write_private_key(s)
self.assertEquals(DSS_PRIVATE_OUT, s.getvalue())
self.assertEqual(DSS_PRIVATE_OUT, s.getvalue())
s.seek(0)
key2 = DSSKey.from_private_key(s)
self.assertEquals(key, key2)
self.assertEqual(key, key2)
def test_5_load_dss_password(self):
key = DSSKey.from_private_key_file(test_path('test_dss_password.key'), 'television')
self.assertEquals('ssh-dss', key.get_name())
self.assertEqual('ssh-dss', key.get_name())
exp_dss = b(FINGER_DSS.split()[1].replace(':', ''))
my_dss = hexlify(key.get_fingerprint())
self.assertEquals(exp_dss, my_dss)
self.assertEquals(PUB_DSS.split()[1], key.get_base64())
self.assertEquals(1024, key.get_bits())
self.assertEqual(exp_dss, my_dss)
self.assertEqual(PUB_DSS.split()[1], key.get_base64())
self.assertEqual(1024, key.get_bits())
def test_6_compare_rsa(self):
# verify that the private & public keys compare equal
key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
self.assertEquals(key, key)
self.assertEqual(key, key)
pub = RSAKey(data=key.asbytes())
self.assert_(key.can_sign())
self.assert_(not pub.can_sign())
self.assertEquals(key, pub)
self.assertTrue(key.can_sign())
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
def test_7_compare_dss(self):
# verify that the private & public keys compare equal
key = DSSKey.from_private_key_file(test_path('test_dss.key'))
self.assertEquals(key, key)
self.assertEqual(key, key)
pub = DSSKey(data=key.asbytes())
self.assert_(key.can_sign())
self.assert_(not pub.can_sign())
self.assertEquals(key, pub)
self.assertTrue(key.can_sign())
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
def test_8_sign_rsa(self):
# verify that the rsa private key can sign and verify
key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
msg = key.sign_ssh_data(rng, b('ice weasels'))
self.assert_(type(msg) is Message)
self.assertTrue(type(msg) is Message)
msg.rewind()
self.assertEquals('ssh-rsa', msg.get_text())
self.assertEqual('ssh-rsa', msg.get_text())
sig = bytes().join([byte_chr(int(x, 16)) for x in SIGNED_RSA.split(':')])
self.assertEquals(sig, msg.get_binary())
self.assertEqual(sig, msg.get_binary())
msg.rewind()
pub = RSAKey(data=key.asbytes())
self.assert_(pub.verify_ssh_sig(b('ice weasels'), msg))
self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg))
def test_9_sign_dss(self):
# verify that the dss private key can sign and verify
key = DSSKey.from_private_key_file(test_path('test_dss.key'))
msg = key.sign_ssh_data(rng, b('ice weasels'))
self.assert_(type(msg) is Message)
self.assertTrue(type(msg) is Message)
msg.rewind()
self.assertEquals('ssh-dss', msg.get_text())
self.assertEqual('ssh-dss', msg.get_text())
# can't do the same test as we do for RSA, because DSS signatures
# are usually different each time. but we can test verification
# anyway so it's ok.
self.assertEquals(40, len(msg.get_binary()))
self.assertEqual(40, len(msg.get_binary()))
msg.rewind()
pub = DSSKey(data=key.asbytes())
self.assert_(pub.verify_ssh_sig(b('ice weasels'), msg))
self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg))
def test_A_generate_rsa(self):
key = RSAKey.generate(1024)
msg = key.sign_ssh_data(rng, b('jerri blank'))
msg.rewind()
self.assert_(key.verify_ssh_sig(b('jerri blank'), msg))
self.assertTrue(key.verify_ssh_sig(b('jerri blank'), msg))
def test_B_generate_dss(self):
key = DSSKey.generate(1024)
msg = key.sign_ssh_data(rng, b('jerri blank'))
msg.rewind()
self.assert_(key.verify_ssh_sig(b('jerri blank'), msg))
self.assertTrue(key.verify_ssh_sig(b('jerri blank'), msg))
def test_10_load_ecdsa(self):
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key'))
self.assertEquals('ecdsa-sha2-nistp256', key.get_name())
self.assertEqual('ecdsa-sha2-nistp256', key.get_name())
exp_ecdsa = b(FINGER_ECDSA.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
self.assertEquals(exp_ecdsa, my_ecdsa)
self.assertEquals(PUB_ECDSA.split()[1], key.get_base64())
self.assertEquals(256, key.get_bits())
self.assertEqual(exp_ecdsa, my_ecdsa)
self.assertEqual(PUB_ECDSA.split()[1], key.get_base64())
self.assertEqual(256, key.get_bits())
s = StringIO()
key.write_private_key(s)
self.assertEquals(ECDSA_PRIVATE_OUT, s.getvalue())
self.assertEqual(ECDSA_PRIVATE_OUT, s.getvalue())
s.seek(0)
key2 = ECDSAKey.from_private_key(s)
self.assertEquals(key, key2)
self.assertEqual(key, key2)
def test_11_load_ecdsa_password(self):
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password.key'), b('television'))
self.assertEquals('ecdsa-sha2-nistp256', key.get_name())
self.assertEqual('ecdsa-sha2-nistp256', key.get_name())
exp_ecdsa = b(FINGER_ECDSA.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
self.assertEquals(exp_ecdsa, my_ecdsa)
self.assertEquals(PUB_ECDSA.split()[1], key.get_base64())
self.assertEquals(256, key.get_bits())
self.assertEqual(exp_ecdsa, my_ecdsa)
self.assertEqual(PUB_ECDSA.split()[1], key.get_base64())
self.assertEqual(256, key.get_bits())
def test_12_compare_ecdsa(self):
# verify that the private & public keys compare equal
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key'))
self.assertEquals(key, key)
self.assertEqual(key, key)
pub = ECDSAKey(data=key.asbytes())
self.assert_(key.can_sign())
self.assert_(not pub.can_sign())
self.assertEquals(key, pub)
self.assertTrue(key.can_sign())
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
def test_13_sign_ecdsa(self):
# verify that the rsa private key can sign and verify
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key'))
msg = key.sign_ssh_data(rng, b('ice weasels'))
self.assert_(type(msg) is Message)
self.assertTrue(type(msg) is Message)
msg.rewind()
self.assertEquals('ecdsa-sha2-nistp256', msg.get_text())
self.assertEqual('ecdsa-sha2-nistp256', msg.get_text())
# ECDSA signatures, like DSS signatures, tend to be different
# each time, so we can't compare against a "known correct"
# signature.
@ -250,4 +250,4 @@ class KeyTest (unittest.TestCase):
msg.rewind()
pub = ECDSAKey(data=key.asbytes())
self.assert_(pub.verify_ssh_sig(b('ice weasels'), msg))
self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg))

View File

@ -162,8 +162,8 @@ class SFTPTest (unittest.TestCase):
f = sftp.open(FOLDER + '/test', 'w')
try:
self.assertEqual(f.stat().st_size, 0)
f.close()
finally:
f.close()
sftp.remove(FOLDER + '/test')
def test_2_close(self):
@ -219,8 +219,8 @@ class SFTPTest (unittest.TestCase):
self.assertEqual(f.stat().st_size, 37)
f.seek(-26, f.SEEK_CUR)
self.assertEqual(f.readline(), 'second line\n')
f.close()
finally:
f.close()
sftp.remove(FOLDER + '/append.txt')
def test_5_rename(self):

View File

@ -262,6 +262,7 @@ class BigSFTPTest (unittest.TestCase):
for i in range(10):
f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
f.prefetch()
f.close()
f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
f.prefetch()
for n in range(1024):

View File

@ -132,28 +132,28 @@ class TransportTest(ParamikoTest):
event = threading.Event()
self.server = NullServer()
self.assert_(not event.isSet())
self.assertTrue(not event.isSet())
self.ts.start_server(event, self.server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
self.assertTrue(event.isSet())
self.assertTrue(self.ts.is_active())
def test_1_security_options(self):
o = self.tc.get_security_options()
self.assertEquals(type(o), SecurityOptions)
self.assert_(('aes256-cbc', 'blowfish-cbc') != o.ciphers)
self.assertEqual(type(o), SecurityOptions)
self.assertTrue(('aes256-cbc', 'blowfish-cbc') != o.ciphers)
o.ciphers = ('aes256-cbc', 'blowfish-cbc')
self.assertEquals(('aes256-cbc', 'blowfish-cbc'), o.ciphers)
self.assertEqual(('aes256-cbc', 'blowfish-cbc'), o.ciphers)
try:
o.ciphers = ('aes256-cbc', 'made-up-cipher')
self.assert_(False)
self.assertTrue(False)
except ValueError:
pass
try:
o.ciphers = 23
self.assert_(False)
self.assertTrue(False)
except TypeError:
pass
@ -162,7 +162,7 @@ class TransportTest(ParamikoTest):
self.tc.H = unhexlify(b('0C8307CDE6856FF30BA93684EB0F04C2520E9ED3'))
self.tc.session_id = self.tc.H
key = self.tc._compute_key('C', 32)
self.assertEquals(b('207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995'),
self.assertEqual(b('207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995'),
hexlify(key).upper())
def test_3_simple(self):
@ -176,21 +176,21 @@ class TransportTest(ParamikoTest):
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
self.assert_(not event.isSet())
self.assertEquals(None, self.tc.get_username())
self.assertEquals(None, self.ts.get_username())
self.assertEquals(False, self.tc.is_authenticated())
self.assertEquals(False, self.ts.is_authenticated())
self.assertTrue(not event.isSet())
self.assertEqual(None, self.tc.get_username())
self.assertEqual(None, self.ts.get_username())
self.assertEqual(False, self.tc.is_authenticated())
self.assertEqual(False, self.ts.is_authenticated())
self.ts.start_server(event, server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
self.assertEquals('slowdive', self.tc.get_username())
self.assertEquals('slowdive', self.ts.get_username())
self.assertEquals(True, self.tc.is_authenticated())
self.assertEquals(True, self.ts.is_authenticated())
self.assertTrue(event.isSet())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.tc.get_username())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.tc.is_authenticated())
self.assertEqual(True, self.ts.is_authenticated())
def test_3a_long_banner(self):
"""
@ -201,14 +201,14 @@ class TransportTest(ParamikoTest):
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
self.assert_(not event.isSet())
self.assertTrue(not event.isSet())
self.socks.send(LONG_BANNER)
self.ts.start_server(event, server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
self.assertTrue(event.isSet())
self.assertTrue(self.ts.is_active())
def test_4_special(self):
"""
@ -219,10 +219,10 @@ class TransportTest(ParamikoTest):
options.ciphers = ('aes256-cbc',)
options.digests = ('hmac-md5-96',)
self.setup_test_server(client_options=force_algorithms)
self.assertEquals('aes256-cbc', self.tc.local_cipher)
self.assertEquals('aes256-cbc', self.tc.remote_cipher)
self.assertEquals(12, self.tc.packetizer.get_mac_size_out())
self.assertEquals(12, self.tc.packetizer.get_mac_size_in())
self.assertEqual('aes256-cbc', self.tc.local_cipher)
self.assertEqual('aes256-cbc', self.tc.remote_cipher)
self.assertEqual(12, self.tc.packetizer.get_mac_size_out())
self.assertEqual(12, self.tc.packetizer.get_mac_size_in())
self.tc.send_ignore(1024)
self.tc.renegotiate_keys()
@ -233,10 +233,10 @@ class TransportTest(ParamikoTest):
verify that the keepalive will be sent.
"""
self.setup_test_server()
self.assertEquals(None, getattr(self.server, '_global_request', None))
self.assertEqual(None, getattr(self.server, '_global_request', None))
self.tc.set_keepalive(1)
time.sleep(2)
self.assertEquals('keepalive@lag.net', self.server._global_request)
self.assertEqual('keepalive@lag.net', self.server._global_request)
def test_6_exec_command(self):
"""
@ -248,7 +248,7 @@ class TransportTest(ParamikoTest):
schan = self.ts.accept(1.0)
try:
chan.exec_command('no')
self.assert_(False)
self.assertTrue(False)
except SSHException:
pass
@ -260,11 +260,11 @@ class TransportTest(ParamikoTest):
schan.close()
f = chan.makefile()
self.assertEquals('Hello there.\n', f.readline())
self.assertEquals('', f.readline())
self.assertEqual('Hello there.\n', f.readline())
self.assertEqual('', f.readline())
f = chan.makefile_stderr()
self.assertEquals('This is on stderr.\n', f.readline())
self.assertEquals('', f.readline())
self.assertEqual('This is on stderr.\n', f.readline())
self.assertEqual('', f.readline())
# now try it with combined stdout/stderr
chan = self.tc.open_session()
@ -276,9 +276,9 @@ class TransportTest(ParamikoTest):
chan.set_combine_stderr(True)
f = chan.makefile()
self.assertEquals('Hello there.\n', f.readline())
self.assertEquals('This is on stderr.\n', f.readline())
self.assertEquals('', f.readline())
self.assertEqual('Hello there.\n', f.readline())
self.assertEqual('This is on stderr.\n', f.readline())
self.assertEqual('', f.readline())
def test_7_invoke_shell(self):
"""
@ -290,9 +290,9 @@ class TransportTest(ParamikoTest):
schan = self.ts.accept(1.0)
chan.send('communist j. cat\n')
f = schan.makefile()
self.assertEquals('communist j. cat\n', f.readline())
self.assertEqual('communist j. cat\n', f.readline())
chan.close()
self.assertEquals('', f.readline())
self.assertEqual('', f.readline())
def test_8_channel_exception(self):
"""
@ -304,7 +304,7 @@ class TransportTest(ParamikoTest):
self.fail('expected exception')
except ChannelException:
x = sys.exc_info()[1]
self.assert_(x.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED)
self.assertTrue(x.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED)
def test_9_exit_status(self):
"""
@ -316,7 +316,7 @@ class TransportTest(ParamikoTest):
schan = self.ts.accept(1.0)
chan.exec_command('yes')
schan.send('Hello there.\n')
self.assert_(not chan.exit_status_ready())
self.assertTrue(not chan.exit_status_ready())
# trigger an EOF
schan.shutdown_read()
schan.shutdown_write()
@ -324,15 +324,15 @@ class TransportTest(ParamikoTest):
schan.close()
f = chan.makefile()
self.assertEquals('Hello there.\n', f.readline())
self.assertEquals('', f.readline())
self.assertEqual('Hello there.\n', f.readline())
self.assertEqual('', f.readline())
count = 0
while not chan.exit_status_ready():
time.sleep(0.1)
count += 1
if count > 50:
raise Exception("timeout")
self.assertEquals(23, chan.recv_exit_status())
self.assertEqual(23, chan.recv_exit_status())
chan.close()
def test_A_select(self):
@ -346,9 +346,9 @@ class TransportTest(ParamikoTest):
# nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
self.assertEquals([], r)
self.assertEquals([], w)
self.assertEquals([], e)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
schan.send('hello\n')
@ -358,17 +358,17 @@ class TransportTest(ParamikoTest):
if chan in r:
break
time.sleep(0.1)
self.assertEquals([chan], r)
self.assertEquals([], w)
self.assertEquals([], e)
self.assertEqual([chan], r)
self.assertEqual([], w)
self.assertEqual([], e)
self.assertEquals(b('hello\n'), chan.recv(6))
self.assertEqual(b('hello\n'), chan.recv(6))
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEquals([], r)
self.assertEquals([], w)
self.assertEquals([], e)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
schan.close()
@ -378,17 +378,17 @@ class TransportTest(ParamikoTest):
if chan in r:
break
time.sleep(0.1)
self.assertEquals([chan], r)
self.assertEquals([], w)
self.assertEquals([], e)
self.assertEquals(bytes(), chan.recv(16))
self.assertEqual([chan], r)
self.assertEqual([], w)
self.assertEqual([], e)
self.assertEqual(bytes(), chan.recv(16))
# make sure the pipe is still open for now...
p = chan._pipe
self.assertEquals(False, p._closed)
self.assertEqual(False, p._closed)
chan.close()
# ...and now is closed.
self.assertEquals(True, p._closed)
self.assertEqual(True, p._closed)
def test_B_renegotiate(self):
"""
@ -400,7 +400,7 @@ class TransportTest(ParamikoTest):
chan.exec_command('yes')
schan = self.ts.accept(1.0)
self.assertEquals(self.tc.H, self.tc.session_id)
self.assertEqual(self.tc.H, self.tc.session_id)
for i in range(20):
chan.send('x' * 1024)
chan.close()
@ -410,7 +410,7 @@ class TransportTest(ParamikoTest):
if self.tc.H != self.tc.session_id:
break
time.sleep(0.1)
self.assertNotEquals(self.tc.H, self.tc.session_id)
self.assertNotEqual(self.tc.H, self.tc.session_id)
schan.close()
@ -429,8 +429,8 @@ class TransportTest(ParamikoTest):
chan.send('x' * 1024)
bytes2 = self.tc.packetizer._Packetizer__sent_bytes
# tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :)
self.assert_(bytes2 - bytes < 1024)
self.assertEquals(52, bytes2 - bytes)
self.assertTrue(bytes2 - bytes < 1024)
self.assertEqual(52, bytes2 - bytes)
chan.close()
schan.close()
@ -450,20 +450,20 @@ class TransportTest(ParamikoTest):
requested.append((addr, port))
self.tc._queue_incoming_channel(c)
self.assertEquals(None, getattr(self.server, '_x11_screen_number', None))
self.assertEqual(None, getattr(self.server, '_x11_screen_number', None))
cookie = chan.request_x11(0, single_connection=True, handler=handler)
self.assertEquals(0, self.server._x11_screen_number)
self.assertEquals('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol)
self.assertEquals(cookie, self.server._x11_auth_cookie)
self.assertEquals(True, self.server._x11_single_connection)
self.assertEqual(0, self.server._x11_screen_number)
self.assertEqual('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol)
self.assertEqual(cookie, self.server._x11_auth_cookie)
self.assertEqual(True, self.server._x11_single_connection)
x11_server = self.ts.open_x11_channel(('localhost', 6093))
x11_client = self.tc.accept()
self.assertEquals('localhost', requested[0][0])
self.assertEquals(6093, requested[0][1])
self.assertEqual('localhost', requested[0][0])
self.assertEqual(6093, requested[0][1])
x11_server.send('hello')
self.assertEquals(b('hello'), x11_client.recv(5))
self.assertEqual(b('hello'), x11_client.recv(5))
x11_server.close()
x11_client.close()
@ -487,7 +487,7 @@ class TransportTest(ParamikoTest):
self.tc._queue_incoming_channel(c)
port = self.tc.request_port_forward('127.0.0.1', 0, handler)
self.assertEquals(port, self.server._listen.getsockname()[1])
self.assertEqual(port, self.server._listen.getsockname()[1])
cs = socket.socket()
cs.connect(('127.0.0.1', port))
@ -496,7 +496,7 @@ class TransportTest(ParamikoTest):
cch = self.tc.accept()
sch.send('hello')
self.assertEquals(b('hello'), cch.recv(5))
self.assertEqual(b('hello'), cch.recv(5))
sch.close()
cch.close()
ss.close()
@ -533,7 +533,7 @@ class TransportTest(ParamikoTest):
sch.send(cch.recv(8192))
sch.close()
self.assertEquals(b('Hello!\n'), cs.recv(7))
self.assertEqual(b('Hello!\n'), cs.recv(7))
cs.close()
def test_G_stderr_select(self):
@ -548,9 +548,9 @@ class TransportTest(ParamikoTest):
# nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
self.assertEquals([], r)
self.assertEquals([], w)
self.assertEquals([], e)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
schan.send_stderr('hello\n')
@ -560,17 +560,17 @@ class TransportTest(ParamikoTest):
if chan in r:
break
time.sleep(0.1)
self.assertEquals([chan], r)
self.assertEquals([], w)
self.assertEquals([], e)
self.assertEqual([chan], r)
self.assertEqual([], w)
self.assertEqual([], e)
self.assertEquals(b('hello\n'), chan.recv_stderr(6))
self.assertEqual(b('hello\n'), chan.recv_stderr(6))
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEquals([], r)
self.assertEquals([], w)
self.assertEquals([], e)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
schan.close()
chan.close()
@ -584,7 +584,7 @@ class TransportTest(ParamikoTest):
chan.invoke_shell()
schan = self.ts.accept(1.0)
self.assertEquals(chan.send_ready(), True)
self.assertEqual(chan.send_ready(), True)
total = 0
K = '*' * 1024
while total < 1024 * 1024:
@ -592,11 +592,11 @@ class TransportTest(ParamikoTest):
total += len(K)
if not chan.send_ready():
break
self.assert_(total < 1024 * 1024)
self.assertTrue(total < 1024 * 1024)
schan.close()
chan.close()
self.assertEquals(chan.send_ready(), True)
self.assertEqual(chan.send_ready(), True)
def test_I_rekey_deadlock(self):
"""

View File

@ -103,7 +103,7 @@ class UtilTest(ParamikoTest):
global test_config_file
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEquals(config._config,
self.assertEqual(config._config,
[{'host': ['*'], 'config': {}}, {'host': ['*'], 'config': {'identityfile': ['~/.ssh/id_rsa'], 'user': 'robey'}},
{'host': ['*.example.com'], 'config': {'user': 'bjork', 'port': '3333'}},
{'host': ['*'], 'config': {'crazy': 'something dumb '}},
@ -131,7 +131,7 @@ class UtilTest(ParamikoTest):
hostname=host,
identityfile=[os.path.expanduser("~/.ssh/id_rsa")]
)
self.assertEquals(
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
values
)
@ -139,7 +139,7 @@ class UtilTest(ParamikoTest):
def test_4_generate_key_bytes(self):
x = paramiko.util.generate_key_bytes(SHA, b('ABCDEFGH'), 'This is my secret passphrase.', 64)
hex = ''.join(['%02x' % byte_ord(c) for c in x])
self.assertEquals(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
def test_5_host_keys(self):
f = open('hostfile.temp', 'w')
@ -147,11 +147,11 @@ class UtilTest(ParamikoTest):
f.close()
try:
hostdict = paramiko.util.load_host_keys('hostfile.temp')
self.assertEquals(2, len(hostdict))
self.assertEquals(1, len(list(hostdict.values())[0]))
self.assertEquals(1, len(list(hostdict.values())[1]))
self.assertEqual(2, len(hostdict))
self.assertEqual(1, len(list(hostdict.values())[0]))
self.assertEqual(1, len(list(hostdict.values())[1]))
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
self.assertEquals(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
finally:
os.unlink('hostfile.temp')
@ -159,7 +159,7 @@ class UtilTest(ParamikoTest):
from paramiko.common import rng
# just verify that we can pull out 32 bytes and not get an exception.
x = rng.read(32)
self.assertEquals(len(x), 32)
self.assertEqual(len(x), 32)
def test_7_host_config_expose_issue_33(self):
test_config_file = """
@ -175,13 +175,13 @@ Host *
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
host = 'www13.example.com'
self.assertEquals(
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
{'hostname': host, 'port': '22'}
)
def test_8_eintr_retry(self):
self.assertEquals('foo', paramiko.util.retry_on_signal(lambda: 'foo'))
self.assertEqual('foo', paramiko.util.retry_on_signal(lambda: 'foo'))
# Variables that are set by raises_intr
intr_errors_remaining = [3]
@ -192,8 +192,8 @@ Host *
intr_errors_remaining[0] -= 1
raise IOError(errno.EINTR, 'file', 'interrupted system call')
self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None)
self.assertEquals(0, intr_errors_remaining[0])
self.assertEquals(4, call_count[0])
self.assertEqual(0, intr_errors_remaining[0])
self.assertEqual(4, call_count[0])
def raises_ioerror_not_eintr():
raise IOError(errno.ENOENT, 'file', 'file not found')
@ -219,7 +219,7 @@ Host equals-delimited
f = StringIO(conf)
config = paramiko.util.parse_ssh_config(f)
for host in ('space-delimited', 'equals-delimited'):
self.assertEquals(
self.assertEqual(
host_config(host, config)['proxycommand'],
'foo bar=biz baz'
)
@ -245,7 +245,7 @@ Host *
('specific', "host specific port 37 lol"),
('portonly', "host portonly port 155"),
):
self.assertEquals(
self.assertEqual(
host_config(host, config)['proxycommand'],
val
)
@ -267,7 +267,7 @@ Host *
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
host = 'www13.example.com'
self.assertEquals(
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
{'hostname': host, 'port': '8080'}
)
@ -295,7 +295,7 @@ ProxyCommand foo=bar:%h-%p
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEquals(
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
values
)
@ -325,7 +325,7 @@ IdentityFile id_dsa22
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEquals(
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
values
)