Remove byte conversions and unhexlify calls that we only needed for Py2.5 support and use the `b` byte string marker instead
This commit is contained in:
parent
981f768a62
commit
6d75c75e64
|
@ -42,10 +42,10 @@ print('Read key: ' + u(hexlify(host_key.get_fingerprint())))
|
||||||
class Server (paramiko.ServerInterface):
|
class Server (paramiko.ServerInterface):
|
||||||
# 'data' is the output of base64.encodestring(str(key))
|
# 'data' is the output of base64.encodestring(str(key))
|
||||||
# (using the "user_rsa_key" files)
|
# (using the "user_rsa_key" files)
|
||||||
data = b('AAAAB3NzaC1yc2EAAAABIwAAAIEAyO4it3fHlmGZWJaGrfeHOVY7RWO3P9M7hp' + \
|
data = (b'AAAAB3NzaC1yc2EAAAABIwAAAIEAyO4it3fHlmGZWJaGrfeHOVY7RWO3P9M7hp'
|
||||||
'fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC' + \
|
b'fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC'
|
||||||
'KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT' + \
|
b'KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT'
|
||||||
'UWT10hcuO4Ks8=')
|
b'UWT10hcuO4Ks8=')
|
||||||
good_pub_key = paramiko.RSAKey(data=decodebytes(data))
|
good_pub_key = paramiko.RSAKey(data=decodebytes(data))
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|
|
@ -21,7 +21,6 @@ L{ECDSAKey}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import binascii
|
import binascii
|
||||||
from binascii import unhexlify
|
|
||||||
from ecdsa import SigningKey, VerifyingKey, der, curves
|
from ecdsa import SigningKey, VerifyingKey, der, curves
|
||||||
from ecdsa.util import number_to_string, sigencode_string, sigencode_strings, sigdecode_strings
|
from ecdsa.util import number_to_string, sigencode_string, sigencode_strings, sigdecode_strings
|
||||||
from Crypto.Hash import SHA256, MD5
|
from Crypto.Hash import SHA256, MD5
|
||||||
|
|
|
@ -64,7 +64,7 @@ class PosixPipe (object):
|
||||||
if self._set or self._closed:
|
if self._set or self._closed:
|
||||||
return
|
return
|
||||||
self._set = True
|
self._set = True
|
||||||
os.write(self._wfd, b('*'))
|
os.write(self._wfd, b'*')
|
||||||
|
|
||||||
def set_forever (self):
|
def set_forever (self):
|
||||||
self._forever = True
|
self._forever = True
|
||||||
|
@ -110,7 +110,7 @@ class WindowsPipe (object):
|
||||||
if self._set or self._closed:
|
if self._set or self._closed:
|
||||||
return
|
return
|
||||||
self._set = True
|
self._set = True
|
||||||
self._wsock.send(b('*'))
|
self._wsock.send(b'*')
|
||||||
|
|
||||||
def set_forever (self):
|
def set_forever (self):
|
||||||
self._forever = True
|
self._forever = True
|
||||||
|
|
|
@ -20,7 +20,6 @@
|
||||||
L{RSAKey}
|
L{RSAKey}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from binascii import unhexlify
|
|
||||||
from Crypto.PublicKey import RSA
|
from Crypto.PublicKey import RSA
|
||||||
from Crypto.Hash import SHA, MD5
|
from Crypto.Hash import SHA, MD5
|
||||||
from Crypto.Cipher import DES3
|
from Crypto.Cipher import DES3
|
||||||
|
@ -32,7 +31,7 @@ from paramiko.ber import BER, BERException
|
||||||
from paramiko.pkey import PKey
|
from paramiko.pkey import PKey
|
||||||
from paramiko.ssh_exception import SSHException
|
from paramiko.ssh_exception import SSHException
|
||||||
|
|
||||||
SHA1_DIGESTINFO = unhexlify(b('3021300906052b0e03021a05000414'))
|
SHA1_DIGESTINFO = b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'
|
||||||
|
|
||||||
|
|
||||||
class RSAKey (PKey):
|
class RSAKey (PKey):
|
||||||
|
|
|
@ -48,7 +48,7 @@ def _to_unicode(s):
|
||||||
except UnicodeError:
|
except UnicodeError:
|
||||||
return s
|
return s
|
||||||
|
|
||||||
b_slash = b('/')
|
b_slash = b'/'
|
||||||
|
|
||||||
class SFTPClient (BaseSFTP):
|
class SFTPClient (BaseSFTP):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -49,35 +49,35 @@ class BufferedPipeTest(ParamikoTest):
|
||||||
p.feed('hello.')
|
p.feed('hello.')
|
||||||
self.assertTrue(p.read_ready())
|
self.assertTrue(p.read_ready())
|
||||||
data = p.read(6)
|
data = p.read(6)
|
||||||
self.assertEqual(b('hello.'), data)
|
self.assertEqual(b'hello.', data)
|
||||||
|
|
||||||
p.feed('plus/minus')
|
p.feed('plus/minus')
|
||||||
self.assertEqual(b('plu'), p.read(3))
|
self.assertEqual(b'plu', p.read(3))
|
||||||
self.assertEqual(b('s/m'), p.read(3))
|
self.assertEqual(b's/m', p.read(3))
|
||||||
self.assertEqual(b('inus'), p.read(4))
|
self.assertEqual(b'inus', p.read(4))
|
||||||
|
|
||||||
p.close()
|
p.close()
|
||||||
self.assertTrue(not p.read_ready())
|
self.assertTrue(not p.read_ready())
|
||||||
self.assertEqual(b(''), p.read(1))
|
self.assertEqual(b'', p.read(1))
|
||||||
|
|
||||||
def test_2_delay(self):
|
def test_2_delay(self):
|
||||||
p = BufferedPipe()
|
p = BufferedPipe()
|
||||||
self.assertTrue(not p.read_ready())
|
self.assertTrue(not p.read_ready())
|
||||||
threading.Thread(target=delay_thread, args=(p,)).start()
|
threading.Thread(target=delay_thread, args=(p,)).start()
|
||||||
self.assertEqual(b('a'), p.read(1, 0.1))
|
self.assertEqual(b'a', p.read(1, 0.1))
|
||||||
try:
|
try:
|
||||||
p.read(1, 0.1)
|
p.read(1, 0.1)
|
||||||
self.assertTrue(False)
|
self.assertTrue(False)
|
||||||
except PipeTimeout:
|
except PipeTimeout:
|
||||||
pass
|
pass
|
||||||
self.assertEqual(b('b'), p.read(1, 1.0))
|
self.assertEqual(b'b', p.read(1, 1.0))
|
||||||
self.assertEqual(b(''), p.read(1))
|
self.assertEqual(b'', p.read(1))
|
||||||
|
|
||||||
def test_3_close_while_reading(self):
|
def test_3_close_while_reading(self):
|
||||||
p = BufferedPipe()
|
p = BufferedPipe()
|
||||||
threading.Thread(target=close_thread, args=(p,)).start()
|
threading.Thread(target=close_thread, args=(p,)).start()
|
||||||
data = p.read(1, 1.0)
|
data = p.read(1, 1.0)
|
||||||
self.assertEqual(b(''), data)
|
self.assertEqual(b'', data)
|
||||||
|
|
||||||
def test_4_or_pipe(self):
|
def test_4_or_pipe(self):
|
||||||
p = pipe.make_pipe()
|
p = pipe.make_pipe()
|
||||||
|
|
|
@ -45,7 +45,7 @@ class NullServer (paramiko.ServerInterface):
|
||||||
return paramiko.AUTH_FAILED
|
return paramiko.AUTH_FAILED
|
||||||
|
|
||||||
def check_auth_publickey(self, username, key):
|
def check_auth_publickey(self, username, key):
|
||||||
if (key.get_name() == 'ssh-dss') and (hexlify(key.get_fingerprint()) == b('4478f0b9a23cc5182009ff755bc1d26c')):
|
if (key.get_name() == 'ssh-dss') and key.get_fingerprint() == b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c':
|
||||||
return paramiko.AUTH_SUCCESSFUL
|
return paramiko.AUTH_SUCCESSFUL
|
||||||
return paramiko.AUTH_FAILED
|
return paramiko.AUTH_FAILED
|
||||||
|
|
||||||
|
|
|
@ -37,12 +37,12 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
|
||||||
5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M=
|
5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M=
|
||||||
"""
|
"""
|
||||||
|
|
||||||
keyblob = b("""\
|
keyblob = b"""\
|
||||||
AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31MBGQ3GQ/Fc7SX6gkpXkwcZryoi4k\
|
AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31MBGQ3GQ/Fc7SX6gkpXkwcZryoi4k\
|
||||||
NFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW5ymME3bQ4J/k1IKxCtz/bAlAqFgK\
|
NFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW5ymME3bQ4J/k1IKxCtz/bAlAqFgK\
|
||||||
oc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M=""")
|
oc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M="""
|
||||||
|
|
||||||
keyblob_dss = b("""\
|
keyblob_dss = b"""\
|
||||||
AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/\
|
AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/\
|
||||||
h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF60\
|
h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF60\
|
||||||
8EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIE\
|
8EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIE\
|
||||||
|
@ -50,7 +50,7 @@ AkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+\
|
||||||
Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg\
|
Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg\
|
||||||
4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO\
|
4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO\
|
||||||
1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o\
|
1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o\
|
||||||
0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE=""")
|
0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE="""
|
||||||
|
|
||||||
|
|
||||||
class HostKeysTest (unittest.TestCase):
|
class HostKeysTest (unittest.TestCase):
|
||||||
|
@ -68,7 +68,7 @@ class HostKeysTest (unittest.TestCase):
|
||||||
self.assertEqual(1, len(list(hostdict.values())[0]))
|
self.assertEqual(1, len(list(hostdict.values())[0]))
|
||||||
self.assertEqual(1, len(list(hostdict.values())[1]))
|
self.assertEqual(1, len(list(hostdict.values())[1]))
|
||||||
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
|
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
|
||||||
self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
|
self.assertEqual(b'E6684DB30E109B67B70FF1DC5C7F1363', fp)
|
||||||
|
|
||||||
def test_2_add(self):
|
def test_2_add(self):
|
||||||
hostdict = paramiko.HostKeys('hostfile.temp')
|
hostdict = paramiko.HostKeys('hostfile.temp')
|
||||||
|
@ -78,7 +78,7 @@ class HostKeysTest (unittest.TestCase):
|
||||||
self.assertEqual(3, len(list(hostdict)))
|
self.assertEqual(3, len(list(hostdict)))
|
||||||
x = hostdict['foo.example.com']
|
x = hostdict['foo.example.com']
|
||||||
fp = hexlify(x['ssh-rsa'].get_fingerprint()).upper()
|
fp = hexlify(x['ssh-rsa'].get_fingerprint()).upper()
|
||||||
self.assertEqual(b('7EC91BB336CB6D810B124B1353C32396'), fp)
|
self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', fp)
|
||||||
self.assertTrue(hostdict.check('foo.example.com', key))
|
self.assertTrue(hostdict.check('foo.example.com', key))
|
||||||
|
|
||||||
def test_3_dict(self):
|
def test_3_dict(self):
|
||||||
|
@ -90,7 +90,7 @@ class HostKeysTest (unittest.TestCase):
|
||||||
x = hostdict.get('secure.example.com', None)
|
x = hostdict.get('secure.example.com', None)
|
||||||
self.assertTrue(x is not None)
|
self.assertTrue(x is not None)
|
||||||
fp = hexlify(x['ssh-rsa'].get_fingerprint()).upper()
|
fp = hexlify(x['ssh-rsa'].get_fingerprint()).upper()
|
||||||
self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
|
self.assertEqual(b'E6684DB30E109B67B70FF1DC5C7F1363', fp)
|
||||||
i = 0
|
i = 0
|
||||||
for key in hostdict:
|
for key in hostdict:
|
||||||
i += 1
|
i += 1
|
||||||
|
@ -112,6 +112,6 @@ class HostKeysTest (unittest.TestCase):
|
||||||
self.assertEqual(1, len(list(hostdict.values())[1]))
|
self.assertEqual(1, len(list(hostdict.values())[1]))
|
||||||
self.assertEqual(1, len(list(hostdict.values())[2]))
|
self.assertEqual(1, len(list(hostdict.values())[2]))
|
||||||
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
|
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
|
||||||
self.assertEqual(b('7EC91BB336CB6D810B124B1353C32396'), fp)
|
self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', fp)
|
||||||
fp = hexlify(hostdict['secure.example.com']['ssh-dss'].get_fingerprint()).upper()
|
fp = hexlify(hostdict['secure.example.com']['ssh-dss'].get_fingerprint()).upper()
|
||||||
self.assertEqual(b('4478F0B9A23CC5182009FF755BC1D26C'), fp)
|
self.assertEqual(b'4478F0B9A23CC5182009FF755BC1D26C', fp)
|
||||||
|
|
|
@ -38,9 +38,9 @@ class FakeKey (object):
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return 'fake-key'
|
return 'fake-key'
|
||||||
def asbytes(self):
|
def asbytes(self):
|
||||||
return b('fake-key')
|
return b'fake-key'
|
||||||
def sign_ssh_data(self, rng, H):
|
def sign_ssh_data(self, rng, H):
|
||||||
return b('fake-sig')
|
return b'fake-sig'
|
||||||
|
|
||||||
|
|
||||||
class FakeModulusPack (object):
|
class FakeModulusPack (object):
|
||||||
|
@ -91,7 +91,7 @@ class KexTest (unittest.TestCase):
|
||||||
transport.server_mode = False
|
transport.server_mode = False
|
||||||
kex = KexGroup1(transport)
|
kex = KexGroup1(transport)
|
||||||
kex.start_kex()
|
kex.start_kex()
|
||||||
x = b('1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4')
|
x = b'1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
|
||||||
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
||||||
self.assertEqual((paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect)
|
self.assertEqual((paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect)
|
||||||
|
|
||||||
|
@ -102,10 +102,10 @@ class KexTest (unittest.TestCase):
|
||||||
msg.add_string('fake-sig')
|
msg.add_string('fake-sig')
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg)
|
kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg)
|
||||||
H = b('03079780F3D3AD0B3C6DB30C8D21685F367A86D2')
|
H = b'03079780F3D3AD0B3C6DB30C8D21685F367A86D2'
|
||||||
self.assertEqual(self.K, transport._K)
|
self.assertEqual(self.K, transport._K)
|
||||||
self.assertEqual(H, hexlify(transport._H).upper())
|
self.assertEqual(H, hexlify(transport._H).upper())
|
||||||
self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify)
|
self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify)
|
||||||
self.assertTrue(transport._activated)
|
self.assertTrue(transport._activated)
|
||||||
|
|
||||||
def test_2_group1_server(self):
|
def test_2_group1_server(self):
|
||||||
|
@ -119,8 +119,8 @@ class KexTest (unittest.TestCase):
|
||||||
msg.add_mpint(69)
|
msg.add_mpint(69)
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg)
|
kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg)
|
||||||
H = b('B16BF34DD10945EDE84E9C1EF24A14BFDC843389')
|
H = b'B16BF34DD10945EDE84E9C1EF24A14BFDC843389'
|
||||||
x = b('1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967')
|
x = b'1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
|
||||||
self.assertEqual(self.K, transport._K)
|
self.assertEqual(self.K, transport._K)
|
||||||
self.assertEqual(H, hexlify(transport._H).upper())
|
self.assertEqual(H, hexlify(transport._H).upper())
|
||||||
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
||||||
|
@ -131,7 +131,7 @@ class KexTest (unittest.TestCase):
|
||||||
transport.server_mode = False
|
transport.server_mode = False
|
||||||
kex = KexGex(transport)
|
kex = KexGex(transport)
|
||||||
kex.start_kex()
|
kex.start_kex()
|
||||||
x = b('22000004000000080000002000')
|
x = b'22000004000000080000002000'
|
||||||
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
||||||
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
|
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
|
||||||
|
|
||||||
|
@ -140,7 +140,7 @@ class KexTest (unittest.TestCase):
|
||||||
msg.add_mpint(FakeModulusPack.G)
|
msg.add_mpint(FakeModulusPack.G)
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
|
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
|
||||||
x = b('20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4')
|
x = b'20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
|
||||||
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
||||||
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
|
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
|
||||||
|
|
||||||
|
@ -150,10 +150,10 @@ class KexTest (unittest.TestCase):
|
||||||
msg.add_string('fake-sig')
|
msg.add_string('fake-sig')
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
|
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
|
||||||
H = b('A265563F2FA87F1A89BF007EE90D58BE2E4A4BD0')
|
H = b'A265563F2FA87F1A89BF007EE90D58BE2E4A4BD0'
|
||||||
self.assertEqual(self.K, transport._K)
|
self.assertEqual(self.K, transport._K)
|
||||||
self.assertEqual(H, hexlify(transport._H).upper())
|
self.assertEqual(H, hexlify(transport._H).upper())
|
||||||
self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify)
|
self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify)
|
||||||
self.assertTrue(transport._activated)
|
self.assertTrue(transport._activated)
|
||||||
|
|
||||||
def test_4_gex_old_client(self):
|
def test_4_gex_old_client(self):
|
||||||
|
@ -161,7 +161,7 @@ class KexTest (unittest.TestCase):
|
||||||
transport.server_mode = False
|
transport.server_mode = False
|
||||||
kex = KexGex(transport)
|
kex = KexGex(transport)
|
||||||
kex.start_kex(_test_old_style=True)
|
kex.start_kex(_test_old_style=True)
|
||||||
x = b('1E00000800')
|
x = b'1E00000800'
|
||||||
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
||||||
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
|
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
|
||||||
|
|
||||||
|
@ -170,7 +170,7 @@ class KexTest (unittest.TestCase):
|
||||||
msg.add_mpint(FakeModulusPack.G)
|
msg.add_mpint(FakeModulusPack.G)
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
|
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
|
||||||
x = b('20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4')
|
x = b'20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
|
||||||
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
||||||
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
|
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
|
||||||
|
|
||||||
|
@ -180,10 +180,10 @@ class KexTest (unittest.TestCase):
|
||||||
msg.add_string('fake-sig')
|
msg.add_string('fake-sig')
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
|
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
|
||||||
H = b('807F87B269EF7AC5EC7E75676808776A27D5864C')
|
H = b'807F87B269EF7AC5EC7E75676808776A27D5864C'
|
||||||
self.assertEqual(self.K, transport._K)
|
self.assertEqual(self.K, transport._K)
|
||||||
self.assertEqual(H, hexlify(transport._H).upper())
|
self.assertEqual(H, hexlify(transport._H).upper())
|
||||||
self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify)
|
self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify)
|
||||||
self.assertTrue(transport._activated)
|
self.assertTrue(transport._activated)
|
||||||
|
|
||||||
def test_5_gex_server(self):
|
def test_5_gex_server(self):
|
||||||
|
@ -199,7 +199,7 @@ class KexTest (unittest.TestCase):
|
||||||
msg.add_int(4096)
|
msg.add_int(4096)
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
|
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
|
||||||
x = b('1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102')
|
x = b'1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102'
|
||||||
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
||||||
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
|
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
|
||||||
|
|
||||||
|
@ -208,8 +208,8 @@ class KexTest (unittest.TestCase):
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
|
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
|
||||||
K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
|
K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
|
||||||
H = b('CE754197C21BF3452863B4F44D0B3951F12516EF')
|
H = b'CE754197C21BF3452863B4F44D0B3951F12516EF'
|
||||||
x = b('210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967')
|
x = b'210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
|
||||||
self.assertEqual(K, transport._K)
|
self.assertEqual(K, transport._K)
|
||||||
self.assertEqual(H, hexlify(transport._H).upper())
|
self.assertEqual(H, hexlify(transport._H).upper())
|
||||||
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
||||||
|
@ -226,7 +226,7 @@ class KexTest (unittest.TestCase):
|
||||||
msg.add_int(2048)
|
msg.add_int(2048)
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg)
|
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg)
|
||||||
x = b('1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102')
|
x = b'1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102'
|
||||||
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
||||||
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
|
self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
|
||||||
|
|
||||||
|
@ -235,8 +235,8 @@ class KexTest (unittest.TestCase):
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
|
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
|
||||||
K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
|
K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
|
||||||
H = b('B41A06B2E59043CEFC1AE16EC31F1E2D12EC455B')
|
H = b'B41A06B2E59043CEFC1AE16EC31F1E2D12EC455B'
|
||||||
x = b('210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967')
|
x = b'210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
|
||||||
self.assertEqual(K, transport._K)
|
self.assertEqual(K, transport._K)
|
||||||
self.assertEqual(H, hexlify(transport._H).upper())
|
self.assertEqual(H, hexlify(transport._H).upper())
|
||||||
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
|
||||||
|
|
|
@ -21,17 +21,16 @@ Some unit tests for ssh protocol message blocks.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from binascii import unhexlify
|
|
||||||
from paramiko.message import Message
|
from paramiko.message import Message
|
||||||
from paramiko.common import *
|
from paramiko.common import *
|
||||||
|
|
||||||
|
|
||||||
class MessageTest (unittest.TestCase):
|
class MessageTest (unittest.TestCase):
|
||||||
|
|
||||||
__a = unhexlify(b('000000170760e09000000001710000000568656c6c6f000003e8')) + (b('x') * 1000)
|
__a = b'\x00\x00\x00\x17\x07\x60\xe0\x90\x00\x00\x00\x01\x71\x00\x00\x00\x05\x68\x65\x6c\x6c\x6f\x00\x00\x03\xe8' + b'x' * 1000
|
||||||
__b = unhexlify(b('0100f3003f00000010687565792c64657765792c6c6f756965'))
|
__b = b'\x01\x00\xf3\x00\x3f\x00\x00\x00\x10\x68\x75\x65\x79\x2c\x64\x65\x77\x65\x79\x2c\x6c\x6f\x75\x69\x65'
|
||||||
__c = unhexlify(b('00000000000000050000f5e4d3c2b10900000001110000000700f5e4d3c2b109000000069a1b2c3d4ef7'))
|
__c = b'\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x01\x11\x00\x00\x00\x07\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x06\x9a\x1b\x2c\x3d\x4e\xf7'
|
||||||
__d = unhexlify(b('00000005ff000000051122334455ff0000000a00f00000000000000000010000000363617400000003612c62'))
|
__d = b'\x00\x00\x00\x05\xff\x00\x00\x00\x05\x11\x22\x33\x44\x55\xff\x00\x00\x00\x0a\x00\xf0\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x63\x61\x74\x00\x00\x00\x03\x61\x2c\x62'
|
||||||
|
|
||||||
def test_1_encode(self):
|
def test_1_encode(self):
|
||||||
msg = Message()
|
msg = Message()
|
||||||
|
|
|
@ -21,7 +21,6 @@ Some unit tests for the ssh2 protocol in Transport.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from binascii import unhexlify
|
|
||||||
from tests.loop import LoopSocket
|
from tests.loop import LoopSocket
|
||||||
from Crypto.Cipher import AES
|
from Crypto.Cipher import AES
|
||||||
from Crypto.Hash import SHA, HMAC
|
from Crypto.Hash import SHA, HMAC
|
||||||
|
@ -55,7 +54,7 @@ class PacketizerTest (unittest.TestCase):
|
||||||
data = rsock.recv(100)
|
data = rsock.recv(100)
|
||||||
# 32 + 12 bytes of MAC = 44
|
# 32 + 12 bytes of MAC = 44
|
||||||
self.assertEqual(44, len(data))
|
self.assertEqual(44, len(data))
|
||||||
self.assertEqual(unhexlify(b('439197bd5b50ac2587c2c46bc7e938c0')), data[:16])
|
self.assertEqual(b'\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0', data[:16])
|
||||||
|
|
||||||
def test_2_read (self):
|
def test_2_read (self):
|
||||||
rsock = LoopSocket()
|
rsock = LoopSocket()
|
||||||
|
@ -66,7 +65,7 @@ class PacketizerTest (unittest.TestCase):
|
||||||
p.set_hexdump(True)
|
p.set_hexdump(True)
|
||||||
cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16)
|
cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16)
|
||||||
p.set_inbound_cipher(cipher, 16, SHA, 12, x1f * 20)
|
p.set_inbound_cipher(cipher, 16, SHA, 12, x1f * 20)
|
||||||
wsock.send(unhexlify(b('439197bd5b50ac2587c2c46bc7e938c090d216560d717361387c4c3dfb977de26e03b1a0c21cd641414cb459')))
|
wsock.send(b'\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59')
|
||||||
cmd, m = p.read_message()
|
cmd, m = p.read_message()
|
||||||
self.assertEqual(100, cmd)
|
self.assertEqual(100, cmd)
|
||||||
self.assertEqual(100, m.get_int())
|
self.assertEqual(100, m.get_int())
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
Some unit tests for public/private key objects.
|
Some unit tests for public/private key objects.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from binascii import hexlify, unhexlify
|
from binascii import hexlify
|
||||||
import unittest
|
import unittest
|
||||||
from paramiko import RSAKey, DSSKey, ECDSAKey, Message, util
|
from paramiko import RSAKey, DSSKey, ECDSAKey, Message, util
|
||||||
from paramiko.common import rng, StringIO, byte_chr, b, bytes
|
from paramiko.common import rng, StringIO, byte_chr, b, bytes
|
||||||
|
@ -77,7 +77,7 @@ ADRvOqQ5R98Sxst765CAqXmRtz8vwoD96g==
|
||||||
-----END EC PRIVATE KEY-----
|
-----END EC PRIVATE KEY-----
|
||||||
"""
|
"""
|
||||||
|
|
||||||
x1234 = unhexlify(b('01020304'))
|
x1234 = b'\x01\x02\x03\x04'
|
||||||
|
|
||||||
|
|
||||||
class KeyTest (unittest.TestCase):
|
class KeyTest (unittest.TestCase):
|
||||||
|
@ -91,7 +91,7 @@ class KeyTest (unittest.TestCase):
|
||||||
def test_1_generate_key_bytes(self):
|
def test_1_generate_key_bytes(self):
|
||||||
from Crypto.Hash import MD5
|
from Crypto.Hash import MD5
|
||||||
key = util.generate_key_bytes(MD5, x1234, 'happy birthday', 30)
|
key = util.generate_key_bytes(MD5, x1234, 'happy birthday', 30)
|
||||||
exp = unhexlify(b('61E1F272F4C1C4561586BD322498C0E924672780F47BB37DDA7D54019E64'))
|
exp = b'\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64'
|
||||||
self.assertEqual(exp, key)
|
self.assertEqual(exp, key)
|
||||||
|
|
||||||
def test_2_load_rsa(self):
|
def test_2_load_rsa(self):
|
||||||
|
@ -165,7 +165,7 @@ class KeyTest (unittest.TestCase):
|
||||||
def test_8_sign_rsa(self):
|
def test_8_sign_rsa(self):
|
||||||
# verify that the rsa private key can sign and verify
|
# verify that the rsa private key can sign and verify
|
||||||
key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
|
||||||
msg = key.sign_ssh_data(rng, b('ice weasels'))
|
msg = key.sign_ssh_data(rng, b'ice weasels')
|
||||||
self.assertTrue(type(msg) is Message)
|
self.assertTrue(type(msg) is Message)
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
self.assertEqual('ssh-rsa', msg.get_text())
|
self.assertEqual('ssh-rsa', msg.get_text())
|
||||||
|
@ -173,12 +173,12 @@ class KeyTest (unittest.TestCase):
|
||||||
self.assertEqual(sig, msg.get_binary())
|
self.assertEqual(sig, msg.get_binary())
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
pub = RSAKey(data=key.asbytes())
|
pub = RSAKey(data=key.asbytes())
|
||||||
self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg))
|
self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg))
|
||||||
|
|
||||||
def test_9_sign_dss(self):
|
def test_9_sign_dss(self):
|
||||||
# verify that the dss private key can sign and verify
|
# verify that the dss private key can sign and verify
|
||||||
key = DSSKey.from_private_key_file(test_path('test_dss.key'))
|
key = DSSKey.from_private_key_file(test_path('test_dss.key'))
|
||||||
msg = key.sign_ssh_data(rng, b('ice weasels'))
|
msg = key.sign_ssh_data(rng, b'ice weasels')
|
||||||
self.assertTrue(type(msg) is Message)
|
self.assertTrue(type(msg) is Message)
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
self.assertEqual('ssh-dss', msg.get_text())
|
self.assertEqual('ssh-dss', msg.get_text())
|
||||||
|
@ -188,19 +188,19 @@ class KeyTest (unittest.TestCase):
|
||||||
self.assertEqual(40, len(msg.get_binary()))
|
self.assertEqual(40, len(msg.get_binary()))
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
pub = DSSKey(data=key.asbytes())
|
pub = DSSKey(data=key.asbytes())
|
||||||
self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg))
|
self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg))
|
||||||
|
|
||||||
def test_A_generate_rsa(self):
|
def test_A_generate_rsa(self):
|
||||||
key = RSAKey.generate(1024)
|
key = RSAKey.generate(1024)
|
||||||
msg = key.sign_ssh_data(rng, b('jerri blank'))
|
msg = key.sign_ssh_data(rng, b'jerri blank')
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
self.assertTrue(key.verify_ssh_sig(b('jerri blank'), msg))
|
self.assertTrue(key.verify_ssh_sig(b'jerri blank', msg))
|
||||||
|
|
||||||
def test_B_generate_dss(self):
|
def test_B_generate_dss(self):
|
||||||
key = DSSKey.generate(1024)
|
key = DSSKey.generate(1024)
|
||||||
msg = key.sign_ssh_data(rng, b('jerri blank'))
|
msg = key.sign_ssh_data(rng, b'jerri blank')
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
self.assertTrue(key.verify_ssh_sig(b('jerri blank'), msg))
|
self.assertTrue(key.verify_ssh_sig(b'jerri blank', msg))
|
||||||
|
|
||||||
def test_10_load_ecdsa(self):
|
def test_10_load_ecdsa(self):
|
||||||
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key'))
|
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key'))
|
||||||
|
@ -219,7 +219,7 @@ class KeyTest (unittest.TestCase):
|
||||||
self.assertEqual(key, key2)
|
self.assertEqual(key, key2)
|
||||||
|
|
||||||
def test_11_load_ecdsa_password(self):
|
def test_11_load_ecdsa_password(self):
|
||||||
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password.key'), b('television'))
|
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password.key'), b'television')
|
||||||
self.assertEqual('ecdsa-sha2-nistp256', key.get_name())
|
self.assertEqual('ecdsa-sha2-nistp256', key.get_name())
|
||||||
exp_ecdsa = b(FINGER_ECDSA.split()[1].replace(':', ''))
|
exp_ecdsa = b(FINGER_ECDSA.split()[1].replace(':', ''))
|
||||||
my_ecdsa = hexlify(key.get_fingerprint())
|
my_ecdsa = hexlify(key.get_fingerprint())
|
||||||
|
@ -239,7 +239,7 @@ class KeyTest (unittest.TestCase):
|
||||||
def test_13_sign_ecdsa(self):
|
def test_13_sign_ecdsa(self):
|
||||||
# verify that the rsa private key can sign and verify
|
# verify that the rsa private key can sign and verify
|
||||||
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key'))
|
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key'))
|
||||||
msg = key.sign_ssh_data(rng, b('ice weasels'))
|
msg = key.sign_ssh_data(rng, b'ice weasels')
|
||||||
self.assertTrue(type(msg) is Message)
|
self.assertTrue(type(msg) is Message)
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
self.assertEqual('ecdsa-sha2-nistp256', msg.get_text())
|
self.assertEqual('ecdsa-sha2-nistp256', msg.get_text())
|
||||||
|
@ -250,4 +250,4 @@ class KeyTest (unittest.TestCase):
|
||||||
|
|
||||||
msg.rewind()
|
msg.rewind()
|
||||||
pub = ECDSAKey(data=key.asbytes())
|
pub = ECDSAKey(data=key.asbytes())
|
||||||
self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg))
|
self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg))
|
||||||
|
|
|
@ -549,7 +549,7 @@ class SFTPTest (unittest.TestCase):
|
||||||
|
|
||||||
fd, localname = mkstemp()
|
fd, localname = mkstemp()
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
text = b('All I wanted was a plastic bunny rabbit.\n')
|
text = b'All I wanted was a plastic bunny rabbit.\n'
|
||||||
with open(localname, 'wb') as f:
|
with open(localname, 'wb') as f:
|
||||||
f.write(text)
|
f.write(text)
|
||||||
saved_progress = []
|
saved_progress = []
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
Some unit tests for the ssh2 protocol in Transport.
|
Some unit tests for the ssh2 protocol in Transport.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from binascii import hexlify, unhexlify
|
from binascii import hexlify
|
||||||
import select
|
import select
|
||||||
import socket
|
import socket
|
||||||
import sys
|
import sys
|
||||||
|
@ -159,10 +159,10 @@ class TransportTest(ParamikoTest):
|
||||||
|
|
||||||
def test_2_compute_key(self):
|
def test_2_compute_key(self):
|
||||||
self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
|
self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
|
||||||
self.tc.H = unhexlify(b('0C8307CDE6856FF30BA93684EB0F04C2520E9ED3'))
|
self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3'
|
||||||
self.tc.session_id = self.tc.H
|
self.tc.session_id = self.tc.H
|
||||||
key = self.tc._compute_key('C', 32)
|
key = self.tc._compute_key('C', 32)
|
||||||
self.assertEqual(b('207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995'),
|
self.assertEqual(b'207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995',
|
||||||
hexlify(key).upper())
|
hexlify(key).upper())
|
||||||
|
|
||||||
def test_3_simple(self):
|
def test_3_simple(self):
|
||||||
|
@ -361,7 +361,7 @@ class TransportTest(ParamikoTest):
|
||||||
self.assertEqual([], w)
|
self.assertEqual([], w)
|
||||||
self.assertEqual([], e)
|
self.assertEqual([], e)
|
||||||
|
|
||||||
self.assertEqual(b('hello\n'), chan.recv(6))
|
self.assertEqual(b'hello\n', chan.recv(6))
|
||||||
|
|
||||||
# and, should be dead again now
|
# and, should be dead again now
|
||||||
r, w, e = select.select([chan], [], [], 0.1)
|
r, w, e = select.select([chan], [], [], 0.1)
|
||||||
|
@ -462,7 +462,7 @@ class TransportTest(ParamikoTest):
|
||||||
self.assertEqual(6093, requested[0][1])
|
self.assertEqual(6093, requested[0][1])
|
||||||
|
|
||||||
x11_server.send('hello')
|
x11_server.send('hello')
|
||||||
self.assertEqual(b('hello'), x11_client.recv(5))
|
self.assertEqual(b'hello', x11_client.recv(5))
|
||||||
|
|
||||||
x11_server.close()
|
x11_server.close()
|
||||||
x11_client.close()
|
x11_client.close()
|
||||||
|
@ -495,7 +495,7 @@ class TransportTest(ParamikoTest):
|
||||||
cch = self.tc.accept()
|
cch = self.tc.accept()
|
||||||
|
|
||||||
sch.send('hello')
|
sch.send('hello')
|
||||||
self.assertEqual(b('hello'), cch.recv(5))
|
self.assertEqual(b'hello', cch.recv(5))
|
||||||
sch.close()
|
sch.close()
|
||||||
cch.close()
|
cch.close()
|
||||||
ss.close()
|
ss.close()
|
||||||
|
@ -527,12 +527,12 @@ class TransportTest(ParamikoTest):
|
||||||
cch.connect(self.server._tcpip_dest)
|
cch.connect(self.server._tcpip_dest)
|
||||||
|
|
||||||
ss, _ = greeting_server.accept()
|
ss, _ = greeting_server.accept()
|
||||||
ss.send(b('Hello!\n'))
|
ss.send(b'Hello!\n')
|
||||||
ss.close()
|
ss.close()
|
||||||
sch.send(cch.recv(8192))
|
sch.send(cch.recv(8192))
|
||||||
sch.close()
|
sch.close()
|
||||||
|
|
||||||
self.assertEqual(b('Hello!\n'), cs.recv(7))
|
self.assertEqual(b'Hello!\n', cs.recv(7))
|
||||||
cs.close()
|
cs.close()
|
||||||
|
|
||||||
def test_G_stderr_select(self):
|
def test_G_stderr_select(self):
|
||||||
|
@ -563,7 +563,7 @@ class TransportTest(ParamikoTest):
|
||||||
self.assertEqual([], w)
|
self.assertEqual([], w)
|
||||||
self.assertEqual([], e)
|
self.assertEqual([], e)
|
||||||
|
|
||||||
self.assertEqual(b('hello\n'), chan.recv_stderr(6))
|
self.assertEqual(b'hello\n', chan.recv_stderr(6))
|
||||||
|
|
||||||
# and, should be dead again now
|
# and, should be dead again now
|
||||||
r, w, e = select.select([chan], [], [], 0.1)
|
r, w, e = select.select([chan], [], [], 0.1)
|
||||||
|
|
|
@ -137,7 +137,7 @@ class UtilTest(ParamikoTest):
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_4_generate_key_bytes(self):
|
def test_4_generate_key_bytes(self):
|
||||||
x = paramiko.util.generate_key_bytes(SHA, b('ABCDEFGH'), 'This is my secret passphrase.', 64)
|
x = paramiko.util.generate_key_bytes(SHA, b'ABCDEFGH', 'This is my secret passphrase.', 64)
|
||||||
hex = ''.join(['%02x' % byte_ord(c) for c in x])
|
hex = ''.join(['%02x' % byte_ord(c) for c in x])
|
||||||
self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
|
self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
|
||||||
|
|
||||||
|
@ -150,7 +150,7 @@ class UtilTest(ParamikoTest):
|
||||||
self.assertEqual(1, len(list(hostdict.values())[0]))
|
self.assertEqual(1, len(list(hostdict.values())[0]))
|
||||||
self.assertEqual(1, len(list(hostdict.values())[1]))
|
self.assertEqual(1, len(list(hostdict.values())[1]))
|
||||||
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
|
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
|
||||||
self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
|
self.assertEqual(b'E6684DB30E109B67B70FF1DC5C7F1363', fp)
|
||||||
finally:
|
finally:
|
||||||
os.unlink('hostfile.temp')
|
os.unlink('hostfile.temp')
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue