document readv, fix thinko, and add a readv unit test
commit 82d8ca6e11 (parent ece62093bd)

@@ -364,6 +364,20 @@ class SFTPFile (BufferedFile):
         self._start_prefetch(chunks)
 
     def readv(self, chunks):
+        """
+        Read a set of blocks from the file by (offset, length).  This is more
+        efficient than doing a series of L{seek} and L{read} calls, since the
+        prefetch machinery is used to retrieve all the requested blocks at
+        once.
+
+        @param chunks: a list of (offset, length) tuples indicating which
+            sections of the file to read
+        @type chunks: list(tuple(long, int))
+        @return: a list of blocks read, in the same order as in C{chunks}
+        @rtype: list(str)
+
+        @since: 1.5.4
+        """
         # put the offsets in order, since we depend on that for determining
         # when the reads have finished.
         ordered_chunks = chunks[:]
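
For context, a minimal sketch of how the new readv call might be used from client code; the host, credentials, and remote path below are hypothetical and are not part of this change:

    import paramiko

    # open an SFTP session via a Transport (the 1.5.x-era entry point)
    t = paramiko.Transport(('example.com', 22))
    t.connect(username='user', password='secret')
    sftp = paramiko.SFTPClient.from_transport(t)

    f = sftp.open('/tmp/bigfile.bin', 'r')
    # request three non-contiguous blocks in one call; the blocks come back
    # in the same order as the (offset, length) tuples passed in
    blocks = f.readv([(0, 512), (4096, 512), (65536, 1024)])
    for data in blocks:
        print len(data)
    f.close()
    t.close()
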
@@ -391,16 +405,16 @@ class SFTPFile (BufferedFile):
         self._prefetch_done = False
         self._prefetch_so_far = chunks[0][0]
         self._prefetch_data = {}
-        self._prefetch_reads = chunks
+        self._prefetch_reads = chunks[:]
 
-        t = threading.Thread(target=self._prefetch_thread)
+        t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
         t.setDaemon(True)
         t.start()
 
-    def _prefetch_thread(self):
+    def _prefetch_thread(self, chunks):
         # do these read requests in a temporary thread because there may be
         # a lot of them, so it may block.
-        for offset, length in self._prefetch_reads:
+        for offset, length in chunks:
             self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
 
     def _async_response(self, t, msg):
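
The "thinko" fixed above: the background thread used to iterate self._prefetch_reads, shared instance state that could be rebound while the thread was still running; the chunk list is now handed to the thread explicitly via args= and self._prefetch_reads keeps its own copy. A standalone sketch of that pattern (class and names here are illustrative only, not paramiko's API):

    import threading

    class Worker(object):
        def start(self, chunks):
            # give the thread its own reference to the work list; rebinding
            # instance attributes later cannot change what it iterates over
            t = threading.Thread(target=self._run, args=(chunks,))
            t.setDaemon(True)
            t.start()

        def _run(self, chunks):
            for offset, length in chunks:
                print offset, length   # the real code issues one async CMD_READ per chunk
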
@@ -177,6 +177,7 @@ class BigSFTPTest (unittest.TestCase):
 
             self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
 
+            start = time.time()
             k2blob = kblob + kblob
             chunk = 793
             for i in xrange(10):
@@ -194,10 +195,51 @@ class BigSFTPTest (unittest.TestCase):
                     self.assertEqual(data, k2blob[n_offset:n_offset + chunk])
                     offset += chunk
                 f.close()
+            end = time.time()
+            sys.stderr.write('%ds ' % round(end - start))
         finally:
             sftp.remove('%s/hongry.txt' % FOLDER)
 
-    def test_5_lots_of_prefetching(self):
+    def test_5_readv_seek(self):
+        sftp = get_sftp()
+        kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+        try:
+            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
+            f.set_pipelined(True)
+            for n in range(1024):
+                f.write(kblob)
+                if n % 128 == 0:
+                    sys.stderr.write('.')
+            f.close()
+            sys.stderr.write(' ')
+
+            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+
+            start = time.time()
+            k2blob = kblob + kblob
+            chunk = 793
+            for i in xrange(10):
+                f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+                base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
+                # make a bunch of offsets and put them in random order
+                offsets = [base_offset + j * chunk for j in xrange(100)]
+                readv_list = []
+                for j in xrange(100):
+                    o = offsets[random.randint(0, len(offsets) - 1)]
+                    offsets.remove(o)
+                    readv_list.append((o, chunk))
+                ret = f.readv(readv_list)
+                for i in xrange(len(readv_list)):
+                    offset = readv_list[i][0]
+                    n_offset = offset % 1024
+                    self.assertEqual(ret[i], k2blob[n_offset:n_offset + chunk])
+                f.close()
+            end = time.time()
+            sys.stderr.write('%ds ' % round(end - start))
+        finally:
+            sftp.remove('%s/hongry.txt' % FOLDER)
+
+    def test_6_lots_of_prefetching(self):
         """
         prefetch a 1MB file a bunch of times, discarding the file object
         without using it, to verify that paramiko doesn't get confused.
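
A note on how the new test checks the data: kblob packs 512 big-endian shorts, i.e. exactly 1024 bytes, and the file is that pattern written 1024 times, so a block read at any offset must equal the same-length slice of k2blob starting at offset % 1024. A quick local sanity check of that invariant (the offset value here is arbitrary, not taken from the test):

    import struct

    kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
    k2blob = kblob + kblob
    assert len(kblob) == 1024

    # simulate the 1MB file contents locally and compare one block against
    # the slice the test expects
    filedata = kblob * 1024
    offset, chunk = 524689, 793
    n_offset = offset % 1024
    assert filedata[offset:offset + chunk] == k2blob[n_offset:n_offset + chunk]
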
@@ -231,7 +273,7 @@ class BigSFTPTest (unittest.TestCase):
         finally:
             sftp.remove('%s/hongry.txt' % FOLDER)
 
-    def test_6_big_file_big_buffer(self):
+    def test_7_big_file_big_buffer(self):
         """
         write a 1MB file, with no linefeeds, and a big buffer.
         """
@@ -246,7 +288,7 @@ class BigSFTPTest (unittest.TestCase):
         finally:
             sftp.remove('%s/hongry.txt' % FOLDER)
 
-    def test_7_big_file_renegotiate(self):
+    def test_8_big_file_renegotiate(self):
         """
         write a 1MB file, forcing key renegotiation in the middle.
         """