Revise the prefetch machinery so that it no longer asserts if you seek to a position earlier than the prefetched buffers. Also, keep any prefetched data that a seek has skipped over, so that jumping around in the file still gets the benefit of prefetch, though only the first time any given chunk is read.
This commit is contained in:
parent
bbcf7687aa
commit
a98c5cf627
|
@ -44,6 +44,7 @@ class SFTPFile (BufferedFile):
|
||||||
BufferedFile._set_mode(self, mode, bufsize)
|
BufferedFile._set_mode(self, mode, bufsize)
|
||||||
self.pipelined = False
|
self.pipelined = False
|
||||||
self._prefetching = False
|
self._prefetching = False
|
||||||
|
self._prefetch_done = False
|
||||||
self._prefetch_so_far = 0
|
self._prefetch_so_far = 0
|
||||||
self._prefetch_size = 0
|
self._prefetch_size = 0
|
||||||
self._prefetch_data = {}
|
self._prefetch_data = {}
|
||||||
|
@ -83,29 +84,48 @@ class SFTPFile (BufferedFile):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _read_prefetch(self, size):
    """
    Read up to ``size`` bytes at the current position from the prefetch
    buffers, if possible.

    Returns the data found (possibly fewer than ``size`` bytes), ``''`` if
    no prefetched data is buffered at all (prefetching is then switched
    off), or ``None`` if data is buffered but none of it covers the current
    position, so the caller has to fetch it some other way.

    Data handed back is removed from the buffers; whatever lies before or
    after it in the same chunk stays cached, so a later seek can still be
    served from the prefetch.
    """
    # while not closed, and haven't fetched past the current position, and
    # haven't reached EOF: wait for another response to arrive.
    while (self._prefetch_so_far <= self._realpos) and \
            (self._prefetch_so_far < self._prefetch_size) and not self._closed:
        if self._prefetch_done:
            return None
        # presumably this is what ends up filling _prefetch_data and
        # advancing _prefetch_so_far -- confirm against the response handler
        self.sftp._read_response()
        self._check_exception()

    if not self._prefetch_data:
        # NOTE(review): this is also reached after seeking backwards once
        # every chunk has been consumed, where '' reads like EOF -- confirm
        # that this is the intended result.
        self._prefetching = False
        return ''

    # find the chunk with the largest starting offset <= realpos
    candidates = [start for start in self._prefetch_data if start <= self._realpos]
    if not candidates:
        return None
    index = max(candidates)
    chunk = self._prefetch_data[index]

    # Check coverage against the WHOLE chunk, and before touching the
    # cache.  Comparing the offset with the already-sliced remainder made
    # every read at or past a chunk's midpoint miss, and threw away the
    # unread tail of the chunk as a side effect.
    buf_offset = self._realpos - index
    if buf_offset >= len(chunk):
        # it's not here.
        return None

    del self._prefetch_data[index]
    if buf_offset > 0:
        # keep the part in front of the current position for later seeks
        self._prefetch_data[index] = chunk[:buf_offset]
        chunk = chunk[buf_offset:]
    if size < len(chunk):
        # keep the part the caller did not ask for, keyed by its offset
        self._prefetch_data[self._realpos + size] = chunk[size:]
        chunk = chunk[:size]
    return chunk
|
||||||
|
|
||||||
def _read(self, size):
|
def _read(self, size):
|
||||||
size = min(size, self.MAX_REQUEST_SIZE)
|
size = min(size, self.MAX_REQUEST_SIZE)
|
||||||
if self._prefetching:
|
if self._prefetching:
|
||||||
return self._read_prefetch(size)
|
data = self._read_prefetch(size)
|
||||||
|
if data is not None:
|
||||||
|
return data
|
||||||
t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size))
|
t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size))
|
||||||
if t != CMD_DATA:
|
if t != CMD_DATA:
|
||||||
raise SFTPError('Expected data')
|
raise SFTPError('Expected data')
|
||||||
|
@ -333,6 +353,7 @@ class SFTPFile (BufferedFile):
|
||||||
size = self.stat().st_size
|
size = self.stat().st_size
|
||||||
# queue up async reads for the rest of the file
|
# queue up async reads for the rest of the file
|
||||||
self._prefetching = True
|
self._prefetching = True
|
||||||
|
self._prefetch_done = False
|
||||||
self._prefetch_so_far = self._realpos
|
self._prefetch_so_far = self._realpos
|
||||||
self._prefetch_size = size
|
self._prefetch_size = size
|
||||||
self._prefetch_data = {}
|
self._prefetch_data = {}
|
||||||
|
@ -371,6 +392,8 @@ class SFTPFile (BufferedFile):
|
||||||
data = msg.get_string()
|
data = msg.get_string()
|
||||||
self._prefetch_data[self._prefetch_so_far] = data
|
self._prefetch_data[self._prefetch_so_far] = data
|
||||||
self._prefetch_so_far += len(data)
|
self._prefetch_so_far += len(data)
|
||||||
|
if self._prefetch_so_far == self._prefetch_size:
|
||||||
|
self._prefetch_done = True
|
||||||
|
|
||||||
def _check_exception(self):
|
def _check_exception(self):
|
||||||
"if there's a saved exception, raise & clear it"
|
"if there's a saved exception, raise & clear it"
|
||||||
|
|
Loading…
Reference in New Issue