[project @ Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--patch-32]
add unit tests add unit tests for BufferedFile and SFTP (it's a start). remove the demo sftp client because it was 99% copied from the other demos, which makes it kinda confusing. the unit tests are a much better example.
This commit is contained in:
parent
7cd7fced6e
commit
14f2193d39
157
demo_sftp.py
157
demo_sftp.py
|
@ -1,157 +0,0 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
import sys, os, socket, threading, getpass, logging, time, base64, select, termios, tty, traceback
|
||||
import paramiko
|
||||
|
||||
|
||||
##### utility functions
|
||||
|
||||
def load_host_keys():
|
||||
filename = os.environ['HOME'] + '/.ssh/known_hosts'
|
||||
keys = {}
|
||||
try:
|
||||
f = open(filename, 'r')
|
||||
except Exception, e:
|
||||
print '*** Unable to open host keys file (%s)' % filename
|
||||
return
|
||||
for line in f:
|
||||
keylist = line.split(' ')
|
||||
if len(keylist) != 3:
|
||||
continue
|
||||
hostlist, keytype, key = keylist
|
||||
hosts = hostlist.split(',')
|
||||
for host in hosts:
|
||||
if not keys.has_key(host):
|
||||
keys[host] = {}
|
||||
keys[host][keytype] = base64.decodestring(key)
|
||||
f.close()
|
||||
return keys
|
||||
|
||||
|
||||
##### main demo
|
||||
|
||||
# setup logging
|
||||
l = logging.getLogger("paramiko")
|
||||
l.setLevel(logging.DEBUG)
|
||||
if len(l.handlers) == 0:
|
||||
f = open('demo.log', 'w')
|
||||
lh = logging.StreamHandler(f)
|
||||
lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] %(name)s: %(message)s', '%Y%m%d:%H%M%S'))
|
||||
l.addHandler(lh)
|
||||
|
||||
|
||||
username = ''
|
||||
if len(sys.argv) > 1:
|
||||
hostname = sys.argv[1]
|
||||
if hostname.find('@') >= 0:
|
||||
username, hostname = hostname.split('@')
|
||||
else:
|
||||
hostname = raw_input('Hostname: ')
|
||||
if len(hostname) == 0:
|
||||
print '*** Hostname required.'
|
||||
sys.exit(1)
|
||||
port = 22
|
||||
if hostname.find(':') >= 0:
|
||||
hostname, portstr = hostname.split(':')
|
||||
port = int(portstr)
|
||||
|
||||
# now connect
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect((hostname, port))
|
||||
except Exception, e:
|
||||
print '*** Connect failed: ' + str(e)
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
event = threading.Event()
|
||||
t = paramiko.Transport(sock)
|
||||
t.start_client(event)
|
||||
# print repr(t)
|
||||
event.wait(15)
|
||||
if not t.is_active():
|
||||
print '*** SSH negotiation failed.'
|
||||
sys.exit(1)
|
||||
# print repr(t)
|
||||
|
||||
keys = load_host_keys()
|
||||
keytype, hostkey = t.get_remote_server_key()
|
||||
if not keys.has_key(hostname):
|
||||
print '*** WARNING: Unknown host key!'
|
||||
elif not keys[hostname].has_key(keytype):
|
||||
print '*** WARNING: Unknown host key!'
|
||||
elif keys[hostname][keytype] != hostkey:
|
||||
print '*** WARNING: Host key has changed!!!'
|
||||
sys.exit(1)
|
||||
else:
|
||||
print '*** Host key OK.'
|
||||
|
||||
event.clear()
|
||||
|
||||
# get username
|
||||
if username == '':
|
||||
default_username = getpass.getuser()
|
||||
username = raw_input('Username [%s]: ' % default_username)
|
||||
if len(username) == 0:
|
||||
username = default_username
|
||||
|
||||
# ask for what kind of authentication to try
|
||||
default_auth = 'p'
|
||||
auth = raw_input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth)
|
||||
if len(auth) == 0:
|
||||
auth = default_auth
|
||||
|
||||
if auth == 'r':
|
||||
key = paramiko.RSAKey()
|
||||
default_path = os.environ['HOME'] + '/.ssh/id_rsa'
|
||||
path = raw_input('RSA key [%s]: ' % default_path)
|
||||
if len(path) == 0:
|
||||
path = default_path
|
||||
try:
|
||||
key.read_private_key_file(path)
|
||||
except paramiko.PasswordRequiredException:
|
||||
password = getpass.getpass('RSA key password: ')
|
||||
key.read_private_key_file(path, password)
|
||||
t.auth_publickey(username, key, event)
|
||||
elif auth == 'd':
|
||||
key = paramiko.DSSKey()
|
||||
default_path = os.environ['HOME'] + '/.ssh/id_dsa'
|
||||
path = raw_input('DSS key [%s]: ' % default_path)
|
||||
if len(path) == 0:
|
||||
path = default_path
|
||||
try:
|
||||
key.read_private_key_file(path)
|
||||
except paramiko.PasswordRequiredException:
|
||||
password = getpass.getpass('DSS key password: ')
|
||||
key.read_private_key_file(path, password)
|
||||
t.auth_key(username, key, event)
|
||||
else:
|
||||
pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
|
||||
t.auth_password(username, pw, event)
|
||||
|
||||
event.wait(10)
|
||||
# print repr(t)
|
||||
if not t.is_authenticated():
|
||||
print '*** Authentication failed. :('
|
||||
t.close()
|
||||
sys.exit(1)
|
||||
|
||||
chan = t.open_session()
|
||||
chan.invoke_subsystem('sftp')
|
||||
print '*** SFTP...'
|
||||
sftp = paramiko.SFTP(chan)
|
||||
print repr(sftp.listdir('/tmp'))
|
||||
|
||||
chan.close()
|
||||
t.close()
|
||||
|
||||
except Exception, e:
|
||||
print '*** Caught exception: ' + str(e.__class__) + ': ' + str(e)
|
||||
traceback.print_exc()
|
||||
try:
|
||||
t.close()
|
||||
except:
|
||||
pass
|
||||
sys.exit(1)
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
# Copyright (C) 2003-2004 Robey Pointer <robey@lag.net>
|
||||
#
|
||||
# This file is part of paramiko.
|
||||
#
|
||||
# Paramiko is free software; you can redistribute it and/or modify it under the
|
||||
# terms of the GNU Lesser General Public License as published by the Free
|
||||
# Software Foundation; either version 2.1 of the License, or (at your option)
|
||||
# any later version.
|
||||
#
|
||||
# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
|
||||
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with Foobar; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
|
||||
|
||||
"""
|
||||
do the unit tests!
|
||||
"""
|
||||
|
||||
import sys, os, unittest
|
||||
sys.path.append('tests/')
|
||||
|
||||
from test_file import BufferedFileTest
|
||||
from test_sftp import SFTPTest
|
||||
|
||||
suite = unittest.TestSuite()
|
||||
suite.addTest(unittest.makeSuite(BufferedFileTest))
|
||||
suite.addTest(unittest.makeSuite(SFTPTest))
|
||||
unittest.TextTestRunner(verbosity=2).run(suite)
|
||||
|
|
@ -0,0 +1,139 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
# Copyright (C) 2003-2004 Robey Pointer <robey@lag.net>
|
||||
#
|
||||
# This file is part of paramiko.
|
||||
#
|
||||
# Paramiko is free software; you can redistribute it and/or modify it under the
|
||||
# terms of the GNU Lesser General Public License as published by the Free
|
||||
# Software Foundation; either version 2.1 of the License, or (at your option)
|
||||
# any later version.
|
||||
#
|
||||
# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
|
||||
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with Foobar; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
|
||||
|
||||
"""
|
||||
Some unit tests for the BufferedFile abstraction.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from paramiko.file import BufferedFile
|
||||
|
||||
|
||||
class LoopbackFile (BufferedFile):
|
||||
"""
|
||||
BufferedFile object that you can write data into, and then read it back.
|
||||
"""
|
||||
def __init__(self, mode='r', bufsize=-1):
|
||||
BufferedFile.__init__(self)
|
||||
self._set_mode(mode, bufsize)
|
||||
self.buffer = ''
|
||||
|
||||
def _read(self, size):
|
||||
if len(self.buffer) == 0:
|
||||
return None
|
||||
if size > len(self.buffer):
|
||||
size = len(self.buffer)
|
||||
data = self.buffer[:size]
|
||||
self.buffer = self.buffer[size:]
|
||||
return data
|
||||
|
||||
def _write(self, data):
|
||||
self.buffer += data
|
||||
return len(data)
|
||||
|
||||
|
||||
class BufferedFileTest (unittest.TestCase):
|
||||
|
||||
def test_1_simple(self):
|
||||
f = LoopbackFile('r')
|
||||
try:
|
||||
f.write('hi')
|
||||
self.assert_(False, 'no exception on write to read-only file')
|
||||
except:
|
||||
pass
|
||||
f.close()
|
||||
|
||||
f = LoopbackFile('w')
|
||||
try:
|
||||
f.read(1)
|
||||
self.assert_(False, 'no exception to read from write-only file')
|
||||
except:
|
||||
pass
|
||||
f.close()
|
||||
|
||||
def test_2_readline(self):
|
||||
f = LoopbackFile('r+U')
|
||||
f.write('First line.\nSecond line.\r\nFinal line non-terminated.')
|
||||
self.assertEqual(f.readline(), 'First line.\n')
|
||||
# universal newline mode should convert this linefeed:
|
||||
self.assertEqual(f.readline(), 'Second line.\n')
|
||||
self.assertEqual(f.readline(), 'Final line non-terminated.')
|
||||
self.assertEqual(f.readline(), '')
|
||||
f.close()
|
||||
try:
|
||||
f.readline()
|
||||
self.assert_(False, 'no exception on readline of closed file')
|
||||
except IOError:
|
||||
pass
|
||||
self.assert_('\n' in f.newlines)
|
||||
self.assert_('\r\n' in f.newlines)
|
||||
self.assert_('\r' not in f.newlines)
|
||||
|
||||
def test_3_lf(self):
|
||||
"""
|
||||
try to trick the linefeed detector.
|
||||
"""
|
||||
f = LoopbackFile('r+U')
|
||||
f.write('First line.\r')
|
||||
self.assertEqual(f.readline(), 'First line.\n')
|
||||
f.write('\nSecond.\r\n')
|
||||
self.assertEqual(f.readline(), 'Second.\n')
|
||||
f.close()
|
||||
self.assertEqual(f.newlines, '\r\n')
|
||||
|
||||
def test_4_write(self):
|
||||
"""
|
||||
verify that write buffering is on.
|
||||
"""
|
||||
f = LoopbackFile('r+', 1)
|
||||
f.write('Complete line.\nIncomplete line.')
|
||||
self.assertEqual(f.readline(), 'Complete line.\n')
|
||||
self.assertEqual(f.readline(), '')
|
||||
f.write('..\n')
|
||||
self.assertEqual(f.readline(), 'Incomplete line...\n')
|
||||
f.close()
|
||||
|
||||
def test_5_flush(self):
|
||||
"""
|
||||
verify that flush will force a write.
|
||||
"""
|
||||
f = LoopbackFile('r+', 512)
|
||||
f.write('Not\nquite\n512 bytes.\n')
|
||||
self.assertEqual(f.read(1), '')
|
||||
f.flush()
|
||||
self.assertEqual(f.read(5), 'Not\nq')
|
||||
self.assertEqual(f.read(10), 'uite\n512 b')
|
||||
self.assertEqual(f.read(9), 'ytes.\n')
|
||||
self.assertEqual(f.read(3), '')
|
||||
f.close()
|
||||
|
||||
def test_6_buffering(self):
|
||||
"""
|
||||
verify that flushing happens automatically on buffer crossing.
|
||||
"""
|
||||
f = LoopbackFile('r+', 16)
|
||||
f.write('Too small.')
|
||||
self.assertEqual(f.read(4), '')
|
||||
f.write(' ')
|
||||
self.assertEqual(f.read(4), '')
|
||||
f.write('Enough.')
|
||||
self.assertEqual(f.read(20), 'Too small. Enough.')
|
||||
f.close()
|
||||
|
|
@ -0,0 +1,303 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
# Copyright (C) 2003-2004 Robey Pointer <robey@lag.net>
|
||||
#
|
||||
# This file is part of paramiko.
|
||||
#
|
||||
# Paramiko is free software; you can redistribute it and/or modify it under the
|
||||
# terms of the GNU Lesser General Public License as published by the Free
|
||||
# Software Foundation; either version 2.1 of the License, or (at your option)
|
||||
# any later version.
|
||||
#
|
||||
# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
|
||||
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with Foobar; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
|
||||
|
||||
"""
|
||||
some unit tests to make sure sftp works.
|
||||
|
||||
a real actual sftp server is contacted, and a new folder is created there to
|
||||
do test file operations in (so no existing files will be harmed).
|
||||
"""
|
||||
|
||||
import sys, os
|
||||
|
||||
# need a host and private-key where we have acecss
|
||||
HOST = os.environ.get('TEST_HOST', 'localhost')
|
||||
USER = os.environ.get('TEST_USER', os.environ.get('USER', 'nobody'))
|
||||
PKEY = os.environ.get('TEST_PKEY', os.path.join(os.environ.get('HOME', '/'), '.ssh/id_rsa'))
|
||||
FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing')
|
||||
|
||||
import paramiko, logging, unittest
|
||||
|
||||
ARTICLE = '''
|
||||
Insulin sensitivity and liver insulin receptor structure in ducks from two
|
||||
genera
|
||||
|
||||
T. Constans, B. Chevalier, M. Derouet and J. Simon
|
||||
Station de Recherches Avicoles, Institut National de la Recherche Agronomique,
|
||||
Nouzilly, France.
|
||||
|
||||
Insulin sensitivity and liver insulin receptor structure were studied in
|
||||
5-wk-old ducks from two genera (Muscovy and Pekin). In the fasting state, both
|
||||
duck types were equally resistant to exogenous insulin compared with chicken.
|
||||
Despite the low potency of duck insulin, the number of insulin receptors was
|
||||
lower in Muscovy duck and similar in Pekin duck and chicken liver membranes.
|
||||
After 125I-insulin cross-linking, the size of the alpha-subunit of the
|
||||
receptors from the three species was 135,000. Wheat germ agglutinin-purified
|
||||
receptors from the three species were contaminated by an active and unusual
|
||||
adenosinetriphosphatase (ATPase) contaminant (highest activity in Muscovy
|
||||
duck). Sequential purification of solubilized receptor from both duck types on
|
||||
lentil and then wheat germ agglutinin lectins led to a fraction of receptors
|
||||
very poor in ATPase activity that exhibited a beta-subunit size (95,000) and
|
||||
tyrosine kinase activity similar to those of ATPase-free chicken insulin
|
||||
receptors. Therefore the ducks from the two genera exhibit an alpha-beta-
|
||||
structure for liver insulin receptors and a clear difference in the number of
|
||||
liver insulin receptors. Their sensitivity to insulin is, however, similarly
|
||||
decreased compared with chicken.
|
||||
'''
|
||||
|
||||
|
||||
# setup logging
|
||||
l = logging.getLogger('paramiko')
|
||||
l.setLevel(logging.DEBUG)
|
||||
if len(l.handlers) == 0:
|
||||
f = open('test.log', 'w')
|
||||
lh = logging.StreamHandler(f)
|
||||
lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] %(name)s: %(message)s', '%Y%m%d:%H%M%S'))
|
||||
l.addHandler(lh)
|
||||
t = paramiko.Transport(HOST)
|
||||
key = paramiko.RSAKey()
|
||||
key.read_private_key_file(PKEY)
|
||||
try:
|
||||
t.connect(username=USER, pkey=key)
|
||||
except paramiko.SSHException:
|
||||
t.close()
|
||||
sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n')
|
||||
sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n')
|
||||
sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % HOST)
|
||||
sys.stderr.write(' (Override the SSH host with environment variable TEST_HOST.)\n')
|
||||
sys.stderr.write('* The username to auth as (%s) is invalid.\n' % USER)
|
||||
sys.stderr.write(' (Override the auth user with environment variable TEST_USER.)\n')
|
||||
sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % PKEY)
|
||||
sys.stderr.write(' (Override the private key location with environment variable TEST_PKEY.)\n')
|
||||
sys.stderr.write('\n')
|
||||
sys.exit(1)
|
||||
sftp = paramiko.SFTP.from_transport(t)
|
||||
|
||||
|
||||
class SFTPTest (unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
sftp.mkdir(FOLDER)
|
||||
|
||||
def tearDown(self):
|
||||
sftp.rmdir(FOLDER)
|
||||
|
||||
def test_1_folder(self):
|
||||
"""
|
||||
create a temporary folder, verify that we can create a file in it, then
|
||||
remove the folder and verify that we can't create a file in it anymore.
|
||||
"""
|
||||
f = sftp.open(FOLDER + '/test', 'w')
|
||||
try:
|
||||
self.assertEqual(f.stat().st_size, 0)
|
||||
f.close()
|
||||
try:
|
||||
f = sftp.open(FOLDER + '/test', 'w')
|
||||
# shouldn't be able to create that file
|
||||
self.assert_(False, 'no exception at dummy file creation')
|
||||
except:
|
||||
pass
|
||||
finally:
|
||||
sftp.remove(FOLDER + '/test')
|
||||
|
||||
def test_2_write(self):
|
||||
"""
|
||||
verify that a file can be created and written, and the size is correct.
|
||||
"""
|
||||
f = sftp.open(FOLDER + '/duck.txt', 'w')
|
||||
try:
|
||||
f.write(ARTICLE)
|
||||
f.close()
|
||||
self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
|
||||
finally:
|
||||
sftp.remove(FOLDER + '/duck.txt')
|
||||
|
||||
def test_3_append(self):
|
||||
"""
|
||||
verify that a file can be opened for append, and tell() still works.
|
||||
"""
|
||||
f = sftp.open(FOLDER + '/append.txt', 'w')
|
||||
try:
|
||||
f.write('first line\nsecond line\n')
|
||||
self.assertEqual(f.tell(), 23)
|
||||
f.close()
|
||||
|
||||
f = sftp.open(FOLDER + '/append.txt', 'a+')
|
||||
f.write('third line!!!\n')
|
||||
self.assertEqual(f.tell(), 37)
|
||||
self.assertEqual(f.stat().st_size, 37)
|
||||
f.seek(-26, f.SEEK_CUR)
|
||||
self.assertEqual(f.readline(), 'second line\n')
|
||||
f.close()
|
||||
finally:
|
||||
sftp.remove(FOLDER + '/append.txt')
|
||||
|
||||
def test_4_rename(self):
|
||||
"""
|
||||
verify that renaming a file works.
|
||||
"""
|
||||
f = sftp.open(FOLDER + '/first.txt', 'w')
|
||||
try:
|
||||
f.write('content!\n');
|
||||
f.close()
|
||||
sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt')
|
||||
try:
|
||||
f = sftp.open(FOLDER + '/first.txt', 'r')
|
||||
self.assert_(False, 'no exception on reading nonexistent file')
|
||||
except:
|
||||
pass
|
||||
f = sftp.open(FOLDER + '/second.txt', 'r')
|
||||
f.seek(-6, f.SEEK_END)
|
||||
self.assertEqual(f.read(4), 'tent')
|
||||
f.close()
|
||||
finally:
|
||||
try:
|
||||
sftp.remove(FOLDER + '/first.txt')
|
||||
except:
|
||||
pass
|
||||
try:
|
||||
sftp.remove(FOLDER + '/second.txt')
|
||||
except:
|
||||
pass
|
||||
|
||||
def test_5_listdir(self):
|
||||
"""
|
||||
verify that a folder can be created, a bunch of files can be placed in it,
|
||||
and those files show up in sftp.listdir.
|
||||
"""
|
||||
try:
|
||||
f = sftp.open(FOLDER + '/duck.txt', 'w')
|
||||
f.close()
|
||||
|
||||
f = sftp.open(FOLDER + '/fish.txt', 'w')
|
||||
f.close()
|
||||
|
||||
f = sftp.open(FOLDER + '/tertiary.py', 'w')
|
||||
f.close()
|
||||
|
||||
x = sftp.listdir(FOLDER)
|
||||
self.assertEqual(len(x), 3)
|
||||
self.assert_('duck.txt' in x)
|
||||
self.assert_('fish.txt' in x)
|
||||
self.assert_('tertiary.py' in x)
|
||||
self.assert_('random' not in x)
|
||||
finally:
|
||||
sftp.remove(FOLDER + '/duck.txt')
|
||||
sftp.remove(FOLDER + '/fish.txt')
|
||||
sftp.remove(FOLDER + '/tertiary.py')
|
||||
|
||||
def test_6_setstat(self):
|
||||
"""
|
||||
verify that the setstat functions (chown, chmod, utime) work.
|
||||
"""
|
||||
f = sftp.open(FOLDER + '/special', 'w')
|
||||
try:
|
||||
f.close()
|
||||
|
||||
stat = sftp.stat(FOLDER + '/special')
|
||||
sftp.chmod(FOLDER + '/special', (stat.st_mode & ~0777) | 0600)
|
||||
self.assertEqual(sftp.stat(FOLDER + '/special').st_mode & 0777, 0600)
|
||||
|
||||
mtime = stat.st_mtime - 3600
|
||||
atime = stat.st_atime - 1800
|
||||
sftp.utime(FOLDER + '/special', (atime, mtime))
|
||||
nstat = sftp.stat(FOLDER + '/special')
|
||||
self.assertEqual(nstat.st_mtime, mtime)
|
||||
self.assertEqual(nstat.st_atime, atime)
|
||||
|
||||
# can't really test chown, since we'd have to know a valid uid.
|
||||
finally:
|
||||
sftp.remove(FOLDER + '/special')
|
||||
|
||||
def test_7_readline_seek(self):
|
||||
"""
|
||||
create a text file and write a bunch of text into it. then count the lines
|
||||
in the file, and seek around to retreive particular lines. this should
|
||||
verify that read buffering and 'tell' work well together, and that read
|
||||
buffering is reset on 'seek'.
|
||||
"""
|
||||
try:
|
||||
f = sftp.open(FOLDER + '/duck.txt', 'w')
|
||||
f.write(ARTICLE)
|
||||
f.close()
|
||||
|
||||
f = sftp.open(FOLDER + '/duck.txt', 'r+')
|
||||
line_number = 0
|
||||
loc = 0
|
||||
pos_list = []
|
||||
for line in f:
|
||||
line_number += 1
|
||||
pos_list.append(loc)
|
||||
loc = f.tell()
|
||||
f.seek(pos_list[6], f.SEEK_SET)
|
||||
self.assertEqual(f.readline(), 'Nouzilly, France.\n')
|
||||
f.seek(pos_list[17], f.SEEK_SET)
|
||||
self.assertEqual(f.readline()[:4], 'duck')
|
||||
f.seek(pos_list[10], f.SEEK_SET)
|
||||
self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n')
|
||||
f.close()
|
||||
finally:
|
||||
sftp.remove(FOLDER + '/duck.txt')
|
||||
|
||||
def test_8_write_seek(self):
|
||||
"""
|
||||
create a text file, seek back and change part of it, and verify that the
|
||||
changes worked.
|
||||
"""
|
||||
f = sftp.open(FOLDER + '/testing.txt', 'w')
|
||||
try:
|
||||
f.write('hello kitty.\n')
|
||||
f.seek(-5, f.SEEK_CUR)
|
||||
f.write('dd')
|
||||
f.close()
|
||||
|
||||
self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13)
|
||||
f = sftp.open(FOLDER + '/testing.txt', 'r')
|
||||
data = f.read(20)
|
||||
f.close()
|
||||
self.assertEqual(data, 'hello kiddy.\n')
|
||||
finally:
|
||||
sftp.remove(FOLDER + '/testing.txt')
|
||||
|
||||
def test_9_symlink(self):
|
||||
"""
|
||||
create a symlink and then check that lstat doesn't follow it.
|
||||
"""
|
||||
f = sftp.open(FOLDER + '/original.txt', 'w')
|
||||
try:
|
||||
f.write('original\n')
|
||||
f.close()
|
||||
sftp.symlink('original.txt', FOLDER + '/link.txt')
|
||||
self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt')
|
||||
|
||||
f = sftp.open(FOLDER + '/link.txt', 'r')
|
||||
self.assertEqual(f.readlines(), [ 'original\n' ])
|
||||
f.close()
|
||||
self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12)
|
||||
self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9)
|
||||
finally:
|
||||
try:
|
||||
sftp.remove(FOLDER + '/link.txt')
|
||||
except:
|
||||
pass
|
||||
try:
|
||||
sftp.remove(FOLDER + '/original.txt')
|
||||
except:
|
||||
pass
|
Loading…
Reference in New Issue