add HostKeys, a helper for reading/parsing openssh known_hosts files, including hashed-host support
This commit is contained in:
Robey Pointer 2006-02-19 16:35:13 -08:00
parent 08c9efc86a
commit 409c4fc482
7 changed files with 298 additions and 30 deletions

2
README
View File

@ -257,6 +257,4 @@ v1.0 JIGGLYPUFF
* SFTPClient.set_size
* remove @since that predate 1.0
* put examples in examples/ folder
* support .ssh/known_hosts files made with HashKnownHosts
* sftp server mode should convert all paths to unicode before calling into sftp_si

View File

@ -1,4 +1,4 @@
# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
# Copyright (C) 2003-2006 Robey Pointer <robey@lag.net>
#
# This file is part of paramiko.
#
@ -84,6 +84,7 @@ from packet import Packetizer
from file import BufferedFile
from agent import Agent, AgentKey
from pkey import PKey
from hostkeys import HostKeys
# fix module names for epydoc
for x in [Transport, SecurityOptions, Channel, SFTPServer, SSHException, \
@ -91,7 +92,7 @@ for x in [Transport, SecurityOptions, Channel, SFTPServer, SSHException, \
SubsystemHandler, AuthHandler, RSAKey, DSSKey, SFTPError, \
SFTP, SFTPClient, SFTPServer, Message, Packetizer, SFTPAttributes, \
SFTPHandle, SFTPServerInterface, BufferedFile, Agent, AgentKey, \
PKey, BaseSFTP, SFTPFile, ServerInterface]:
PKey, BaseSFTP, SFTPFile, ServerInterface, HostKeys]:
x.__module__ = 'paramiko'
from common import AUTH_SUCCESSFUL, AUTH_PARTIALLY_SUCCESSFUL, AUTH_FAILED, \
@ -124,4 +125,5 @@ __all__ = [ 'Transport',
'BufferedFile',
'Agent',
'AgentKey',
'HostKeys',
'util' ]

187
paramiko/hostkeys.py Normal file
View File

@ -0,0 +1,187 @@
# Copyright (C) 2006 Robey Pointer <robey@lag.net>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
"""
L{HostKeys}
"""
import base64
from Crypto.Hash import SHA, HMAC
from paramiko.common import *
from paramiko.dsskey import DSSKey
from paramiko.rsakey import RSAKey
class HostKeys (object):
"""
Representation of an openssh-style "known hosts" file. Host keys can be
read from one or more files, and then individual hosts can be looked up to
verify server keys during SSH negotiation.
A HostKeys object can be treated like a dict; any dict lookup is equivalent
to calling L{lookup}.
@since: 1.5.3
"""
def __init__(self, filename=None):
"""
Create a new HostKeys object, optionally loading keys from an openssh
style host-key file.
@param filename: filename to load host keys from, or C{None}
@type filename: str
"""
# hostname -> keytype -> PKey
self.keys = {}
self.contains_hashes = False
if filename is not None:
self.load(filename)
def add(self, hostname, keytype, key):
"""
Add a host key entry to the table. Any existing entry for a
C{(hostname, keytype)} pair will be replaced.
@param hostname:
@type hostname: str
@param keytype: key type (C{"ssh-rsa"} or C{"ssh-dss"})
@type keytype: str
@param key: the key to add
@type key: L{PKey}
"""
if not hostname in self.keys:
self.keys[hostname] = {}
if hostname.startswith('|1|'):
self.contains_hashes = True
self.keys[hostname][keytype] = key
def load(self, filename):
"""
Read a file of known SSH host keys, in the format used by openssh.
This type of file unfortunately doesn't exist on Windows, but on
posix, it will usually be stored in
C{os.path.expanduser("~/.ssh/known_hosts")}.
@param filename: name of the file to read host keys from
@type filename: str
"""
f = file(filename, 'r')
for line in f:
line = line.strip()
if (len(line) == 0) or (line[0] == '#'):
continue
keylist = line.split(' ')
if len(keylist) != 3:
# don't understand this line
continue
hostlist, keytype, key = keylist
for host in hostlist.split(','):
if keytype == 'ssh-rsa':
self.add(host, keytype, RSAKey(data=base64.decodestring(key)))
elif keytype == 'ssh-dss':
self.add(host, keytype, DSSKey(data=base64.decodestring(key)))
f.close()
def lookup(self, hostname):
"""
Find a hostkey entry for a given hostname or IP. If no entry is found,
C{None} is returned. Otherwise a dictionary of keytype to key is
returned.
@param hostname: the hostname to lookup
@type hostname: str
@return: keys associated with this host (or C{None})
@rtype: dict(str, L{PKey})
"""
if hostname in self.keys:
return self.keys[hostname]
if not self.contains_hashes:
return None
for h in self.keys.keys():
if h.startswith('|1|'):
hmac = self.hash_host(hostname, h)
if hmac == h:
return self.keys[h]
return None
def check(self, hostname, key):
"""
Return True if the given key is associated with the given hostname
in this dictionary.
@param hostname: hostname (or IP) of the SSH server
@type hostname: str
@param key: the key to check
@type key: L{PKey}
@return: C{True} if the key is associated with the hostname; C{False}
if not
@rtype: bool
"""
k = self.lookup(hostname)
if k is None:
return False
host_key = k.get(key.get_name(), None)
if host_key is None:
return False
return str(host_key) == str(key)
def clear(self):
"""
Remove all host keys from the dictionary.
"""
self.keys = {}
self.contains_hashes = False
def values(self):
return self.keys.values();
def __getitem__(self, key):
ret = self.lookup(key)
if ret is None:
raise KeyError(key)
return ret
def __len__(self):
return len(self.keys)
def hash_host(hostname, salt=None):
"""
Return a "hashed" form of the hostname, as used by openssh when storing
hashed hostnames in the known_hosts file.
@param hostname: the hostname to hash
@type hostname: str
@param salt: optional salt to use when hashing (must be 20 bytes long)
@type salt: str
@return: the hashed hostname
@rtype: str
"""
if salt is None:
salt = randpool.get_bytes(SHA.digest_size)
else:
if salt.startswith('|1|'):
salt = salt.split('|')[2]
salt = base64.decodestring(salt)
assert len(salt) == SHA.digest_size
hmac = HMAC.HMAC(salt, hostname, SHA).digest()
hostkey = '|1|%s|%s' % (base64.encodestring(salt), base64.encodestring(hmac))
return hostkey.replace('\n', '')
hash_host = staticmethod(hash_host)

View File

@ -189,35 +189,15 @@ def load_host_keys(filename):
This type of file unfortunately doesn't exist on Windows, but on posix,
it will usually be stored in C{os.path.expanduser("~/.ssh/known_hosts")}.
Since 1.5.3, this is just a wrapper around L{HostKeys}.
@param filename: name of the file to read host keys from
@type filename: str
@return: dict of host keys, indexed by hostname and then keytype
@rtype: dict(hostname, dict(keytype, L{PKey <paramiko.pkey.PKey>}))
"""
import base64
from rsakey import RSAKey
from dsskey import DSSKey
keys = {}
f = file(filename, 'r')
for line in f:
line = line.strip()
if (len(line) == 0) or (line[0] == '#'):
continue
keylist = line.split(' ')
if len(keylist) != 3:
continue
hostlist, keytype, key = keylist
hosts = hostlist.split(',')
for host in hosts:
if not keys.has_key(host):
keys[host] = {}
if keytype == 'ssh-rsa':
keys[host][keytype] = RSAKey(data=base64.decodestring(key))
elif keytype == 'ssh-dss':
keys[host][keytype] = DSSKey(data=base64.decodestring(key))
f.close()
return keys
from paramiko.hostkeys import HostKeys
return HostKeys(filename)
def parse_ssh_config(file_obj):
"""
@ -355,3 +335,5 @@ def get_logger(name):
l = logging.getLogger(name)
l.addFilter(_pfilter)
return l

View File

@ -31,6 +31,7 @@ sys.path.append('tests/')
from test_message import MessageTest
from test_file import BufferedFileTest
from test_util import UtilTest
from test_hostkeys import HostKeysTest
from test_pkey import KeyTest
from test_kex import KexTest
from test_packetizer import PacketizerTest
@ -89,6 +90,7 @@ suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(MessageTest))
suite.addTest(unittest.makeSuite(BufferedFileTest))
suite.addTest(unittest.makeSuite(UtilTest))
suite.addTest(unittest.makeSuite(HostKeysTest))
if options.use_pkey:
suite.addTest(unittest.makeSuite(KeyTest))
suite.addTest(unittest.makeSuite(KexTest))

73
tests/test_hostkeys.py Normal file
View File

@ -0,0 +1,73 @@
#!/usr/bin/python
# Copyright (C) 2006 Robey Pointer <robey@lag.net>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
"""
Some unit tests for HostKeys.
"""
import base64
import os
import unittest
import paramiko
test_hosts_file = """\
secure.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkpKhOk5r\
9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11Ml8om3\
D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz9uhc=
happy.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\
BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M=
"""
keyblob = """\
AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31MBGQ3GQ/Fc7SX6gkpXkwcZryoi4k\
NFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW5ymME3bQ4J/k1IKxCtz/bAlAqFgK\
oc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M="""
class HostKeysTest (unittest.TestCase):
def setUp(self):
f = open('hostfile.temp', 'w')
f.write(test_hosts_file)
f.close()
def tearDown(self):
os.unlink('hostfile.temp')
def test_1_load(self):
hostdict = paramiko.HostKeys('hostfile.temp')
self.assertEquals(2, len(hostdict))
self.assertEquals(1, len(hostdict.values()[0]))
self.assertEquals(1, len(hostdict.values()[1]))
fp = paramiko.util.hexify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint())
self.assertEquals('E6684DB30E109B67B70FF1DC5C7F1363', fp)
def test_2_add(self):
hostdict = paramiko.HostKeys('hostfile.temp')
hh = '|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c='
key = paramiko.RSAKey(data=base64.decodestring(keyblob))
hostdict.add(hh, 'ssh-rsa', key)
self.assertEquals(3, len(hostdict))
x = hostdict['foo.example.com']
fp = paramiko.util.hexify(x['ssh-rsa'].get_fingerprint())
self.assertEquals('7EC91BB336CB6D810B124B1353C32396', fp)
self.assertTrue(hostdict.check('foo.example.com', key))

View File

@ -23,6 +23,7 @@ Some unit tests for utility functions.
"""
import cStringIO
import os
import unittest
from Crypto.Hash import SHA
import paramiko.util
@ -43,11 +44,18 @@ Host spoo.example.com
Crazy something else
"""
test_hosts_file = """\
secure.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkpKhOk5r\
9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11Ml8om3\
D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz9uhc=
happy.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\
BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M=
"""
class UtilTest (unittest.TestCase):
K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504L
def setUp(self):
pass
@ -78,3 +86,19 @@ class UtilTest (unittest.TestCase):
x = paramiko.util.generate_key_bytes(SHA, 'ABCDEFGH', 'This is my secret passphrase.', 64)
hex = ''.join(['%02x' % ord(c) for c in x])
self.assertEquals(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
def test_4_host_keys(self):
f = open('hostfile.temp', 'w')
f.write(test_hosts_file)
f.close()
try:
hostdict = paramiko.util.load_host_keys('hostfile.temp')
self.assertEquals(2, len(hostdict))
self.assertEquals(1, len(hostdict.values()[0]))
self.assertEquals(1, len(hostdict.values()[1]))
fp = paramiko.util.hexify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint())
self.assertEquals('E6684DB30E109B67B70FF1DC5C7F1363', fp)
finally:
os.unlink('hostfile.temp')