From 14f2193d3979c910d205282a43ca2da8f85f3915 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Mon, 8 Mar 2004 17:54:19 +0000 Subject: [PATCH] [project @ Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--patch-32] add unit tests add unit tests for BufferedFile and SFTP (it's a start). remove the demo sftp client because it was 99% copied from the other demos, which makes it kinda confusing. the unit tests are a much better example. --- demo_sftp.py | 157 ----------------------- test.py | 35 ++++++ tests/test_file.py | 139 +++++++++++++++++++++ tests/test_sftp.py | 303 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 477 insertions(+), 157 deletions(-) delete mode 100755 demo_sftp.py create mode 100755 test.py create mode 100755 tests/test_file.py create mode 100755 tests/test_sftp.py diff --git a/demo_sftp.py b/demo_sftp.py deleted file mode 100755 index 1f6cd3a..0000000 --- a/demo_sftp.py +++ /dev/null @@ -1,157 +0,0 @@ -#!/usr/bin/python - -import sys, os, socket, threading, getpass, logging, time, base64, select, termios, tty, traceback -import paramiko - - -##### utility functions - -def load_host_keys(): - filename = os.environ['HOME'] + '/.ssh/known_hosts' - keys = {} - try: - f = open(filename, 'r') - except Exception, e: - print '*** Unable to open host keys file (%s)' % filename - return - for line in f: - keylist = line.split(' ') - if len(keylist) != 3: - continue - hostlist, keytype, key = keylist - hosts = hostlist.split(',') - for host in hosts: - if not keys.has_key(host): - keys[host] = {} - keys[host][keytype] = base64.decodestring(key) - f.close() - return keys - - -##### main demo - -# setup logging -l = logging.getLogger("paramiko") -l.setLevel(logging.DEBUG) -if len(l.handlers) == 0: - f = open('demo.log', 'w') - lh = logging.StreamHandler(f) - lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] %(name)s: %(message)s', '%Y%m%d:%H%M%S')) - l.addHandler(lh) - - -username = '' -if len(sys.argv) > 1: - hostname = sys.argv[1] - if hostname.find('@') >= 0: - username, hostname = hostname.split('@') -else: - hostname = raw_input('Hostname: ') -if len(hostname) == 0: - print '*** Hostname required.' - sys.exit(1) -port = 22 -if hostname.find(':') >= 0: - hostname, portstr = hostname.split(':') - port = int(portstr) - -# now connect -try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((hostname, port)) -except Exception, e: - print '*** Connect failed: ' + str(e) - traceback.print_exc() - sys.exit(1) - -try: - event = threading.Event() - t = paramiko.Transport(sock) - t.start_client(event) - # print repr(t) - event.wait(15) - if not t.is_active(): - print '*** SSH negotiation failed.' - sys.exit(1) - # print repr(t) - - keys = load_host_keys() - keytype, hostkey = t.get_remote_server_key() - if not keys.has_key(hostname): - print '*** WARNING: Unknown host key!' - elif not keys[hostname].has_key(keytype): - print '*** WARNING: Unknown host key!' - elif keys[hostname][keytype] != hostkey: - print '*** WARNING: Host key has changed!!!' - sys.exit(1) - else: - print '*** Host key OK.' - - event.clear() - - # get username - if username == '': - default_username = getpass.getuser() - username = raw_input('Username [%s]: ' % default_username) - if len(username) == 0: - username = default_username - - # ask for what kind of authentication to try - default_auth = 'p' - auth = raw_input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth) - if len(auth) == 0: - auth = default_auth - - if auth == 'r': - key = paramiko.RSAKey() - default_path = os.environ['HOME'] + '/.ssh/id_rsa' - path = raw_input('RSA key [%s]: ' % default_path) - if len(path) == 0: - path = default_path - try: - key.read_private_key_file(path) - except paramiko.PasswordRequiredException: - password = getpass.getpass('RSA key password: ') - key.read_private_key_file(path, password) - t.auth_publickey(username, key, event) - elif auth == 'd': - key = paramiko.DSSKey() - default_path = os.environ['HOME'] + '/.ssh/id_dsa' - path = raw_input('DSS key [%s]: ' % default_path) - if len(path) == 0: - path = default_path - try: - key.read_private_key_file(path) - except paramiko.PasswordRequiredException: - password = getpass.getpass('DSS key password: ') - key.read_private_key_file(path, password) - t.auth_key(username, key, event) - else: - pw = getpass.getpass('Password for %s@%s: ' % (username, hostname)) - t.auth_password(username, pw, event) - - event.wait(10) - # print repr(t) - if not t.is_authenticated(): - print '*** Authentication failed. :(' - t.close() - sys.exit(1) - - chan = t.open_session() - chan.invoke_subsystem('sftp') - print '*** SFTP...' - sftp = paramiko.SFTP(chan) - print repr(sftp.listdir('/tmp')) - - chan.close() - t.close() - -except Exception, e: - print '*** Caught exception: ' + str(e.__class__) + ': ' + str(e) - traceback.print_exc() - try: - t.close() - except: - pass - sys.exit(1) - diff --git a/test.py b/test.py new file mode 100755 index 0000000..15a9b77 --- /dev/null +++ b/test.py @@ -0,0 +1,35 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2004 Robey Pointer +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Foobar; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +do the unit tests! +""" + +import sys, os, unittest +sys.path.append('tests/') + +from test_file import BufferedFileTest +from test_sftp import SFTPTest + +suite = unittest.TestSuite() +suite.addTest(unittest.makeSuite(BufferedFileTest)) +suite.addTest(unittest.makeSuite(SFTPTest)) +unittest.TextTestRunner(verbosity=2).run(suite) + diff --git a/tests/test_file.py b/tests/test_file.py new file mode 100755 index 0000000..222b681 --- /dev/null +++ b/tests/test_file.py @@ -0,0 +1,139 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2004 Robey Pointer +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Foobar; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Some unit tests for the BufferedFile abstraction. +""" + +import unittest +from paramiko.file import BufferedFile + + +class LoopbackFile (BufferedFile): + """ + BufferedFile object that you can write data into, and then read it back. + """ + def __init__(self, mode='r', bufsize=-1): + BufferedFile.__init__(self) + self._set_mode(mode, bufsize) + self.buffer = '' + + def _read(self, size): + if len(self.buffer) == 0: + return None + if size > len(self.buffer): + size = len(self.buffer) + data = self.buffer[:size] + self.buffer = self.buffer[size:] + return data + + def _write(self, data): + self.buffer += data + return len(data) + + +class BufferedFileTest (unittest.TestCase): + + def test_1_simple(self): + f = LoopbackFile('r') + try: + f.write('hi') + self.assert_(False, 'no exception on write to read-only file') + except: + pass + f.close() + + f = LoopbackFile('w') + try: + f.read(1) + self.assert_(False, 'no exception to read from write-only file') + except: + pass + f.close() + + def test_2_readline(self): + f = LoopbackFile('r+U') + f.write('First line.\nSecond line.\r\nFinal line non-terminated.') + self.assertEqual(f.readline(), 'First line.\n') + # universal newline mode should convert this linefeed: + self.assertEqual(f.readline(), 'Second line.\n') + self.assertEqual(f.readline(), 'Final line non-terminated.') + self.assertEqual(f.readline(), '') + f.close() + try: + f.readline() + self.assert_(False, 'no exception on readline of closed file') + except IOError: + pass + self.assert_('\n' in f.newlines) + self.assert_('\r\n' in f.newlines) + self.assert_('\r' not in f.newlines) + + def test_3_lf(self): + """ + try to trick the linefeed detector. + """ + f = LoopbackFile('r+U') + f.write('First line.\r') + self.assertEqual(f.readline(), 'First line.\n') + f.write('\nSecond.\r\n') + self.assertEqual(f.readline(), 'Second.\n') + f.close() + self.assertEqual(f.newlines, '\r\n') + + def test_4_write(self): + """ + verify that write buffering is on. + """ + f = LoopbackFile('r+', 1) + f.write('Complete line.\nIncomplete line.') + self.assertEqual(f.readline(), 'Complete line.\n') + self.assertEqual(f.readline(), '') + f.write('..\n') + self.assertEqual(f.readline(), 'Incomplete line...\n') + f.close() + + def test_5_flush(self): + """ + verify that flush will force a write. + """ + f = LoopbackFile('r+', 512) + f.write('Not\nquite\n512 bytes.\n') + self.assertEqual(f.read(1), '') + f.flush() + self.assertEqual(f.read(5), 'Not\nq') + self.assertEqual(f.read(10), 'uite\n512 b') + self.assertEqual(f.read(9), 'ytes.\n') + self.assertEqual(f.read(3), '') + f.close() + + def test_6_buffering(self): + """ + verify that flushing happens automatically on buffer crossing. + """ + f = LoopbackFile('r+', 16) + f.write('Too small.') + self.assertEqual(f.read(4), '') + f.write(' ') + self.assertEqual(f.read(4), '') + f.write('Enough.') + self.assertEqual(f.read(20), 'Too small. Enough.') + f.close() + diff --git a/tests/test_sftp.py b/tests/test_sftp.py new file mode 100755 index 0000000..798853f --- /dev/null +++ b/tests/test_sftp.py @@ -0,0 +1,303 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2004 Robey Pointer +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Foobar; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +some unit tests to make sure sftp works. + +a real actual sftp server is contacted, and a new folder is created there to +do test file operations in (so no existing files will be harmed). +""" + +import sys, os + +# need a host and private-key where we have acecss +HOST = os.environ.get('TEST_HOST', 'localhost') +USER = os.environ.get('TEST_USER', os.environ.get('USER', 'nobody')) +PKEY = os.environ.get('TEST_PKEY', os.path.join(os.environ.get('HOME', '/'), '.ssh/id_rsa')) +FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing') + +import paramiko, logging, unittest + +ARTICLE = ''' +Insulin sensitivity and liver insulin receptor structure in ducks from two +genera + +T. Constans, B. Chevalier, M. Derouet and J. Simon +Station de Recherches Avicoles, Institut National de la Recherche Agronomique, +Nouzilly, France. + +Insulin sensitivity and liver insulin receptor structure were studied in +5-wk-old ducks from two genera (Muscovy and Pekin). In the fasting state, both +duck types were equally resistant to exogenous insulin compared with chicken. +Despite the low potency of duck insulin, the number of insulin receptors was +lower in Muscovy duck and similar in Pekin duck and chicken liver membranes. +After 125I-insulin cross-linking, the size of the alpha-subunit of the +receptors from the three species was 135,000. Wheat germ agglutinin-purified +receptors from the three species were contaminated by an active and unusual +adenosinetriphosphatase (ATPase) contaminant (highest activity in Muscovy +duck). Sequential purification of solubilized receptor from both duck types on +lentil and then wheat germ agglutinin lectins led to a fraction of receptors +very poor in ATPase activity that exhibited a beta-subunit size (95,000) and +tyrosine kinase activity similar to those of ATPase-free chicken insulin +receptors. Therefore the ducks from the two genera exhibit an alpha-beta- +structure for liver insulin receptors and a clear difference in the number of +liver insulin receptors. Their sensitivity to insulin is, however, similarly +decreased compared with chicken. +''' + + +# setup logging +l = logging.getLogger('paramiko') +l.setLevel(logging.DEBUG) +if len(l.handlers) == 0: + f = open('test.log', 'w') + lh = logging.StreamHandler(f) + lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] %(name)s: %(message)s', '%Y%m%d:%H%M%S')) + l.addHandler(lh) +t = paramiko.Transport(HOST) +key = paramiko.RSAKey() +key.read_private_key_file(PKEY) +try: + t.connect(username=USER, pkey=key) +except paramiko.SSHException: + t.close() + sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n') + sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n') + sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % HOST) + sys.stderr.write(' (Override the SSH host with environment variable TEST_HOST.)\n') + sys.stderr.write('* The username to auth as (%s) is invalid.\n' % USER) + sys.stderr.write(' (Override the auth user with environment variable TEST_USER.)\n') + sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % PKEY) + sys.stderr.write(' (Override the private key location with environment variable TEST_PKEY.)\n') + sys.stderr.write('\n') + sys.exit(1) +sftp = paramiko.SFTP.from_transport(t) + + +class SFTPTest (unittest.TestCase): + + def setUp(self): + sftp.mkdir(FOLDER) + + def tearDown(self): + sftp.rmdir(FOLDER) + + def test_1_folder(self): + """ + create a temporary folder, verify that we can create a file in it, then + remove the folder and verify that we can't create a file in it anymore. + """ + f = sftp.open(FOLDER + '/test', 'w') + try: + self.assertEqual(f.stat().st_size, 0) + f.close() + try: + f = sftp.open(FOLDER + '/test', 'w') + # shouldn't be able to create that file + self.assert_(False, 'no exception at dummy file creation') + except: + pass + finally: + sftp.remove(FOLDER + '/test') + + def test_2_write(self): + """ + verify that a file can be created and written, and the size is correct. + """ + f = sftp.open(FOLDER + '/duck.txt', 'w') + try: + f.write(ARTICLE) + f.close() + self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + finally: + sftp.remove(FOLDER + '/duck.txt') + + def test_3_append(self): + """ + verify that a file can be opened for append, and tell() still works. + """ + f = sftp.open(FOLDER + '/append.txt', 'w') + try: + f.write('first line\nsecond line\n') + self.assertEqual(f.tell(), 23) + f.close() + + f = sftp.open(FOLDER + '/append.txt', 'a+') + f.write('third line!!!\n') + self.assertEqual(f.tell(), 37) + self.assertEqual(f.stat().st_size, 37) + f.seek(-26, f.SEEK_CUR) + self.assertEqual(f.readline(), 'second line\n') + f.close() + finally: + sftp.remove(FOLDER + '/append.txt') + + def test_4_rename(self): + """ + verify that renaming a file works. + """ + f = sftp.open(FOLDER + '/first.txt', 'w') + try: + f.write('content!\n'); + f.close() + sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt') + try: + f = sftp.open(FOLDER + '/first.txt', 'r') + self.assert_(False, 'no exception on reading nonexistent file') + except: + pass + f = sftp.open(FOLDER + '/second.txt', 'r') + f.seek(-6, f.SEEK_END) + self.assertEqual(f.read(4), 'tent') + f.close() + finally: + try: + sftp.remove(FOLDER + '/first.txt') + except: + pass + try: + sftp.remove(FOLDER + '/second.txt') + except: + pass + + def test_5_listdir(self): + """ + verify that a folder can be created, a bunch of files can be placed in it, + and those files show up in sftp.listdir. + """ + try: + f = sftp.open(FOLDER + '/duck.txt', 'w') + f.close() + + f = sftp.open(FOLDER + '/fish.txt', 'w') + f.close() + + f = sftp.open(FOLDER + '/tertiary.py', 'w') + f.close() + + x = sftp.listdir(FOLDER) + self.assertEqual(len(x), 3) + self.assert_('duck.txt' in x) + self.assert_('fish.txt' in x) + self.assert_('tertiary.py' in x) + self.assert_('random' not in x) + finally: + sftp.remove(FOLDER + '/duck.txt') + sftp.remove(FOLDER + '/fish.txt') + sftp.remove(FOLDER + '/tertiary.py') + + def test_6_setstat(self): + """ + verify that the setstat functions (chown, chmod, utime) work. + """ + f = sftp.open(FOLDER + '/special', 'w') + try: + f.close() + + stat = sftp.stat(FOLDER + '/special') + sftp.chmod(FOLDER + '/special', (stat.st_mode & ~0777) | 0600) + self.assertEqual(sftp.stat(FOLDER + '/special').st_mode & 0777, 0600) + + mtime = stat.st_mtime - 3600 + atime = stat.st_atime - 1800 + sftp.utime(FOLDER + '/special', (atime, mtime)) + nstat = sftp.stat(FOLDER + '/special') + self.assertEqual(nstat.st_mtime, mtime) + self.assertEqual(nstat.st_atime, atime) + + # can't really test chown, since we'd have to know a valid uid. + finally: + sftp.remove(FOLDER + '/special') + + def test_7_readline_seek(self): + """ + create a text file and write a bunch of text into it. then count the lines + in the file, and seek around to retreive particular lines. this should + verify that read buffering and 'tell' work well together, and that read + buffering is reset on 'seek'. + """ + try: + f = sftp.open(FOLDER + '/duck.txt', 'w') + f.write(ARTICLE) + f.close() + + f = sftp.open(FOLDER + '/duck.txt', 'r+') + line_number = 0 + loc = 0 + pos_list = [] + for line in f: + line_number += 1 + pos_list.append(loc) + loc = f.tell() + f.seek(pos_list[6], f.SEEK_SET) + self.assertEqual(f.readline(), 'Nouzilly, France.\n') + f.seek(pos_list[17], f.SEEK_SET) + self.assertEqual(f.readline()[:4], 'duck') + f.seek(pos_list[10], f.SEEK_SET) + self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n') + f.close() + finally: + sftp.remove(FOLDER + '/duck.txt') + + def test_8_write_seek(self): + """ + create a text file, seek back and change part of it, and verify that the + changes worked. + """ + f = sftp.open(FOLDER + '/testing.txt', 'w') + try: + f.write('hello kitty.\n') + f.seek(-5, f.SEEK_CUR) + f.write('dd') + f.close() + + self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13) + f = sftp.open(FOLDER + '/testing.txt', 'r') + data = f.read(20) + f.close() + self.assertEqual(data, 'hello kiddy.\n') + finally: + sftp.remove(FOLDER + '/testing.txt') + + def test_9_symlink(self): + """ + create a symlink and then check that lstat doesn't follow it. + """ + f = sftp.open(FOLDER + '/original.txt', 'w') + try: + f.write('original\n') + f.close() + sftp.symlink('original.txt', FOLDER + '/link.txt') + self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt') + + f = sftp.open(FOLDER + '/link.txt', 'r') + self.assertEqual(f.readlines(), [ 'original\n' ]) + f.close() + self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12) + self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9) + finally: + try: + sftp.remove(FOLDER + '/link.txt') + except: + pass + try: + sftp.remove(FOLDER + '/original.txt') + except: + pass