Introduce ECDSA
This just adds tests; hooking this up with paramiko comes in the next commit.
This commit is contained in:
		
							parent
							
								
									0392e3df8f
								
							
						
					
					
						commit
						632129c427
					
				|  | @ -69,6 +69,7 @@ from ssh_exception import SSHException, PasswordRequiredException, \ | |||
| from server import ServerInterface, SubsystemHandler, InteractiveQuery | ||||
| from rsakey import RSAKey | ||||
| from dsskey import DSSKey | ||||
| from ecdsakey import ECDSAKey | ||||
| from sftp import SFTPError, BaseSFTP | ||||
| from sftp_client import SFTP, SFTPClient | ||||
| from sftp_server import SFTPServer | ||||
|  |  | |||
|  | @ -0,0 +1,181 @@ | |||
| # Copyright (C) 2003-2007  Robey Pointer <robeypointer@gmail.com> | ||||
| # | ||||
| # This file is part of paramiko. | ||||
| # | ||||
| # Paramiko is free software; you can redistribute it and/or modify it under the | ||||
| # terms of the GNU Lesser General Public License as published by the Free | ||||
| # Software Foundation; either version 2.1 of the License, or (at your option) | ||||
| # any later version. | ||||
| # | ||||
| # Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY | ||||
| # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR | ||||
| # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more | ||||
| # details. | ||||
| # | ||||
| # You should have received a copy of the GNU Lesser General Public License | ||||
| # along with Paramiko; if not, write to the Free Software Foundation, Inc., | ||||
| # 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. | ||||
| 
 | ||||
| """ | ||||
| L{ECDSAKey} | ||||
| """ | ||||
| 
 | ||||
| import binascii | ||||
| from ecdsa import SigningKey, VerifyingKey, der, curves | ||||
| from ecdsa.util import number_to_string, sigencode_string, sigencode_strings, sigdecode_strings | ||||
| from Crypto.Hash import SHA256, MD5 | ||||
| from Crypto.Cipher import DES3 | ||||
| 
 | ||||
| from paramiko.common import * | ||||
| from paramiko import util | ||||
| from paramiko.message import Message | ||||
| from paramiko.ber import BER, BERException | ||||
| from paramiko.pkey import PKey | ||||
| from paramiko.ssh_exception import SSHException | ||||
| 
 | ||||
| 
 | ||||
| class ECDSAKey (PKey): | ||||
|     """ | ||||
|     Representation of an ECDSA key which can be used to sign and verify SSH2 | ||||
|     data. | ||||
|     """ | ||||
| 
 | ||||
|     def __init__(self, msg=None, data=None, filename=None, password=None, vals=None, file_obj=None): | ||||
|         self.verifying_key = None | ||||
|         self.signing_key = None | ||||
|         if file_obj is not None: | ||||
|             self._from_private_key(file_obj, password) | ||||
|             return | ||||
|         if filename is not None: | ||||
|             self._from_private_key_file(filename, password) | ||||
|             return | ||||
|         if (msg is None) and (data is not None): | ||||
|             msg = Message(data) | ||||
|         if vals is not None: | ||||
|             self.verifying_key, self.signing_key = vals | ||||
|         else: | ||||
|             if msg is None: | ||||
|                 raise SSHException('Key object may not be empty') | ||||
|             if msg.get_string() != 'ecdsa-sha2-nistp256': | ||||
|                 raise SSHException('Invalid key') | ||||
|             curvename = msg.get_string() | ||||
|             if curvename != 'nistp256': | ||||
|                 raise SSHException("Can't handle curve of type %s" % curvename) | ||||
| 
 | ||||
|             pointinfo = msg.get_string() | ||||
|             if pointinfo[0] != "\x04": | ||||
|                 raise SSHException('Point compression is being used: %s'% | ||||
|                                    binascii.hexlify(pointinfo)) | ||||
|             self.verifying_key = VerifyingKey.from_string(pointinfo[1:], | ||||
|                 curve=curves.NIST256p) | ||||
|         self.size = 256 | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         key = self.verifying_key | ||||
|         m = Message() | ||||
|         m.add_string('ecdsa-sha2-nistp256') | ||||
|         m.add_string('nistp256') | ||||
| 
 | ||||
|         point_str = "\x04" + key.to_string() | ||||
| 
 | ||||
|         m.add_string(point_str) | ||||
|         return str(m) | ||||
| 
 | ||||
|     def __hash__(self): | ||||
|         h = hash(self.get_name()) | ||||
|         h = h * 37 + hash(self.verifying_key.pubkey.point.x()) | ||||
|         h = h * 37 + hash(self.verifying_key.pubkey.point.y()) | ||||
|         return hash(h) | ||||
| 
 | ||||
|     def get_name(self): | ||||
|         return 'ecdsa-sha2-nistp256' | ||||
| 
 | ||||
|     def get_bits(self): | ||||
|         return self.size | ||||
| 
 | ||||
|     def can_sign(self): | ||||
|         return self.signing_key is not None | ||||
| 
 | ||||
|     def sign_ssh_data(self, rpool, data): | ||||
|         digest = SHA256.new(data).digest() | ||||
|         sig = self.signing_key.sign_digest(digest, entropy=rpool.read, | ||||
|                                            sigencode=self._sigencode) | ||||
|         m = Message() | ||||
|         m.add_string('ecdsa-sha2-nistp256') | ||||
|         m.add_string(sig) | ||||
|         return m | ||||
| 
 | ||||
|     def verify_ssh_sig(self, data, msg): | ||||
|         if msg.get_string() != 'ecdsa-sha2-nistp256': | ||||
|             return False | ||||
|         sig = msg.get_string() | ||||
| 
 | ||||
|         # verify the signature by SHA'ing the data and encrypting it | ||||
|         # using the public key. | ||||
|         hash_obj = SHA256.new(data).digest() | ||||
|         return self.verifying_key.verify_digest(sig, hash_obj, | ||||
|                                                 sigdecode=self._sigdecode) | ||||
| 
 | ||||
|     def write_private_key_file(self, filename, password=None): | ||||
|         key = self.signing_key or self.verifying_key | ||||
|         self._write_private_key_file('EC', filename, key.to_der(), password) | ||||
| 
 | ||||
|     def write_private_key(self, file_obj, password=None): | ||||
|         key = self.signing_key or self.verifying_key | ||||
|         self._write_private_key('EC', file_obj, key.to_der(), password) | ||||
| 
 | ||||
|     def generate(bits, progress_func=None): | ||||
|         """ | ||||
|         Generate a new private RSA key.  This factory function can be used to | ||||
|         generate a new host key or authentication key. | ||||
| 
 | ||||
|         @param bits: number of bits the generated key should be. | ||||
|         @type bits: int | ||||
|         @param progress_func: an optional function to call at key points in | ||||
|             key generation (used by C{pyCrypto.PublicKey}). | ||||
|         @type progress_func: function | ||||
|         @return: new private key | ||||
|         @rtype: L{RSAKey} | ||||
|         """ | ||||
|         signing_key = ECDSA.generate() | ||||
|         key = ECDSAKey(vals=(signing_key, signing_key.get_verifying_key())) | ||||
|         return key | ||||
|     generate = staticmethod(generate) | ||||
| 
 | ||||
| 
 | ||||
|     ###  internals... | ||||
| 
 | ||||
| 
 | ||||
|     def _from_private_key_file(self, filename, password): | ||||
|         data = self._read_private_key_file('EC', filename, password) | ||||
|         self._decode_key(data) | ||||
| 
 | ||||
|     def _from_private_key(self, file_obj, password): | ||||
|         data = self._read_private_key('EC', file_obj, password) | ||||
|         self._decode_key(data) | ||||
| 
 | ||||
|     ALLOWED_PADDINGS = ['\x01', '\x02\x02', '\x03\x03\x03', '\x04\x04\x04\x04', | ||||
|                         '\x05\x05\x05\x05\x05', '\x06\x06\x06\x06\x06\x06', | ||||
|                         '\x07\x07\x07\x07\x07\x07\x07'] | ||||
|     def _decode_key(self, data): | ||||
|         s, padding = der.remove_sequence(data) | ||||
|         if padding: | ||||
|             if padding not in self.ALLOWED_PADDINGS: | ||||
|                 raise ValueError, "weird padding: %s" % (binascii.hexlify(empty)) | ||||
|             data = data[:-len(padding)] | ||||
|         key = SigningKey.from_der(data) | ||||
|         self.signing_key = key | ||||
|         self.verifying_key = key.get_verifying_key() | ||||
|         self.size = 256 | ||||
| 
 | ||||
|     def _sigencode(self, r, s, order): | ||||
|         msg = Message() | ||||
|         msg.add_mpint(r) | ||||
|         msg.add_mpint(s) | ||||
|         return str(msg) | ||||
| 
 | ||||
|     def _sigdecode(self, sig, order): | ||||
|         msg = Message(sig) | ||||
|         r = msg.get_mpint() | ||||
|         s = msg.get_mpint() | ||||
|         return (r, s) | ||||
|  | @ -1,2 +1,3 @@ | |||
| pycrypto | ||||
| tox | ||||
| ecdsa | ||||
|  |  | |||
							
								
								
									
										4
									
								
								setup.py
								
								
								
								
							
							
						
						
									
										4
									
								
								setup.py
								
								
								
								
							|  | @ -40,7 +40,9 @@ import sys | |||
| try: | ||||
|     from setuptools import setup | ||||
|     kw = { | ||||
|         'install_requires': 'pycrypto >= 2.1, != 2.4', | ||||
|         'install_requires': ['pycrypto >= 2.1, != 2.4', | ||||
|                              'ecdsa', | ||||
|                              ], | ||||
|     } | ||||
| except ImportError: | ||||
|     from distutils.core import setup | ||||
|  |  | |||
|  | @ -0,0 +1,5 @@ | |||
| -----BEGIN EC PRIVATE KEY----- | ||||
| MHcCAQEEIKB6ty3yVyKEnfF/zprx0qwC76MsMlHY4HXCnqho2eKioAoGCCqGSM49 | ||||
| AwEHoUQDQgAElI9mbdlaS+T9nHxY/59lFnn80EEecZDBHq4gLpccY8Mge5ZTMiMD | ||||
| ADRvOqQ5R98Sxst765CAqXmRtz8vwoD96g== | ||||
| -----END EC PRIVATE KEY----- | ||||
|  | @ -0,0 +1,8 @@ | |||
| -----BEGIN EC PRIVATE KEY----- | ||||
| Proc-Type: 4,ENCRYPTED | ||||
| DEK-Info: AES-128-CBC,EEB56BC745EDB2DE04FC3FE1F8DA387E | ||||
| 
 | ||||
| wdt7QTCa6ahTJLaEPH7NhHyBcxhzrzf93d4UwQOuAhkM6//jKD4lF9fErHBW0f3B | ||||
| ExberCU3UxfEF3xX2thXiLw47JgeOCeQUlqRFx92p36k6YmfNGX6W8CsZ3d+XodF | ||||
| Z+pb6m285CiSX+W95NenFMexXFsIpntiCvTifTKJ8os= | ||||
| -----END EC PRIVATE KEY----- | ||||
|  | @ -23,14 +23,17 @@ Some unit tests for public/private key objects. | |||
| from binascii import hexlify, unhexlify | ||||
| import StringIO | ||||
| import unittest | ||||
| from paramiko import RSAKey, DSSKey, Message, util | ||||
| from paramiko import RSAKey, DSSKey, ECDSAKey, Message, util | ||||
| from paramiko.common import rng | ||||
| 
 | ||||
| # from openssh's ssh-keygen | ||||
| PUB_RSA = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c=' | ||||
| PUB_DSS = 'ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE=' | ||||
| PUB_ECDSA = 'ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo=' | ||||
| 
 | ||||
| FINGER_RSA = '1024 60:73:38:44:cb:51:86:65:7f:de:da:a2:2b:5a:57:d5' | ||||
| FINGER_DSS = '1024 44:78:f0:b9:a2:3c:c5:18:20:09:ff:75:5b:c1:d2:6c' | ||||
| FINGER_ECDSA = '256 25:19:eb:55:e6:a1:47:ff:4f:38:d2:75:6f:a5:d5:60' | ||||
| SIGNED_RSA = '20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8' | ||||
| 
 | ||||
| RSA_PRIVATE_OUT = """\ | ||||
|  | @ -66,6 +69,13 @@ QPSch9pT9XHqn+1rZ4bK+QGA | |||
| -----END DSA PRIVATE KEY----- | ||||
| """ | ||||
| 
 | ||||
| ECDSA_PRIVATE_OUT = """\ | ||||
| -----BEGIN EC PRIVATE KEY----- | ||||
| MHcCAQEEIKB6ty3yVyKEnfF/zprx0qwC76MsMlHY4HXCnqho2eKioAoGCCqGSM49 | ||||
| AwEHoUQDQgAElI9mbdlaS+T9nHxY/59lFnn80EEecZDBHq4gLpccY8Mge5ZTMiMD | ||||
| ADRvOqQ5R98Sxst765CAqXmRtz8vwoD96g== | ||||
| -----END EC PRIVATE KEY----- | ||||
| """ | ||||
| 
 | ||||
| class KeyTest (unittest.TestCase): | ||||
| 
 | ||||
|  | @ -176,7 +186,7 @@ class KeyTest (unittest.TestCase): | |||
|         msg.rewind() | ||||
|         pub = DSSKey(data=str(key)) | ||||
|         self.assert_(pub.verify_ssh_sig('ice weasels', msg)) | ||||
|      | ||||
| 
 | ||||
|     def test_A_generate_rsa(self): | ||||
|         key = RSAKey.generate(1024) | ||||
|         msg = key.sign_ssh_data(rng, 'jerri blank') | ||||
|  | @ -188,3 +198,53 @@ class KeyTest (unittest.TestCase): | |||
|         msg = key.sign_ssh_data(rng, 'jerri blank') | ||||
|         msg.rewind() | ||||
|         self.assert_(key.verify_ssh_sig('jerri blank', msg)) | ||||
| 
 | ||||
|     def test_10_load_ecdsa(self): | ||||
|         key = ECDSAKey.from_private_key_file('tests/test_ecdsa.key') | ||||
|         self.assertEquals('ecdsa-sha2-nistp256', key.get_name()) | ||||
|         exp_ecdsa = FINGER_ECDSA.split()[1].replace(':', '') | ||||
|         my_ecdsa = hexlify(key.get_fingerprint()) | ||||
|         self.assertEquals(exp_ecdsa, my_ecdsa) | ||||
|         self.assertEquals(PUB_ECDSA.split()[1], key.get_base64()) | ||||
|         self.assertEquals(256, key.get_bits()) | ||||
| 
 | ||||
|         s = StringIO.StringIO() | ||||
|         key.write_private_key(s) | ||||
|         self.assertEquals(ECDSA_PRIVATE_OUT, s.getvalue()) | ||||
|         s.seek(0) | ||||
|         key2 = ECDSAKey.from_private_key(s) | ||||
|         self.assertEquals(key, key2) | ||||
| 
 | ||||
|     def test_11_load_ecdsa_password(self): | ||||
|         key = ECDSAKey.from_private_key_file('tests/test_ecdsa_password.key', 'television') | ||||
|         self.assertEquals('ecdsa-sha2-nistp256', key.get_name()) | ||||
|         exp_ecdsa = FINGER_ECDSA.split()[1].replace(':', '') | ||||
|         my_ecdsa = hexlify(key.get_fingerprint()) | ||||
|         self.assertEquals(exp_ecdsa, my_ecdsa) | ||||
|         self.assertEquals(PUB_ECDSA.split()[1], key.get_base64()) | ||||
|         self.assertEquals(256, key.get_bits()) | ||||
| 
 | ||||
|     def test_12_compare_ecdsa(self): | ||||
|         # verify that the private & public keys compare equal | ||||
|         key = ECDSAKey.from_private_key_file('tests/test_ecdsa.key') | ||||
|         self.assertEquals(key, key) | ||||
|         pub = ECDSAKey(data=str(key)) | ||||
|         self.assert_(key.can_sign()) | ||||
|         self.assert_(not pub.can_sign()) | ||||
|         self.assertEquals(key, pub) | ||||
| 
 | ||||
|     def test_13_sign_ecdsa(self): | ||||
|         # verify that the rsa private key can sign and verify | ||||
|         key = ECDSAKey.from_private_key_file('tests/test_ecdsa.key') | ||||
|         msg = key.sign_ssh_data(rng, 'ice weasels') | ||||
|         self.assert_(type(msg) is Message) | ||||
|         msg.rewind() | ||||
|         self.assertEquals('ecdsa-sha2-nistp256', msg.get_string()) | ||||
|         # ECDSA signatures, like DSS signatures, tend to be different | ||||
|         # each time, so we can't compare against a "known correct" | ||||
|         # signature. | ||||
|         # Even the length of the signature can change. | ||||
| 
 | ||||
|         msg.rewind() | ||||
|         pub = ECDSAKey(data=str(key)) | ||||
|         self.assert_(pub.verify_ssh_sig('ice weasels', msg)) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue