From c24db3e38c88eb6de1f9921de0b9ce4fc1a6c41c Mon Sep 17 00:00:00 2001
From: Robey Pointer <robey@lag.net>
Date: Tue, 25 Jul 2006 17:09:07 -0700
Subject: [PATCH] [project @ robey@lag.net-20060726000907-b9a2d46eecc64cec]

allow prefetch + readv to occur at the same time (even though it will be
really inefficient). instead of a moving pointer, use the prefetched
buffers themselves to track what we've downloaded so far. break up large
readv requests into chunks no bigger than the max packet size. add 2 more
unit tests to cover this.
---
 paramiko/sftp_file.py  | 66 +++++++++++++++++++++++++++---------------
 tests/test_sftp_big.py | 76 ++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 114 insertions(+), 28 deletions(-)

diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 05c1045..3673e86 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -46,7 +46,6 @@ class SFTPFile (BufferedFile):
         self.pipelined = False
         self._prefetching = False
         self._prefetch_done = False
-        self._prefetch_so_far = 0
         self._prefetch_data = {}
         self._prefetch_reads = []
         self._saved_exception = None
@@ -84,37 +83,45 @@ class SFTPFile (BufferedFile):
             # may have outlived the Transport connection
             pass
 
+    def _data_in_prefetch_buffers(self, offset):
+        """
+        if a block of data is present in the prefetch buffers, at the given
+        offset, return the offset of the relevant prefetch buffer.  otherwise,
+        return None.  this says only that the byte at the given offset has
+        been buffered, nothing about how many bytes after it are present.
+        """
+        k = [i for i in self._prefetch_data.keys() if i <= offset]
+        if len(k) == 0:
+            return None
+        index = max(k)
+        buf_offset = offset - index
+        if buf_offset >= len(self._prefetch_data[index]):
+            # it's not here
+            return None
+        return index
+
     def _read_prefetch(self, size):
         """
         read data out of the prefetch buffer, if possible.  if the data isn't
         in the buffer, return None.  otherwise, behaves like a normal read.
         """
-        # while not closed, and haven't fetched past the current position, and haven't reached EOF...
-        while (self._prefetch_so_far <= self._realpos) and not self._closed:
-            if self._prefetch_done:
-                return None
+        # spin until the data shows up in a prefetch buffer, the prefetch finishes, or the file closes
+        while True:
+            offset = self._data_in_prefetch_buffers(self._realpos)
+            if (offset is not None) or self._prefetch_done or self._closed:
+                break
             self.sftp._read_response()
             self._check_exception()
-        k = self._prefetch_data.keys()
-        if len(k) == 0:
+        if offset is None:
             self._prefetching = False
-            return ''
-
-        # find largest offset < realpos
-        pos_list = [i for i in k if i <= self._realpos]
-        if len(pos_list) == 0:
             return None
-        index = max(pos_list)
-        prefetch = self._prefetch_data[index]
-        del self._prefetch_data[index]
+        prefetch = self._prefetch_data[offset]
+        del self._prefetch_data[offset]
 
-        buf_offset = self._realpos - index
+        buf_offset = self._realpos - offset
         if buf_offset > 0:
-            self._prefetch_data[index] = prefetch[:buf_offset]
+            self._prefetch_data[offset] = prefetch[:buf_offset]
             prefetch = prefetch[buf_offset:]
-        if buf_offset >= len(prefetch):
-            # it's not here.
-            return None
         if size < len(prefetch):
             self._prefetch_data[self._realpos + size] = prefetch[size:]
             prefetch = prefetch[:size]
@@ -384,10 +391,22 @@ class SFTPFile (BufferedFile):
         # put the offsets in order, since we depend on that for determining
         # when the reads have finished.
         self.sftp._log(DEBUG, 'readv(%s, %r)' % (util.hexify(self.handle), chunks))
-        # FIXME: if prefetch() was already called (not readv), don't prefetch.
         ordered_chunks = list(chunks)
         ordered_chunks.sort(lambda x, y: cmp(x[0], y[0]))
-        self._start_prefetch(ordered_chunks)
+
+        # break up anything larger than the max read size
+        if len([size for offset, size in ordered_chunks if size > self.MAX_REQUEST_SIZE]) > 0:
+            read_chunks = []
+            for offset, size in ordered_chunks:
+                while size > 0:
+                    chunk_size = min(size, self.MAX_REQUEST_SIZE)
+                    read_chunks.append((offset, chunk_size))
+                    offset += chunk_size
+                    size -= chunk_size
+        else:
+            read_chunks = ordered_chunks
+
+        self._start_prefetch(read_chunks)
         # now we can just devolve to a bunch of read()s :)
         for x in chunks:
             self.seek(x[0])
@@ -406,8 +425,6 @@ class SFTPFile (BufferedFile):
     def _start_prefetch(self, chunks):
         self._prefetching = True
         self._prefetch_done = False
-        self._prefetch_so_far = chunks[0][0]
-        self._prefetch_data = {}
         self._prefetch_reads.extend(chunks)
 
         t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
@@ -434,7 +451,6 @@ class SFTPFile (BufferedFile):
                 offset, length = self._prefetch_reads.pop(0)
                 assert length == len(data)
                 self._prefetch_data[offset] = data
-                self._prefetch_so_far = offset + length
                 if len(self._prefetch_reads) == 0:
                     self._prefetch_done = True
 
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index 94edab0..b3f214c 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -272,8 +272,77 @@ class BigSFTPTest (unittest.TestCase):
             sys.stderr.write(' ')
         finally:
             sftp.remove('%s/hongry.txt' % FOLDER)
-    
-    def test_7_big_file_big_buffer(self):
+    
+    def test_7_prefetch_readv(self):
+        """
+        verify that prefetch and readv don't conflict with each other.
+        """
+        sftp = get_sftp()
+        kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+        try:
+            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
+            f.set_pipelined(True)
+            for n in range(1024):
+                f.write(kblob)
+                if n % 128 == 0:
+                    sys.stderr.write('.')
+            f.close()
+            sys.stderr.write(' ')
+
+            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+
+            f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+            f.prefetch()
+            data = f.read(1024)
+            self.assertEqual(data, kblob)
+
+            chunk_size = 793
+            base_offset = 512 * 1024
+            k2blob = kblob + kblob
+            chunks = [(base_offset + (chunk_size * i), chunk_size) for i in range(20)]
+            for data in f.readv(chunks):
+                # the file is kblob repeated, so what we expect depends only on base_offset % 1024
+                offset = base_offset % 1024
+                self.assertEqual(chunk_size, len(data))
+                self.assertEqual(k2blob[offset:offset + chunk_size], data)
+                base_offset += chunk_size
+
+            f.close()
+            sys.stderr.write(' ')
+        finally:
+            sftp.remove('%s/hongry.txt' % FOLDER)
+
+    def test_8_large_readv(self):
+        """
+        verify that a very large readv is broken up correctly and still
+        returned as a single blob.
+        """
+        sftp = get_sftp()
+        kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+        try:
+            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
+            f.set_pipelined(True)
+            for n in range(1024):
+                f.write(kblob)
+                if n % 128 == 0:
+                    sys.stderr.write('.')
+            f.close()
+            sys.stderr.write(' ')
+
+            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+
+            f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+            data = list(f.readv([(23 * 1024, 128 * 1024)]))
+            self.assertEqual(1, len(data))
+            data = data[0]
+            self.assertEqual(128 * 1024, len(data))
+
+            f.close()
+            sys.stderr.write(' ')
+        finally:
+            sftp.remove('%s/hongry.txt' % FOLDER)
+
+    def test_9_big_file_big_buffer(self):
         """
         write a 1MB file, with no linefeeds, and a big buffer.
""" @@ -288,7 +356,7 @@ class BigSFTPTest (unittest.TestCase): finally: sftp.remove('%s/hongry.txt' % FOLDER) - def test_8_big_file_renegotiate(self): + def test_A_big_file_renegotiate(self): """ write a 1MB file, forcing key renegotiation in the middle. """