generalize the prefetching a bit so that it can be reused for a readv method

parent a98c5cf627
commit 862e1f48e3

@@ -46,7 +46,6 @@ class SFTPFile (BufferedFile):
         self._prefetching = False
         self._prefetch_done = False
         self._prefetch_so_far = 0
-        self._prefetch_size = 0
         self._prefetch_data = {}
         self._saved_exception = None

@@ -89,8 +88,7 @@ class SFTPFile (BufferedFile):
         in the buffer, return None. otherwise, behaves like a normal read.
         """
         # while not closed, and haven't fetched past the current position, and haven't reached EOF...
-        while (self._prefetch_so_far <= self._realpos) and \
-            (self._prefetch_so_far < self._prefetch_size) and not self._closed:
+        while (self._prefetch_so_far <= self._realpos) and not self._closed:
             if self._prefetch_done:
                 return None
             self.sftp._read_response()
@@ -348,26 +346,35 @@ class SFTPFile (BufferedFile):
         dramatically improve the download speed by avoiding roundtrip latency.
         The file's contents are incrementally buffered in a background thread.

+        The prefetched data is stored in a buffer until read via the L{read}
+        method. Once data has been read, it's removed from the buffer. The
+        data may be read in a random order (using L{seek}); chunks of the
+        buffer that haven't been read will continue to be buffered.
+
         @since: 1.5.1
         """
         size = self.stat().st_size
         # queue up async reads for the rest of the file
-        self._prefetching = True
-        self._prefetch_done = False
-        self._prefetch_so_far = self._realpos
-        self._prefetch_size = size
-        self._prefetch_data = {}
-        t = threading.Thread(target=self._prefetch)
-        t.setDaemon(True)
-        t.start()
-
-    def _prefetch(self):
+        chunks = []
         n = self._realpos
-        size = self._prefetch_size
         while n < size:
             chunk = min(self.MAX_REQUEST_SIZE, size - n)
-            self.sftp._async_request(self, CMD_READ, self.handle, long(n), int(chunk))
+            chunks.append((n, chunk))
             n += chunk
+        self._start_prefetch(chunks)
+
+    def readv(self, chunks):
+        # put the offsets in order, since we depend on that for determining
+        # when the reads have finished.
+        ordered_chunks = chunks[:]
+        ordered_chunks.sort(lambda x, y: cmp(x[0], y[0]))
+        self._start_prefetch(ordered_chunks)
+        # now we can just devolve to a bunch of read()s :)
+        out = []
+        for x in chunks:
+            self.seek(x[0])
+            out.append(self.read(x[1]))
+        return out


     ### internals...
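
For orientation, a minimal usage sketch of the public entry points this hunk touches: prefetch() and the new readv(). It is not part of the patch; the host, credentials, remote path and offsets are placeholders, and the connection boilerplate assumes paramiko's Transport/SFTPClient API of this era.

    import paramiko

    # placeholder connection details -- any reachable SFTP server will do
    t = paramiko.Transport(('sftp.example.com', 22))
    t.connect(username='user', password='secret')
    sftp = paramiko.SFTPClient.from_transport(t)

    f = sftp.open('/remote/big.file', 'r')

    # whole-file download: queue async reads up front, then let read()
    # drain the prefetch buffer as responses arrive
    f.prefetch()
    data = f.read()

    # scattered reads: readv() prefetches only the requested (offset, length)
    # pairs and returns the data blocks in the order they were asked for
    # (offsets here assume the file is large enough)
    blocks = f.readv([(0, 1024), (1048576, 4096)])

    f.close()
    sftp.close()
    t.close()
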
@@ -379,6 +386,23 @@ class SFTPFile (BufferedFile):
         except:
             return 0

+    def _start_prefetch(self, chunks):
+        self._prefetching = True
+        self._prefetch_done = False
+        self._prefetch_so_far = chunks[0][0]
+        self._prefetch_data = {}
+        self._prefetch_reads = chunks
+
+        t = threading.Thread(target=self._prefetch_thread)
+        t.setDaemon(True)
+        t.start()
+
+    def _prefetch_thread(self):
+        # do these read requests in a temporary thread because there may be
+        # a lot of them, so it may block.
+        for offset, length in self._prefetch_reads:
+            self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
+
     def _async_response(self, t, msg):
         if t == CMD_STATUS:
             # save exception and re-raise it on next file operation
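
_start_prefetch() expects its chunk list as (offset, length) pairs already sorted by offset, which both callers now provide: prefetch() covers everything from the current position to EOF, while readv() passes whatever ranges its caller asked for. A standalone sketch of the list prefetch() builds (plain Python, not part of the patch; split_range is a made-up name and the 32768 cap is an assumption standing in for SFTPFile.MAX_REQUEST_SIZE):

    MAX_REQUEST_SIZE = 32768  # assumed per-request cap

    def split_range(offset, size):
        # same loop as prefetch(): cover [offset, size) with requests
        # no larger than MAX_REQUEST_SIZE each
        chunks = []
        n = offset
        while n < size:
            chunk = min(MAX_REQUEST_SIZE, size - n)
            chunks.append((n, chunk))
            n += chunk
        return chunks

    # a 100000-byte file read from offset 0 becomes four CMD_READ requests:
    # [(0, 32768), (32768, 32768), (65536, 32768), (98304, 1696)]
    print(split_range(0, 100000))
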
@@ -390,9 +414,11 @@ class SFTPFile (BufferedFile):
         if t != CMD_DATA:
             raise SFTPError('Expected data')
         data = msg.get_string()
-        self._prefetch_data[self._prefetch_so_far] = data
-        self._prefetch_so_far += len(data)
-        if self._prefetch_so_far == self._prefetch_size:
+        offset, length = self._prefetch_reads.pop(0)
+        assert length == len(data)
+        self._prefetch_data[offset] = data
+        self._prefetch_so_far = offset + length
+        if len(self._prefetch_reads) == 0:
             self._prefetch_done = True

     def _check_exception(self):
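
The new accounting can be exercised on its own: each CMD_DATA response pops the next (offset, length) pair off the front of the pending list, files the data under its offset, and marks the prefetch done once the list is empty. Keeping the list in offset order means _prefetch_so_far only moves forward, which the wait loop in the read path relies on. A small simulation (plain Python, not part of the patch; the string payloads stand in for CMD_DATA responses arriving in request order):

    prefetch_reads = [(0, 4), (4, 4), (8, 2)]
    prefetch_data = {}
    prefetch_done = False
    prefetch_so_far = 0

    for payload in ['aaaa', 'bbbb', 'cc']:
        offset, length = prefetch_reads.pop(0)
        assert length == len(payload)      # server honored the requested size
        prefetch_data[offset] = payload    # buffer keyed by file offset
        prefetch_so_far = offset + length  # high-water mark for the read path
        if len(prefetch_reads) == 0:
            prefetch_done = True

    assert prefetch_data == {0: 'aaaa', 4: 'bbbb', 8: 'cc'}
    assert prefetch_so_far == 10 and prefetch_done
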