diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index f132101..8f8dbfc 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -364,6 +364,20 @@ class SFTPFile (BufferedFile):
         self._start_prefetch(chunks)
 
     def readv(self, chunks):
+        """
+        Read a set of blocks from the file by (offset, length).  This is more
+        efficient than doing a series of L{seek} and L{read} calls, since the
+        prefetch machinery is used to retrieve all the requested blocks at
+        once.
+
+        @param chunks: a list of (offset, length) tuples indicating which
+            sections of the file to read
+        @type chunks: list(tuple(long, int))
+        @return: a list of blocks read, in the same order as in C{chunks}
+        @rtype: list(str)
+
+        @since: 1.5.4
+        """
         # put the offsets in order, since we depend on that for determining
         # when the reads have finished.
         ordered_chunks = chunks[:]
@@ -391,16 +405,16 @@ class SFTPFile (BufferedFile):
         self._prefetch_done = False
         self._prefetch_so_far = chunks[0][0]
         self._prefetch_data = {}
-        self._prefetch_reads = chunks
+        self._prefetch_reads = chunks[:]
 
-        t = threading.Thread(target=self._prefetch_thread)
+        t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
         t.setDaemon(True)
         t.start()
 
-    def _prefetch_thread(self):
+    def _prefetch_thread(self, chunks):
         # do these read requests in a temporary thread because there may be
         # a lot of them, so it may block.
-        for offset, length in self._prefetch_reads:
+        for offset, length in chunks:
             self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
 
     def _async_response(self, t, msg):
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index 2dd285a..aefe3ef 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -177,6 +177,7 @@ class BigSFTPTest (unittest.TestCase):
 
         self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
 
+        start = time.time()
         k2blob = kblob + kblob
         chunk = 793
         for i in xrange(10):
@@ -194,10 +195,51 @@ class BigSFTPTest (unittest.TestCase):
                 self.assertEqual(data, k2blob[n_offset:n_offset + chunk])
                 offset += chunk
             f.close()
+        end = time.time()
+        sys.stderr.write('%ds ' % round(end - start))
         finally:
             sftp.remove('%s/hongry.txt' % FOLDER)
-
-    def test_5_lots_of_prefetching(self):
+
+    def test_5_readv_seek(self):
+        sftp = get_sftp()
+        kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+        try:
+            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
+            f.set_pipelined(True)
+            for n in range(1024):
+                f.write(kblob)
+                if n % 128 == 0:
+                    sys.stderr.write('.')
+            f.close()
+            sys.stderr.write(' ')
+
+            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+
+            start = time.time()
+            k2blob = kblob + kblob
+            chunk = 793
+            for i in xrange(10):
+                f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+                base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
+                # make a bunch of offsets and put them in random order
+                offsets = [base_offset + j * chunk for j in xrange(100)]
+                readv_list = []
+                for j in xrange(100):
+                    o = offsets[random.randint(0, len(offsets) - 1)]
+                    offsets.remove(o)
+                    readv_list.append((o, chunk))
+                ret = f.readv(readv_list)
+                for i in xrange(len(readv_list)):
+                    offset = readv_list[i][0]
+                    n_offset = offset % 1024
+                    self.assertEqual(ret[i], k2blob[n_offset:n_offset + chunk])
+                f.close()
+            end = time.time()
+            sys.stderr.write('%ds ' % round(end - start))
+        finally:
+            sftp.remove('%s/hongry.txt' % FOLDER)
+
+    def test_6_lots_of_prefetching(self):
         """
         prefetch a 1MB file a bunch of times, discarding the file object
         without using it, to verify that paramiko doesn't get confused.
@@ -231,7 +273,7 @@ class BigSFTPTest (unittest.TestCase):
         finally:
             sftp.remove('%s/hongry.txt' % FOLDER)
 
-    def test_6_big_file_big_buffer(self):
+    def test_7_big_file_big_buffer(self):
         """
         write a 1MB file, with no linefeeds, and a big buffer.
         """
@@ -246,7 +288,7 @@ class BigSFTPTest (unittest.TestCase):
         finally:
             sftp.remove('%s/hongry.txt' % FOLDER)
 
-    def test_7_big_file_renegotiate(self):
+    def test_8_big_file_renegotiate(self):
         """
         write a 1MB file, forcing key renegotiation in the middle.
         """
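
For reference, a minimal sketch of how client code might call the readv() API added above. The hostname, credentials, remote path, and block offsets are hypothetical placeholders for illustration; only Transport, SFTPClient.from_transport, open, and the new readv itself come from paramiko.

    # Hypothetical usage sketch for SFTPFile.readv() -- host, login, path,
    # and offsets are made up for illustration.
    import paramiko

    t = paramiko.Transport(('example.com', 22))
    t.connect(username='user', password='secret')
    sftp = paramiko.SFTPClient.from_transport(t)

    f = sftp.open('/tmp/hongry.txt', 'r')
    # three non-contiguous (offset, length) blocks fetched in one batch;
    # readv() pushes all the read requests out through the prefetch
    # machinery instead of paying a round trip per seek()/read() pair
    blocks = f.readv([(0, 32), (1024, 32), (4096, 64)])
    for data in blocks:
        print len(data)    # results come back in request order
    f.close()
    t.close()

Note the design choice in the patch itself: _start_prefetch() now keeps a private copy of the chunk list (chunks[:]) and hands the worker thread its own reference via args, presumably so the thread no longer iterates the same self._prefetch_reads list that response handling consumes concurrently.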