[project @ Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--patch-37]

can now generate rsa keys (not dss yet)
added functionality to ber to create ber streams.  added some common methods
to PKey to allow dumping the key to base64 (the format used by openssh for
public key files and host key lists), and a factory for creating a key from
a private key file, and a common way to save private keys.  RSAKey luckily
didn't have to change that much.

also added a factory method to RSAKey to generate a new key.
This commit is contained in:
Robey Pointer 2004-04-05 10:16:31 +00:00
parent 8fafd1aa17
commit 01bf5477a0
3 changed files with 176 additions and 22 deletions

View File

@ -37,7 +37,7 @@ class BER(object):
return self.content
def __repr__(self):
return 'BER(' + repr(self.content) + ')'
return 'BER(\'' + repr(self.content) + '\')'
def decode(self):
return self.decode_next()
@ -52,10 +52,10 @@ class BER(object):
id = 0
while self.idx < len(self.content):
t = ord(self.content[self.idx])
self.idx += 1
id = (id << 7) | (t & 0x7f)
if not (t & 0x80):
break
id = (id << 7) | (t & 0x7f)
self.idx += 1
if self.idx >= len(self.content):
return None
# now fetch length
@ -67,11 +67,8 @@ class BER(object):
t = size & 0x7f
if self.idx + t > len(self.content):
return None
size = 0
while t > 0:
size = (size << 8) | ord(self.content[self.idx])
self.idx += 1
t -= 1
size = self.inflate_long(self.content[self.idx : self.idx + t], True)
self.idx += t
if self.idx + size > len(self.content):
# can't fit
return None
@ -98,3 +95,34 @@ class BER(object):
out.append(x)
decode_sequence = staticmethod(decode_sequence)
def encode_tlv(self, id, val):
# FIXME: support id > 31 someday
self.content += chr(id)
if len(val) > 0x7f:
lenstr = util.deflate_long(len(val))
self.content += chr(0x80 + len(lenstr)) + lenstr
else:
self.content += chr(len(val))
self.content += val
def encode(self, x):
if type(x) is bool:
if x:
self.encode_tlv(1, '\xff')
else:
self.encode_tlv(1, '\x00')
elif (type(x) is int) or (type(x) is long):
self.encode_tlv(2, util.deflate_long(x))
elif type(x) is str:
self.encode_tlv(4, x)
elif (type(x) is list) or (type(x) is tuple):
self.encode_tlv(30, self.encode_sequence(x))
else:
raise BERException('Unknown type for encoding: %s' % repr(type(x)))
def encode_sequence(data):
b = BER()
for item in data:
b.encode(item)
return str(b)
encode_sequence = staticmethod(encode_sequence)

View File

@ -22,12 +22,16 @@
Common API for all public keys.
"""
import base64
from Crypto.Hash import MD5
from Crypto.Cipher import DES3
from common import *
from message import Message
from ssh_exception import SSHException, PasswordRequiredException
import util
import base64
class PKey (object):
"""
@ -105,6 +109,17 @@ class PKey (object):
"""
return MD5.new(str(self)).digest()
def get_base64(self):
"""
Return a base64 string containing the public part of this key. Nothing
secret is revealed. This format is compatible with that used to store
public key files or recognized host keys.
@return: a base64 string containing the public part of the key.
@rtype: string
"""
return ''.join(base64.encodestring(str(self)).split('\n'))
def sign_ssh_data(self, randpool, data):
"""
Sign a blob of data with this private key, and return a L{Message}
@ -152,7 +167,48 @@ class PKey (object):
encrypted, and C{password} is C{None}.
@raise SSHException: if the key file is invalid.
"""
pass
raise exception('Not implemented in PKey')
def from_private_key_file(cl, filename, password=None):
"""
Create a key object by reading a private key file. This is roughly
equivalent to creating a new key object and then calling
L{read_private_key_file} on it. Through the magic of python, this
factory method will exist in all subclasses of PKey (such as L{RSAKey}
or L{DSSKey}), but is useless on the abstract PKey class.
@param filename: name of the file to read.
@type filename: string
@param password: an optional password to use to decrypt the key file,
if it's encrypted
@type password: string
@return: a new key object based on the given private key.
@rtype: L{PKey}
@raise IOError: if there was an error reading the file.
@raise PasswordRequiredException: if the private key file is
encrypted, and C{password} is C{None}.
@raise SSHException: if the key file is invalid.
"""
key = cl()
key.read_private_key_file(filename, password)
return key
from_private_key_file = classmethod(from_private_key_file)
def write_private_key_file(self, filename, password=None):
"""
Write private key contents into a file. If the password is not
C{None}, the key is encrypted before writing.
@param filename: name of the file to write.
@type filename: string
@param password: an optional password to use to encrypt the key file.
@type password: string
@raise IOError: if there was an error writing the file.
@raise SSHException: if the key is invalid.
"""
raise exception('Not implemented in PKey')
def _read_private_key_file(self, tag, filename, password=None):
"""
@ -222,6 +278,47 @@ class PKey (object):
keysize = self._CIPHER_TABLE[encryption_type]['keysize']
mode = self._CIPHER_TABLE[encryption_type]['mode']
# this confusing line turns something like '2F91' into '/\x91' (sorry, was feeling clever)
salt = ''.join([chr(int(saltstr[i:i+2], 16)) for i in range(0, len(saltstr), 2)])
salt = util.unhexify(saltstr)
key = util.generate_key_bytes(MD5, salt, password, keysize)
return cipher.new(key, mode, salt).decrypt(data)
def _write_private_key_file(self, tag, filename, data, password=None):
"""
Write an SSH2-format private key file in a form that can be read by
paramiko or openssh. If no password is given, the key is written in
a trivially-encoded format (base64) which is completely insecure. If
a password is given, DES-EDE3-CBC is used.
@param tag: C{"RSA"} or C{"DSA"}, the tag used to mark the data block.
@type tag: string
@param filename: name of the file to write.
@type filename: string
@param data: data blob that makes up the private key.
@type data: string
@param password: an optional password to use to encrypt the file.
@type password: string
@raise IOError: if there was an error writing the file.
"""
f = open(filename, 'w', 0600)
f.write('-----BEGIN %s PRIVATE KEY-----\n' % tag)
if password is not None:
# since we only support one cipher here, use it
cipher_name = self._CIPHER_TABLE.keys()[0]
cipher = self._CIPHER_TABLE[cipher_name]['cipher']
keysize = self._CIPHER_TABLE[cipher_name]['keysize']
mode = self._CIPHER_TABLE[cipher_name]['mode']
salt = randpool.get_bytes(8)
key = util.generate_key_bytes(MD5, salt, password, keysize)
data = cipher.new(key, mode, salt).encrypt(data)
f.write('Proc-Type: 4,ENCRYPTED\n')
f.write('DEK-Info: %s,%s\n' % (cipher_name, util.hexify(salt)))
f.write('\n')
s = base64.encodestring(data)
# re-wrap to 64-char lines
s = ''.join(s.split('\n'))
s = '\n'.join([s[i : i+64] for i in range(0, len(s), 64)])
f.write(s)
f.write('\n')
f.write('-----END %s PRIVATE KEY-----\n' % tag)
f.close()

View File

@ -22,12 +22,16 @@
L{RSAKey}
"""
from message import Message
import base64
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA, MD5
from Crypto.Cipher import DES3
from common import *
from message import Message
from ber import BER, BERException
from util import format_binary, inflate_long, deflate_long
import util
from pkey import PKey
from ssh_exception import SSHException
@ -38,15 +42,15 @@ class RSAKey (PKey):
"""
def __init__(self, msg=None, data=''):
self.valid = 0
self.valid = False
if (msg is None) and (data is not None):
msg = Message(data)
if (msg is None) or (msg.get_string() != 'ssh-rsa'):
return
self.e = msg.get_mpint()
self.n = msg.get_mpint()
self.size = len(deflate_long(self.n, 0))
self.valid = 1
self.size = len(util.deflate_long(self.n, 0))
self.valid = True
def __str__(self):
if not self.valid:
@ -78,7 +82,7 @@ class RSAKey (PKey):
def sign_ssh_data(self, randpool, data):
hash = SHA.new(data).digest()
rsa = RSA.construct((long(self.n), long(self.e), long(self.d)))
sig = deflate_long(rsa.sign(self._pkcs1imify(hash), '')[0], 0)
sig = util.deflate_long(rsa.sign(self._pkcs1imify(hash), '')[0], 0)
m = Message()
m.add_string('ssh-rsa')
m.add_string(sig)
@ -87,18 +91,18 @@ class RSAKey (PKey):
def verify_ssh_sig(self, data, msg):
if (not self.valid) or (msg.get_string() != 'ssh-rsa'):
return False
sig = inflate_long(msg.get_string(), 1)
sig = util.inflate_long(msg.get_string(), 1)
# verify the signature by SHA'ing the data and encrypting it using the
# public key. some wackiness ensues where we "pkcs1imify" the 20-byte
# hash into a string as long as the RSA key.
hash = inflate_long(self._pkcs1imify(SHA.new(data).digest()), 1)
hash = util.inflate_long(self._pkcs1imify(SHA.new(data).digest()), 1)
rsa = RSA.construct((long(self.n), long(self.e)))
return rsa.verify(hash, (sig,))
def read_private_key_file(self, filename, password=None):
# private key file contains:
# RSAPrivateKey = { version = 0, n, e, d, p, q, d mod p-1, d mod q-1, q**-1 mod p }
self.valid = 0
self.valid = False
data = self._read_private_key_file('RSA', filename, password)
try:
keylist = BER(data).decode()
@ -112,5 +116,30 @@ class RSAKey (PKey):
# not really needed
self.p = keylist[4]
self.q = keylist[5]
self.size = len(deflate_long(self.n, 0))
self.valid = 1
self.size = len(util.deflate_long(self.n, 0))
self.valid = True
def write_private_key_file(self, filename, password=None):
if not self.valid:
raise SSHException('Invalid key')
keylist = [ 0, self.n, self.e, self.d, self.p, self.q,
self.d % (self.p - 1), self.d % (self.q - 1),
util.mod_inverse(self.q, self.p) ]
try:
b = BER()
b.encode(keylist)
except BERException:
raise SSHException('Unable to create ber encoding of key')
self._write_private_key_file('RSA', filename, str(b), password)
def generate(bits, progress_func=None):
rsa = RSA.generate(bits, randpool.get_bytes, progress_func)
key = RSAKey()
key.n = rsa.n
key.e = rsa.e
key.d = rsa.d
key.p = rsa.p
key.q = rsa.q
key.valid = True
return key
generate = staticmethod(generate)