Fix issue 34 (SFTPFile prefetch assumes response order matches requests)
SFTPFile._async_response gets a new 'num' parameter giving the request number. This can be matched up with the return value of SFTPClient._async_request() to retrieve data specific to that request. The prefetch queue SFTPFile._prefetch_reads is replaced with the dict _prefetch_extents, which maps request numbers to (offset,length) tuples. A lock is used to exclude the case where a response arrives in _async_response before _prefetch_thread has updated it.
This commit is contained in:
parent
7bcbc24198
commit
974294ad7d
|
@ -682,7 +682,7 @@ class SFTPClient (BaseSFTP):
|
||||||
self._convert_status(msg)
|
self._convert_status(msg)
|
||||||
return t, msg
|
return t, msg
|
||||||
if fileobj is not type(None):
|
if fileobj is not type(None):
|
||||||
fileobj._async_response(t, msg)
|
fileobj._async_response(t, msg, num)
|
||||||
if waitfor is None:
|
if waitfor is None:
|
||||||
# just doing a single check
|
# just doing a single check
|
||||||
break
|
break
|
||||||
|
|
|
@ -49,7 +49,8 @@ class SFTPFile (BufferedFile):
|
||||||
self._prefetching = False
|
self._prefetching = False
|
||||||
self._prefetch_done = False
|
self._prefetch_done = False
|
||||||
self._prefetch_data = {}
|
self._prefetch_data = {}
|
||||||
self._prefetch_reads = []
|
self._prefetch_extents = {}
|
||||||
|
self._prefetch_lock = threading.Lock()
|
||||||
self._saved_exception = None
|
self._saved_exception = None
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
|
@ -86,7 +87,7 @@ class SFTPFile (BufferedFile):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _data_in_prefetch_requests(self, offset, size):
|
def _data_in_prefetch_requests(self, offset, size):
|
||||||
k = [i for i in self._prefetch_reads if i[0] <= offset]
|
k = [self._prefetch_extents[i] for i in self._prefetch_extents if self._prefetch_extents[i][0] <= offset]
|
||||||
if len(k) == 0:
|
if len(k) == 0:
|
||||||
return False
|
return False
|
||||||
k.sort(lambda x, y: cmp(x[0], y[0]))
|
k.sort(lambda x, y: cmp(x[0], y[0]))
|
||||||
|
@ -440,7 +441,6 @@ class SFTPFile (BufferedFile):
|
||||||
def _start_prefetch(self, chunks):
|
def _start_prefetch(self, chunks):
|
||||||
self._prefetching = True
|
self._prefetching = True
|
||||||
self._prefetch_done = False
|
self._prefetch_done = False
|
||||||
self._prefetch_reads.extend(chunks)
|
|
||||||
|
|
||||||
t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
|
t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
|
||||||
t.setDaemon(True)
|
t.setDaemon(True)
|
||||||
|
@ -450,9 +450,11 @@ class SFTPFile (BufferedFile):
|
||||||
# do these read requests in a temporary thread because there may be
|
# do these read requests in a temporary thread because there may be
|
||||||
# a lot of them, so it may block.
|
# a lot of them, so it may block.
|
||||||
for offset, length in chunks:
|
for offset, length in chunks:
|
||||||
self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
|
with self._prefetch_lock:
|
||||||
|
num = self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
|
||||||
|
self._prefetch_extents[num] = (offset, length)
|
||||||
|
|
||||||
def _async_response(self, t, msg):
|
def _async_response(self, t, msg, num):
|
||||||
if t == CMD_STATUS:
|
if t == CMD_STATUS:
|
||||||
# save exception and re-raise it on next file operation
|
# save exception and re-raise it on next file operation
|
||||||
try:
|
try:
|
||||||
|
@ -463,10 +465,12 @@ class SFTPFile (BufferedFile):
|
||||||
if t != CMD_DATA:
|
if t != CMD_DATA:
|
||||||
raise SFTPError('Expected data')
|
raise SFTPError('Expected data')
|
||||||
data = msg.get_string()
|
data = msg.get_string()
|
||||||
offset, length = self._prefetch_reads.pop(0)
|
with self._prefetch_lock:
|
||||||
self._prefetch_data[offset] = data
|
offset,length = self._prefetch_extents[num]
|
||||||
if len(self._prefetch_reads) == 0:
|
self._prefetch_data[offset] = data
|
||||||
self._prefetch_done = True
|
del self._prefetch_extents[num]
|
||||||
|
if len(self._prefetch_extents) == 0:
|
||||||
|
self._prefetch_done = True
|
||||||
|
|
||||||
def _check_exception(self):
|
def _check_exception(self):
|
||||||
"if there's a saved exception, raise & clear it"
|
"if there's a saved exception, raise & clear it"
|
||||||
|
|
Loading…
Reference in New Issue