Use 'with' for opening most file and SFTPFIle objects

This commit is contained in:
Scott Maxwell 2013-11-19 08:56:53 -08:00
parent 7471515fff
commit 2da5f1fb45
12 changed files with 272 additions and 354 deletions

View File

@ -96,7 +96,8 @@ try:
sftp.mkdir("demo_sftp_folder") sftp.mkdir("demo_sftp_folder")
except IOError: except IOError:
print('(assuming demo_sftp_folder/ already exists)') print('(assuming demo_sftp_folder/ already exists)')
sftp.open('demo_sftp_folder/README', 'w').write('This was created by demo_sftp.py.\n') with sftp.open('demo_sftp_folder/README', 'w') as f:
f.write('This was created by demo_sftp.py.\n')
with open('demo_sftp.py', 'r') as f: with open('demo_sftp.py', 'r') as f:
data = f.read() data = f.read()
sftp.open('demo_sftp_folder/demo_sftp.py', 'w').write(data) sftp.open('demo_sftp_folder/demo_sftp.py', 'w').write(data)

View File

@ -93,7 +93,7 @@ class AuthHandler (object):
self._request_auth() self._request_auth()
finally: finally:
self.transport.lock.release() self.transport.lock.release()
def auth_interactive(self, username, handler, event, submethods=''): def auth_interactive(self, username, handler, event, submethods=''):
""" """
response_list = handler(title, instructions, prompt_list) response_list = handler(title, instructions, prompt_list)
@ -108,7 +108,7 @@ class AuthHandler (object):
self._request_auth() self._request_auth()
finally: finally:
self.transport.lock.release() self.transport.lock.release()
def abort(self): def abort(self):
if self.auth_event is not None: if self.auth_event is not None:
self.auth_event.set() self.auth_event.set()

View File

@ -192,11 +192,10 @@ class SSHClient (object):
if self.known_hosts is not None: if self.known_hosts is not None:
self.load_host_keys(self.known_hosts) self.load_host_keys(self.known_hosts)
f = open(filename, 'w') with open(filename, 'w') as f:
for hostname, keys in self._host_keys.items(): for hostname, keys in self._host_keys.items():
for keytype, key in keys.items(): for keytype, key in keys.items():
f.write('%s %s %s\n' % (hostname, keytype, key.get_base64())) f.write('%s %s %s\n' % (hostname, keytype, key.get_base64()))
f.close()
def get_host_keys(self): def get_host_keys(self):
""" """

View File

@ -172,8 +172,7 @@ class HostKeys (MutableMapping):
@raise IOError: if there was an error reading the file @raise IOError: if there was an error reading the file
""" """
f = open(filename, 'r') with open(filename, 'r') as f:
try:
for lineno, line in enumerate(f): for lineno, line in enumerate(f):
line = line.strip() line = line.strip()
if (len(line) == 0) or (line[0] == '#'): if (len(line) == 0) or (line[0] == '#'):
@ -186,8 +185,6 @@ class HostKeys (MutableMapping):
e.hostnames.remove(h) e.hostnames.remove(h)
if len(e.hostnames): if len(e.hostnames):
self._entries.append(e) self._entries.append(e)
finally:
f.close()
def save(self, filename): def save(self, filename):
""" """
@ -203,14 +200,11 @@ class HostKeys (MutableMapping):
@since: 1.6.1 @since: 1.6.1
""" """
f = open(filename, 'w') with open(filename, 'w') as f:
try:
for e in self._entries: for e in self._entries:
line = e.to_line() line = e.to_line()
if line: if line:
f.write(line) f.write(line)
finally:
f.close()
def lookup(self, hostname): def lookup(self, hostname):
""" """

View File

@ -282,11 +282,8 @@ class PKey (object):
encrypted, and C{password} is C{None}. encrypted, and C{password} is C{None}.
@raise SSHException: if the key file is invalid. @raise SSHException: if the key file is invalid.
""" """
f = open(filename, 'r') with open(filename, 'r') as f:
try:
data = self._read_private_key(tag, f, password) data = self._read_private_key(tag, f, password)
finally:
f.close()
return data return data
def _read_private_key(self, tag, f, password=None): def _read_private_key(self, tag, f, password=None):
@ -354,13 +351,10 @@ class PKey (object):
@raise IOError: if there was an error writing the file. @raise IOError: if there was an error writing the file.
""" """
f = open(filename, 'w', o600) with open(filename, 'w', o600) as f:
try:
# grrr... the mode doesn't always take hold # grrr... the mode doesn't always take hold
os.chmod(filename, o600) os.chmod(filename, o600)
self._write_private_key(tag, f, data, password) self._write_private_key(tag, f, data, password)
finally:
f.close()
def _write_private_key(self, tag, f, data, password=None): def _write_private_key(self, tag, f, data, password=None):
f.write('-----BEGIN %s PRIVATE KEY-----\n' % tag) f.write('-----BEGIN %s PRIVATE KEY-----\n' % tag)

View File

@ -113,8 +113,7 @@ class ModulusPack (object):
@raise IOError: passed from any file operations that fail. @raise IOError: passed from any file operations that fail.
""" """
self.pack = {} self.pack = {}
f = open(filename, 'r') with open(filename, 'r') as f:
try:
for line in f: for line in f:
line = line.strip() line = line.strip()
if (len(line) == 0) or (line[0] == '#'): if (len(line) == 0) or (line[0] == '#'):
@ -123,8 +122,6 @@ class ModulusPack (object):
self._parse_modulus(line) self._parse_modulus(line)
except: except:
continue continue
finally:
f.close()
def get_modulus(self, min, prefer, max): def get_modulus(self, min, prefer, max):
bitsizes = sorted(self.pack.keys()) bitsizes = sorted(self.pack.keys())

View File

@ -561,10 +561,9 @@ class SFTPClient (BaseSFTP):
@since: 1.4 @since: 1.4
""" """
fr = self.file(remotepath, 'wb') with self.file(remotepath, 'wb') as fr:
fr.set_pipelined(True) fr.set_pipelined(True)
size = 0 size = 0
try:
while True: while True:
data = fl.read(32768) data = fl.read(32768)
fr.write(data) fr.write(data)
@ -573,8 +572,6 @@ class SFTPClient (BaseSFTP):
callback(size, file_size) callback(size, file_size)
if len(data) == 0: if len(data) == 0:
break break
finally:
fr.close()
if confirm: if confirm:
s = self.stat(remotepath) s = self.stat(remotepath)
if s.st_size != size: if s.st_size != size:
@ -610,11 +607,8 @@ class SFTPClient (BaseSFTP):
@since: 1.4 @since: 1.4
""" """
file_size = os.stat(localpath).st_size file_size = os.stat(localpath).st_size
fl = open(localpath, 'rb') with open(localpath, 'rb') as fl:
try:
return self.putfo(fl, remotepath, os.stat(localpath).st_size, callback, confirm) return self.putfo(fl, remotepath, os.stat(localpath).st_size, callback, confirm)
finally:
fl.close()
def getfo(self, remotepath, fl, callback=None): def getfo(self, remotepath, fl, callback=None):
""" """
@ -636,10 +630,9 @@ class SFTPClient (BaseSFTP):
@since: 1.4 @since: 1.4
""" """
fr = self.open(remotepath, 'rb') with self.open(remotepath, 'rb') as fr:
file_size = self.stat(remotepath).st_size file_size = self.stat(remotepath).st_size
fr.prefetch() fr.prefetch()
try:
size = 0 size = 0
while True: while True:
data = fr.read(32768) data = fr.read(32768)
@ -649,8 +642,6 @@ class SFTPClient (BaseSFTP):
callback(size, file_size) callback(size, file_size)
if len(data) == 0: if len(data) == 0:
break break
finally:
fr.close()
return size return size
def get(self, remotepath, localpath, callback=None): def get(self, remotepath, localpath, callback=None):
@ -671,11 +662,8 @@ class SFTPClient (BaseSFTP):
@since: 1.4 @since: 1.4
""" """
file_size = self.stat(remotepath).st_size file_size = self.stat(remotepath).st_size
fl = open(localpath, 'wb') with open(localpath, 'wb') as fl:
try:
size = self.getfo(remotepath, fl, callback) size = self.getfo(remotepath, fl, callback)
finally:
fl.close()
s = os.stat(localpath) s = os.stat(localpath)
if s.st_size != size: if s.st_size != size:
raise IOError('size mismatch in get! %d != %d' % (s.st_size, size)) raise IOError('size mismatch in get! %d != %d' % (s.st_size, size))

View File

@ -166,11 +166,8 @@ class SFTPServer (BaseSFTP, SubsystemHandler):
if attr._flags & attr.FLAG_AMTIME: if attr._flags & attr.FLAG_AMTIME:
os.utime(filename, (attr.st_atime, attr.st_mtime)) os.utime(filename, (attr.st_atime, attr.st_mtime))
if attr._flags & attr.FLAG_SIZE: if attr._flags & attr.FLAG_SIZE:
f = open(filename, 'w+') with open(filename, 'w+') as f:
try:
f.truncate(attr.st_size) f.truncate(attr.st_size)
finally:
f.close()
set_file_attr = staticmethod(set_file_attr) set_file_attr = staticmethod(set_file_attr)

View File

@ -56,9 +56,8 @@ Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg\
class HostKeysTest (unittest.TestCase): class HostKeysTest (unittest.TestCase):
def setUp(self): def setUp(self):
f = open('hostfile.temp', 'w') with open('hostfile.temp', 'w') as f:
f.write(test_hosts_file) f.write(test_hosts_file)
f.close()
def tearDown(self): def tearDown(self):
os.unlink('hostfile.temp') os.unlink('hostfile.temp')

View File

@ -186,10 +186,9 @@ class SFTPTest (unittest.TestCase):
""" """
verify that a file can be created and written, and the size is correct. verify that a file can be created and written, and the size is correct.
""" """
f = sftp.open(FOLDER + '/duck.txt', 'w')
try: try:
f.write(ARTICLE) with sftp.open(FOLDER + '/duck.txt', 'w') as f:
f.close() f.write(ARTICLE)
self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
finally: finally:
sftp.remove(FOLDER + '/duck.txt') sftp.remove(FOLDER + '/duck.txt')
@ -209,40 +208,36 @@ class SFTPTest (unittest.TestCase):
""" """
verify that a file can be opened for append, and tell() still works. verify that a file can be opened for append, and tell() still works.
""" """
f = sftp.open(FOLDER + '/append.txt', 'w')
try: try:
f.write('first line\nsecond line\n') with sftp.open(FOLDER + '/append.txt', 'w') as f:
self.assertEqual(f.tell(), 23) f.write('first line\nsecond line\n')
f.close() self.assertEqual(f.tell(), 23)
f = sftp.open(FOLDER + '/append.txt', 'a+') with sftp.open(FOLDER + '/append.txt', 'a+') as f:
f.write('third line!!!\n') f.write('third line!!!\n')
self.assertEqual(f.tell(), 37) self.assertEqual(f.tell(), 37)
self.assertEqual(f.stat().st_size, 37) self.assertEqual(f.stat().st_size, 37)
f.seek(-26, f.SEEK_CUR) f.seek(-26, f.SEEK_CUR)
self.assertEqual(f.readline(), 'second line\n') self.assertEqual(f.readline(), 'second line\n')
finally: finally:
f.close()
sftp.remove(FOLDER + '/append.txt') sftp.remove(FOLDER + '/append.txt')
def test_5_rename(self): def test_5_rename(self):
""" """
verify that renaming a file works. verify that renaming a file works.
""" """
f = sftp.open(FOLDER + '/first.txt', 'w')
try: try:
f.write('content!\n') with sftp.open(FOLDER + '/first.txt', 'w') as f:
f.close() f.write('content!\n')
sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt') sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt')
try: try:
f = sftp.open(FOLDER + '/first.txt', 'r') f = sftp.open(FOLDER + '/first.txt', 'r')
self.assertTrue(False, 'no exception on reading nonexistent file') self.assertTrue(False, 'no exception on reading nonexistent file')
except IOError: except IOError:
pass pass
f = sftp.open(FOLDER + '/second.txt', 'r') with sftp.open(FOLDER + '/second.txt', 'r') as f:
f.seek(-6, f.SEEK_END) f.seek(-6, f.SEEK_END)
self.assertEqual(u(f.read(4)), 'tent') self.assertEqual(u(f.read(4)), 'tent')
f.close()
finally: finally:
try: try:
sftp.remove(FOLDER + '/first.txt') sftp.remove(FOLDER + '/first.txt')
@ -300,10 +295,9 @@ class SFTPTest (unittest.TestCase):
""" """
verify that the setstat functions (chown, chmod, utime, truncate) work. verify that the setstat functions (chown, chmod, utime, truncate) work.
""" """
f = sftp.open(FOLDER + '/special', 'w')
try: try:
f.write('x' * 1024) with sftp.open(FOLDER + '/special', 'w') as f:
f.close() f.write('x' * 1024)
stat = sftp.stat(FOLDER + '/special') stat = sftp.stat(FOLDER + '/special')
sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600) sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600)
@ -339,40 +333,38 @@ class SFTPTest (unittest.TestCase):
verify that the fsetstat functions (chown, chmod, utime, truncate) verify that the fsetstat functions (chown, chmod, utime, truncate)
work on open files. work on open files.
""" """
f = sftp.open(FOLDER + '/special', 'w')
try: try:
f.write('x' * 1024) with sftp.open(FOLDER + '/special', 'w') as f:
f.close() f.write('x' * 1024)
f = sftp.open(FOLDER + '/special', 'r+') with sftp.open(FOLDER + '/special', 'r+') as f:
stat = f.stat() stat = f.stat()
f.chmod((stat.st_mode & ~o777) | o600) f.chmod((stat.st_mode & ~o777) | o600)
stat = f.stat() stat = f.stat()
expected_mode = o600 expected_mode = o600
if sys.platform == 'win32': if sys.platform == 'win32':
# chmod not really functional on windows # chmod not really functional on windows
expected_mode = o666 expected_mode = o666
if sys.platform == 'cygwin': if sys.platform == 'cygwin':
# even worse. # even worse.
expected_mode = o644 expected_mode = o644
self.assertEqual(stat.st_mode & o777, expected_mode) self.assertEqual(stat.st_mode & o777, expected_mode)
self.assertEqual(stat.st_size, 1024) self.assertEqual(stat.st_size, 1024)
mtime = stat.st_mtime - 3600 mtime = stat.st_mtime - 3600
atime = stat.st_atime - 1800 atime = stat.st_atime - 1800
f.utime((atime, mtime)) f.utime((atime, mtime))
stat = f.stat() stat = f.stat()
self.assertEqual(stat.st_mtime, mtime) self.assertEqual(stat.st_mtime, mtime)
if sys.platform not in ('win32', 'cygwin'): if sys.platform not in ('win32', 'cygwin'):
self.assertEqual(stat.st_atime, atime) self.assertEqual(stat.st_atime, atime)
# can't really test chown, since we'd have to know a valid uid. # can't really test chown, since we'd have to know a valid uid.
f.truncate(512) f.truncate(512)
stat = f.stat() stat = f.stat()
self.assertEqual(stat.st_size, 512) self.assertEqual(stat.st_size, 512)
f.close()
finally: finally:
sftp.remove(FOLDER + '/special') sftp.remove(FOLDER + '/special')
@ -384,25 +376,23 @@ class SFTPTest (unittest.TestCase):
buffering is reset on 'seek'. buffering is reset on 'seek'.
""" """
try: try:
f = sftp.open(FOLDER + '/duck.txt', 'w') with sftp.open(FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE) f.write(ARTICLE)
f.close()
f = sftp.open(FOLDER + '/duck.txt', 'r+') with sftp.open(FOLDER + '/duck.txt', 'r+') as f:
line_number = 0 line_number = 0
loc = 0 loc = 0
pos_list = [] pos_list = []
for line in f: for line in f:
line_number += 1 line_number += 1
pos_list.append(loc) pos_list.append(loc)
loc = f.tell() loc = f.tell()
f.seek(pos_list[6], f.SEEK_SET) f.seek(pos_list[6], f.SEEK_SET)
self.assertEqual(f.readline(), 'Nouzilly, France.\n') self.assertEqual(f.readline(), 'Nouzilly, France.\n')
f.seek(pos_list[17], f.SEEK_SET) f.seek(pos_list[17], f.SEEK_SET)
self.assertEqual(f.readline()[:4], 'duck') self.assertEqual(f.readline()[:4], 'duck')
f.seek(pos_list[10], f.SEEK_SET) f.seek(pos_list[10], f.SEEK_SET)
self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n') self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n')
f.close()
finally: finally:
sftp.remove(FOLDER + '/duck.txt') sftp.remove(FOLDER + '/duck.txt')
@ -411,17 +401,15 @@ class SFTPTest (unittest.TestCase):
create a text file, seek back and change part of it, and verify that the create a text file, seek back and change part of it, and verify that the
changes worked. changes worked.
""" """
f = sftp.open(FOLDER + '/testing.txt', 'w')
try: try:
f.write('hello kitty.\n') with sftp.open(FOLDER + '/testing.txt', 'w') as f:
f.seek(-5, f.SEEK_CUR) f.write('hello kitty.\n')
f.write('dd') f.seek(-5, f.SEEK_CUR)
f.close() f.write('dd')
self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13) self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13)
f = sftp.open(FOLDER + '/testing.txt', 'r') with sftp.open(FOLDER + '/testing.txt', 'r') as f:
data = f.read(20) data = f.read(20)
f.close()
self.assertEqual(data, 'hello kiddy.\n') self.assertEqual(data, 'hello kiddy.\n')
finally: finally:
sftp.remove(FOLDER + '/testing.txt') sftp.remove(FOLDER + '/testing.txt')
@ -434,16 +422,14 @@ class SFTPTest (unittest.TestCase):
# skip symlink tests on windows # skip symlink tests on windows
return return
f = sftp.open(FOLDER + '/original.txt', 'w')
try: try:
f.write('original\n') with sftp.open(FOLDER + '/original.txt', 'w') as f:
f.close() f.write('original\n')
sftp.symlink('original.txt', FOLDER + '/link.txt') sftp.symlink('original.txt', FOLDER + '/link.txt')
self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt') self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt')
f = sftp.open(FOLDER + '/link.txt', 'r') with sftp.open(FOLDER + '/link.txt', 'r') as f:
self.assertEqual(f.readlines(), ['original\n']) self.assertEqual(f.readlines(), ['original\n'])
f.close()
cwd = sftp.normalize('.') cwd = sftp.normalize('.')
if cwd[-1] == '/': if cwd[-1] == '/':
@ -477,18 +463,16 @@ class SFTPTest (unittest.TestCase):
""" """
verify that buffered writes are automatically flushed on seek. verify that buffered writes are automatically flushed on seek.
""" """
f = sftp.open(FOLDER + '/happy.txt', 'w', 1)
try: try:
f.write('full line.\n') with sftp.open(FOLDER + '/happy.txt', 'w', 1) as f:
f.write('partial') f.write('full line.\n')
f.seek(9, f.SEEK_SET) f.write('partial')
f.write('?\n') f.seek(9, f.SEEK_SET)
f.close() f.write('?\n')
f = sftp.open(FOLDER + '/happy.txt', 'r') with sftp.open(FOLDER + '/happy.txt', 'r') as f:
self.assertEqual(f.readline(), 'full line?\n') self.assertEqual(f.readline(), 'full line?\n')
self.assertEqual(f.read(7), 'partial') self.assertEqual(f.read(7), 'partial')
f.close()
finally: finally:
try: try:
sftp.remove(FOLDER + '/happy.txt') sftp.remove(FOLDER + '/happy.txt')
@ -544,9 +528,8 @@ class SFTPTest (unittest.TestCase):
self.assertEqual(['beta'], sftp.listdir('.')) self.assertEqual(['beta'], sftp.listdir('.'))
sftp.chdir('beta') sftp.chdir('beta')
f = sftp.open('fish', 'w') with sftp.open('fish', 'w') as f:
f.write('hello\n') f.write('hello\n')
f.close()
sftp.chdir('..') sftp.chdir('..')
self.assertEqual(['fish'], sftp.listdir('beta')) self.assertEqual(['fish'], sftp.listdir('beta'))
sftp.chdir('..') sftp.chdir('..')
@ -575,17 +558,15 @@ class SFTPTest (unittest.TestCase):
fd, localname = mkstemp() fd, localname = mkstemp()
os.close(fd) os.close(fd)
text = b('All I wanted was a plastic bunny rabbit.\n') text = b('All I wanted was a plastic bunny rabbit.\n')
f = open(localname, 'wb') with open(localname, 'wb') as f:
f.write(text) f.write(text)
f.close()
saved_progress = [] saved_progress = []
def progress_callback(x, y): def progress_callback(x, y):
saved_progress.append((x, y)) saved_progress.append((x, y))
sftp.put(localname, FOLDER + '/bunny.txt', progress_callback) sftp.put(localname, FOLDER + '/bunny.txt', progress_callback)
f = sftp.open(FOLDER + '/bunny.txt', 'rb') with sftp.open(FOLDER + '/bunny.txt', 'rb') as f:
self.assertEqual(text, f.read(128)) self.assertEqual(text, f.read(128))
f.close()
self.assertEqual((41, 41), saved_progress[-1]) self.assertEqual((41, 41), saved_progress[-1])
os.unlink(localname) os.unlink(localname)
@ -594,9 +575,8 @@ class SFTPTest (unittest.TestCase):
saved_progress = [] saved_progress = []
sftp.get(FOLDER + '/bunny.txt', localname, progress_callback) sftp.get(FOLDER + '/bunny.txt', localname, progress_callback)
f = open(localname, 'rb') with open(localname, 'rb') as f:
self.assertEqual(text, f.read(128)) self.assertEqual(text, f.read(128))
f.close()
self.assertEqual((41, 41), saved_progress[-1]) self.assertEqual((41, 41), saved_progress[-1])
os.unlink(localname) os.unlink(localname)
@ -608,20 +588,18 @@ class SFTPTest (unittest.TestCase):
(it's an sftp extension that we support, and may be the only ones who (it's an sftp extension that we support, and may be the only ones who
support it.) support it.)
""" """
f = sftp.open(FOLDER + '/kitty.txt', 'w') with sftp.open(FOLDER + '/kitty.txt', 'w') as f:
f.write('here kitty kitty' * 64) f.write('here kitty kitty' * 64)
f.close()
try: try:
f = sftp.open(FOLDER + '/kitty.txt', 'r') with sftp.open(FOLDER + '/kitty.txt', 'r') as f:
sum = f.check('sha1') sum = f.check('sha1')
self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper()) self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper())
sum = f.check('md5', 0, 512) sum = f.check('md5', 0, 512)
self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper()) self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper())
sum = f.check('md5', 0, 0, 510) sum = f.check('md5', 0, 0, 510)
self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6', self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6',
u(hexlify(sum)).upper()) u(hexlify(sum)).upper())
f.close()
finally: finally:
sftp.unlink(FOLDER + '/kitty.txt') sftp.unlink(FOLDER + '/kitty.txt')
@ -645,9 +623,8 @@ class SFTPTest (unittest.TestCase):
""" """
verify that unicode strings are encoded into utf8 correctly. verify that unicode strings are encoded into utf8 correctly.
""" """
f = sftp.open(FOLDER + '/something', 'w') with sftp.open(FOLDER + '/something', 'w') as f:
f.write('okay') f.write('okay')
f.close()
try: try:
sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder) sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder)
@ -660,9 +637,8 @@ class SFTPTest (unittest.TestCase):
sftp.mkdir(FOLDER + '/' + unicode_folder) sftp.mkdir(FOLDER + '/' + unicode_folder)
try: try:
sftp.chdir(FOLDER + '/' + unicode_folder) sftp.chdir(FOLDER + '/' + unicode_folder)
f = sftp.open('something', 'w') with sftp.open('something', 'w') as f:
f.write('okay') f.write('okay')
f.close()
sftp.unlink('something') sftp.unlink('something')
finally: finally:
sftp.chdir() sftp.chdir()
@ -675,14 +651,12 @@ class SFTPTest (unittest.TestCase):
f = sftp.open(FOLDER + '/zero', 'w') f = sftp.open(FOLDER + '/zero', 'w')
f.close() f.close()
try: try:
f = sftp.open(FOLDER + '/zero', 'r') with sftp.open(FOLDER + '/zero', 'r') as f:
f.readv([(0, 12)]) f.readv([(0, 12)])
f.close()
f = sftp.open(FOLDER + '/zero', 'r') with sftp.open(FOLDER + '/zero', 'r') as f:
f.prefetch() f.prefetch()
f.read(100) f.read(100)
f.close()
finally: finally:
sftp.unlink(FOLDER + '/zero') sftp.unlink(FOLDER + '/zero')
@ -695,9 +669,8 @@ class SFTPTest (unittest.TestCase):
fd, localname = mkstemp() fd, localname = mkstemp()
os.close(fd) os.close(fd)
text = 'All I wanted was a plastic bunny rabbit.\n' text = 'All I wanted was a plastic bunny rabbit.\n'
f = open(localname, 'w') with open(localname, 'w') as f:
f.write(text) f.write(text)
f.close()
saved_progress = [] saved_progress = []
def progress_callback(x, y): def progress_callback(x, y):
saved_progress.append((x, y)) saved_progress.append((x, y))
@ -705,9 +678,8 @@ class SFTPTest (unittest.TestCase):
self.assertEqual(SFTPAttributes().attr, res.attr) self.assertEqual(SFTPAttributes().attr, res.attr)
f = sftp.open(FOLDER + '/bunny.txt', 'r') with sftp.open(FOLDER + '/bunny.txt', 'r') as f:
self.assertEqual(text, f.read(128)) self.assertEqual(text, f.read(128))
f.close()
self.assertEqual((41, 41), saved_progress[-1]) self.assertEqual((41, 41), saved_progress[-1])
os.unlink(localname) os.unlink(localname)
@ -719,19 +691,17 @@ class SFTPTest (unittest.TestCase):
does not work except through paramiko. :( openssh fails. does not work except through paramiko. :( openssh fails.
""" """
f = sftp.open(FOLDER + '/append.txt', 'a')
try: try:
f.write('first line\nsecond line\n') with sftp.open(FOLDER + '/append.txt', 'a') as f:
f.seek(11, f.SEEK_SET) f.write('first line\nsecond line\n')
f.write('third line\n') f.seek(11, f.SEEK_SET)
f.close() f.write('third line\n')
f = sftp.open(FOLDER + '/append.txt', 'r') with sftp.open(FOLDER + '/append.txt', 'r') as f:
self.assertEqual(f.stat().st_size, 34) self.assertEqual(f.stat().st_size, 34)
self.assertEqual(f.readline(), 'first line\n') self.assertEqual(f.readline(), 'first line\n')
self.assertEqual(f.readline(), 'second line\n') self.assertEqual(f.readline(), 'second line\n')
self.assertEqual(f.readline(), 'third line\n') self.assertEqual(f.readline(), 'third line\n')
f.close()
finally: finally:
sftp.remove(FOLDER + '/append.txt') sftp.remove(FOLDER + '/append.txt')

View File

@ -66,9 +66,8 @@ class BigSFTPTest (unittest.TestCase):
numfiles = 100 numfiles = 100
try: try:
for i in range(numfiles): for i in range(numfiles):
f = sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1) with sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1) as f:
f.write('this is file #%d.\n' % i) f.write('this is file #%d.\n' % i)
f.close()
sftp.chmod('%s/file%d.txt' % (FOLDER, i), o660) sftp.chmod('%s/file%d.txt' % (FOLDER, i), o660)
# now make sure every file is there, by creating a list of filenmes # now make sure every file is there, by creating a list of filenmes
@ -76,9 +75,8 @@ class BigSFTPTest (unittest.TestCase):
numlist = list(range(numfiles)) numlist = list(range(numfiles))
while len(numlist) > 0: while len(numlist) > 0:
r = numlist[random.randint(0, len(numlist) - 1)] r = numlist[random.randint(0, len(numlist) - 1)]
f = sftp.open('%s/file%d.txt' % (FOLDER, r)) with sftp.open('%s/file%d.txt' % (FOLDER, r)) as f:
self.assertEqual(f.readline(), 'this is file #%d.\n' % r) self.assertEqual(f.readline(), 'this is file #%d.\n' % r)
f.close()
numlist.remove(r) numlist.remove(r)
finally: finally:
for i in range(numfiles): for i in range(numfiles):
@ -95,12 +93,11 @@ class BigSFTPTest (unittest.TestCase):
kblob = (1024 * 'x') kblob = (1024 * 'x')
start = time.time() start = time.time()
try: try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w') with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f:
for n in range(1024): for n in range(1024):
f.write(kblob) f.write(kblob)
if n % 128 == 0: if n % 128 == 0:
sys.stderr.write('.') sys.stderr.write('.')
f.close()
sys.stderr.write(' ') sys.stderr.write(' ')
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
@ -108,11 +105,10 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('%ds ' % round(end - start)) sys.stderr.write('%ds ' % round(end - start))
start = time.time() start = time.time()
f = sftp.open('%s/hongry.txt' % FOLDER, 'r') with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
for n in range(1024): for n in range(1024):
data = f.read(1024) data = f.read(1024)
self.assertEqual(data, kblob) self.assertEqual(data, kblob)
f.close()
end = time.time() end = time.time()
sys.stderr.write('%ds ' % round(end - start)) sys.stderr.write('%ds ' % round(end - start))
@ -127,13 +123,12 @@ class BigSFTPTest (unittest.TestCase):
kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
start = time.time() start = time.time()
try: try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'wb') with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
f.set_pipelined(True) f.set_pipelined(True)
for n in range(1024): for n in range(1024):
f.write(kblob) f.write(kblob)
if n % 128 == 0: if n % 128 == 0:
sys.stderr.write('.') sys.stderr.write('.')
f.close()
sys.stderr.write(' ') sys.stderr.write(' ')
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
@ -141,22 +136,21 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('%ds ' % round(end - start)) sys.stderr.write('%ds ' % round(end - start))
start = time.time() start = time.time()
f = sftp.open('%s/hongry.txt' % FOLDER, 'rb') with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
f.prefetch() f.prefetch()
# read on odd boundaries to make sure the bytes aren't getting scrambled # read on odd boundaries to make sure the bytes aren't getting scrambled
n = 0 n = 0
k2blob = kblob + kblob k2blob = kblob + kblob
chunk = 629 chunk = 629
size = 1024 * 1024 size = 1024 * 1024
while n < size: while n < size:
if n + chunk > size: if n + chunk > size:
chunk = size - n chunk = size - n
data = f.read(chunk) data = f.read(chunk)
offset = n % 1024 offset = n % 1024
self.assertEqual(data, k2blob[offset:offset + chunk]) self.assertEqual(data, k2blob[offset:offset + chunk])
n += chunk n += chunk
f.close()
end = time.time() end = time.time()
sys.stderr.write('%ds ' % round(end - start)) sys.stderr.write('%ds ' % round(end - start))
@ -167,13 +161,12 @@ class BigSFTPTest (unittest.TestCase):
sftp = get_sftp() sftp = get_sftp()
kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try: try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'wb') with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
f.set_pipelined(True) f.set_pipelined(True)
for n in range(1024): for n in range(1024):
f.write(kblob) f.write(kblob)
if n % 128 == 0: if n % 128 == 0:
sys.stderr.write('.') sys.stderr.write('.')
f.close()
sys.stderr.write(' ') sys.stderr.write(' ')
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
@ -182,20 +175,19 @@ class BigSFTPTest (unittest.TestCase):
k2blob = kblob + kblob k2blob = kblob + kblob
chunk = 793 chunk = 793
for i in range(10): for i in range(10):
f = sftp.open('%s/hongry.txt' % FOLDER, 'rb') with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
f.prefetch() f.prefetch()
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000) base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
offsets = [base_offset + j * chunk for j in range(100)] offsets = [base_offset + j * chunk for j in range(100)]
# randomly seek around and read them out # randomly seek around and read them out
for j in range(100): for j in range(100):
offset = offsets[random.randint(0, len(offsets) - 1)] offset = offsets[random.randint(0, len(offsets) - 1)]
offsets.remove(offset) offsets.remove(offset)
f.seek(offset) f.seek(offset)
data = f.read(chunk) data = f.read(chunk)
n_offset = offset % 1024 n_offset = offset % 1024
self.assertEqual(data, k2blob[n_offset:n_offset + chunk]) self.assertEqual(data, k2blob[n_offset:n_offset + chunk])
offset += chunk offset += chunk
f.close()
end = time.time() end = time.time()
sys.stderr.write('%ds ' % round(end - start)) sys.stderr.write('%ds ' % round(end - start))
finally: finally:
@ -205,13 +197,12 @@ class BigSFTPTest (unittest.TestCase):
sftp = get_sftp() sftp = get_sftp()
kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try: try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'wb') with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
f.set_pipelined(True) f.set_pipelined(True)
for n in range(1024): for n in range(1024):
f.write(kblob) f.write(kblob)
if n % 128 == 0: if n % 128 == 0:
sys.stderr.write('.') sys.stderr.write('.')
f.close()
sys.stderr.write(' ') sys.stderr.write(' ')
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
@ -220,21 +211,20 @@ class BigSFTPTest (unittest.TestCase):
k2blob = kblob + kblob k2blob = kblob + kblob
chunk = 793 chunk = 793
for i in range(10): for i in range(10):
f = sftp.open('%s/hongry.txt' % FOLDER, 'rb') with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000) base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
# make a bunch of offsets and put them in random order # make a bunch of offsets and put them in random order
offsets = [base_offset + j * chunk for j in range(100)] offsets = [base_offset + j * chunk for j in range(100)]
readv_list = [] readv_list = []
for j in range(100): for j in range(100):
o = offsets[random.randint(0, len(offsets) - 1)] o = offsets[random.randint(0, len(offsets) - 1)]
offsets.remove(o) offsets.remove(o)
readv_list.append((o, chunk)) readv_list.append((o, chunk))
ret = f.readv(readv_list) ret = f.readv(readv_list)
for i in range(len(readv_list)): for i in range(len(readv_list)):
offset = readv_list[i][0] offset = readv_list[i][0]
n_offset = offset % 1024 n_offset = offset % 1024
self.assertEqual(next(ret), k2blob[n_offset:n_offset + chunk]) self.assertEqual(next(ret), k2blob[n_offset:n_offset + chunk])
f.close()
end = time.time() end = time.time()
sys.stderr.write('%ds ' % round(end - start)) sys.stderr.write('%ds ' % round(end - start))
finally: finally:
@ -248,29 +238,26 @@ class BigSFTPTest (unittest.TestCase):
sftp = get_sftp() sftp = get_sftp()
kblob = (1024 * 'x') kblob = (1024 * 'x')
try: try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w') with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f:
f.set_pipelined(True) f.set_pipelined(True)
for n in range(1024): for n in range(1024):
f.write(kblob) f.write(kblob)
if n % 128 == 0: if n % 128 == 0:
sys.stderr.write('.') sys.stderr.write('.')
f.close()
sys.stderr.write(' ') sys.stderr.write(' ')
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
for i in range(10): for i in range(10):
f = sftp.open('%s/hongry.txt' % FOLDER, 'r') with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
f.prefetch()
with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
f.prefetch() f.prefetch()
f.close() for n in range(1024):
f = sftp.open('%s/hongry.txt' % FOLDER, 'r') data = f.read(1024)
f.prefetch() self.assertEqual(data, kblob)
for n in range(1024): if n % 128 == 0:
data = f.read(1024) sys.stderr.write('.')
self.assertEqual(data, kblob)
if n % 128 == 0:
sys.stderr.write('.')
f.close()
sys.stderr.write(' ') sys.stderr.write(' ')
finally: finally:
sftp.remove('%s/hongry.txt' % FOLDER) sftp.remove('%s/hongry.txt' % FOLDER)
@ -282,33 +269,31 @@ class BigSFTPTest (unittest.TestCase):
sftp = get_sftp() sftp = get_sftp()
kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try: try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'wb') with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
f.set_pipelined(True) f.set_pipelined(True)
for n in range(1024): for n in range(1024):
f.write(kblob) f.write(kblob)
if n % 128 == 0: if n % 128 == 0:
sys.stderr.write('.') sys.stderr.write('.')
f.close()
sys.stderr.write(' ') sys.stderr.write(' ')
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
f = sftp.open('%s/hongry.txt' % FOLDER, 'rb') with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
f.prefetch() f.prefetch()
data = f.read(1024) data = f.read(1024)
self.assertEqual(data, kblob) self.assertEqual(data, kblob)
chunk_size = 793 chunk_size = 793
base_offset = 512 * 1024 base_offset = 512 * 1024
k2blob = kblob + kblob k2blob = kblob + kblob
chunks = [(base_offset + (chunk_size * i), chunk_size) for i in range(20)] chunks = [(base_offset + (chunk_size * i), chunk_size) for i in range(20)]
for data in f.readv(chunks): for data in f.readv(chunks):
offset = base_offset % 1024 offset = base_offset % 1024
self.assertEqual(chunk_size, len(data)) self.assertEqual(chunk_size, len(data))
self.assertEqual(k2blob[offset:offset + chunk_size], data) self.assertEqual(k2blob[offset:offset + chunk_size], data)
base_offset += chunk_size base_offset += chunk_size
f.close()
sys.stderr.write(' ') sys.stderr.write(' ')
finally: finally:
sftp.remove('%s/hongry.txt' % FOLDER) sftp.remove('%s/hongry.txt' % FOLDER)
@ -321,24 +306,22 @@ class BigSFTPTest (unittest.TestCase):
sftp = get_sftp() sftp = get_sftp()
kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try: try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'wb') with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
f.set_pipelined(True) f.set_pipelined(True)
for n in range(1024): for n in range(1024):
f.write(kblob) f.write(kblob)
if n % 128 == 0: if n % 128 == 0:
sys.stderr.write('.') sys.stderr.write('.')
f.close()
sys.stderr.write(' ') sys.stderr.write(' ')
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
f = sftp.open('%s/hongry.txt' % FOLDER, 'rb') with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
data = list(f.readv([(23 * 1024, 128 * 1024)])) data = list(f.readv([(23 * 1024, 128 * 1024)]))
self.assertEqual(1, len(data)) self.assertEqual(1, len(data))
data = data[0] data = data[0]
self.assertEqual(128 * 1024, len(data)) self.assertEqual(128 * 1024, len(data))
f.close()
sys.stderr.write(' ') sys.stderr.write(' ')
finally: finally:
sftp.remove('%s/hongry.txt' % FOLDER) sftp.remove('%s/hongry.txt' % FOLDER)
@ -350,9 +333,8 @@ class BigSFTPTest (unittest.TestCase):
sftp = get_sftp() sftp = get_sftp()
mblob = (1024 * 1024 * 'x') mblob = (1024 * 1024 * 'x')
try: try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f:
f.write(mblob) f.write(mblob)
f.close()
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
finally: finally:
@ -367,21 +349,19 @@ class BigSFTPTest (unittest.TestCase):
t.packetizer.REKEY_BYTES = 512 * 1024 t.packetizer.REKEY_BYTES = 512 * 1024
k32blob = (32 * 1024 * 'x') k32blob = (32 * 1024 * 'x')
try: try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f:
for i in range(32): for i in range(32):
f.write(k32blob) f.write(k32blob)
f.close()
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
self.assertNotEqual(t.H, t.session_id) self.assertNotEqual(t.H, t.session_id)
# try to read it too. # try to read it too.
f = sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024) with sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024) as f:
f.prefetch() f.prefetch()
total = 0 total = 0
while total < 1024 * 1024: while total < 1024 * 1024:
total += len(f.read(32 * 1024)) total += len(f.read(32 * 1024))
f.close()
finally: finally:
sftp.remove('%s/hongry.txt' % FOLDER) sftp.remove('%s/hongry.txt' % FOLDER)
t.packetizer.REKEY_BYTES = pow(2, 30) t.packetizer.REKEY_BYTES = pow(2, 30)

View File

@ -142,9 +142,8 @@ class UtilTest(ParamikoTest):
self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b') self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
def test_5_host_keys(self): def test_5_host_keys(self):
f = open('hostfile.temp', 'w') with open('hostfile.temp', 'w') as f:
f.write(test_hosts_file) f.write(test_hosts_file)
f.close()
try: try:
hostdict = paramiko.util.load_host_keys('hostfile.temp') hostdict = paramiko.util.load_host_keys('hostfile.temp')
self.assertEqual(2, len(hostdict)) self.assertEqual(2, len(hostdict))