From 862e1f48e32f874ee489002b067f175b3a851644 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Thu, 9 Mar 2006 18:22:34 -0800 Subject: [PATCH] [project @ robey@lag.net-20060310022234-c183ad0b3f31eb28] generalize the prefetching a bit so that it can be reused for a readv method --- paramiko/sftp_file.py | 64 ++++++++++++++++++++++++++++++------------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index 7d9b8f4..f132101 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -46,7 +46,6 @@ class SFTPFile (BufferedFile): self._prefetching = False self._prefetch_done = False self._prefetch_so_far = 0 - self._prefetch_size = 0 self._prefetch_data = {} self._saved_exception = None @@ -89,8 +88,7 @@ class SFTPFile (BufferedFile): in the buffer, return None. otherwise, behaves like a normal read. """ # while not closed, and haven't fetched past the current position, and haven't reached EOF... - while (self._prefetch_so_far <= self._realpos) and \ - (self._prefetch_so_far < self._prefetch_size) and not self._closed: + while (self._prefetch_so_far <= self._realpos) and not self._closed: if self._prefetch_done: return None self.sftp._read_response() @@ -348,27 +346,36 @@ class SFTPFile (BufferedFile): dramatically improve the download speed by avoiding roundtrip latency. The file's contents are incrementally buffered in a background thread. + The prefetched data is stored in a buffer until read via the L{read} + method. Once data has been read, it's removed from the buffer. The + data may be read in a random order (using L{seek}); chunks of the + buffer that haven't been read will continue to be buffered. + @since: 1.5.1 """ size = self.stat().st_size # queue up async reads for the rest of the file - self._prefetching = True - self._prefetch_done = False - self._prefetch_so_far = self._realpos - self._prefetch_size = size - self._prefetch_data = {} - t = threading.Thread(target=self._prefetch) - t.setDaemon(True) - t.start() - - def _prefetch(self): + chunks = [] n = self._realpos - size = self._prefetch_size while n < size: chunk = min(self.MAX_REQUEST_SIZE, size - n) - self.sftp._async_request(self, CMD_READ, self.handle, long(n), int(chunk)) + chunks.append((n, chunk)) n += chunk - + self._start_prefetch(chunks) + + def readv(self, chunks): + # put the offsets in order, since we depend on that for determining + # when the reads have finished. + ordered_chunks = chunks[:] + ordered_chunks.sort(lambda x, y: cmp(x[0], y[0])) + self._start_prefetch(ordered_chunks) + # now we can just devolve to a bunch of read()s :) + out = [] + for x in chunks: + self.seek(x[0]) + out.append(self.read(x[1])) + return out + ### internals... @@ -379,6 +386,23 @@ class SFTPFile (BufferedFile): except: return 0 + def _start_prefetch(self, chunks): + self._prefetching = True + self._prefetch_done = False + self._prefetch_so_far = chunks[0][0] + self._prefetch_data = {} + self._prefetch_reads = chunks + + t = threading.Thread(target=self._prefetch_thread) + t.setDaemon(True) + t.start() + + def _prefetch_thread(self): + # do these read requests in a temporary thread because there may be + # a lot of them, so it may block. + for offset, length in self._prefetch_reads: + self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length)) + def _async_response(self, t, msg): if t == CMD_STATUS: # save exception and re-raise it on next file operation @@ -390,9 +414,11 @@ class SFTPFile (BufferedFile): if t != CMD_DATA: raise SFTPError('Expected data') data = msg.get_string() - self._prefetch_data[self._prefetch_so_far] = data - self._prefetch_so_far += len(data) - if self._prefetch_so_far == self._prefetch_size: + offset, length = self._prefetch_reads.pop(0) + assert length == len(data) + self._prefetch_data[offset] = data + self._prefetch_so_far = offset + length + if len(self._prefetch_reads) == 0: self._prefetch_done = True def _check_exception(self):