From 974294ad7d083c49bffe9189cac0d5b08984b1ad Mon Sep 17 00:00:00 2001 From: Richard Kettlewell Date: Mon, 29 Aug 2011 15:50:35 +0100 Subject: [PATCH] Fix issue 34 (SFTPFile prefetch assumes response order matches requests) SFTPFile._async_response gets a new 'num' parameter giving the request number. This can be matched up with the return value of SFTPClient._async_request() to retrieve data specific to that request. The prefetch queue SFTPFile._prefetch_reads is replaced with the dict _prefetch_extents, which maps request numbers to (offset,length) tuples. A lock is used to exclude the case where a response arrives in _async_response before _prefetch_thread has updated it. --- paramiko/sftp_client.py | 2 +- paramiko/sftp_file.py | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py index 79a7761..189caa4 100644 --- a/paramiko/sftp_client.py +++ b/paramiko/sftp_client.py @@ -682,7 +682,7 @@ class SFTPClient (BaseSFTP): self._convert_status(msg) return t, msg if fileobj is not type(None): - fileobj._async_response(t, msg) + fileobj._async_response(t, msg, num) if waitfor is None: # just doing a single check break diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index 8c5c7ac..2d4d431 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -49,7 +49,8 @@ class SFTPFile (BufferedFile): self._prefetching = False self._prefetch_done = False self._prefetch_data = {} - self._prefetch_reads = [] + self._prefetch_extents = {} + self._prefetch_lock = threading.Lock() self._saved_exception = None def __del__(self): @@ -86,7 +87,7 @@ class SFTPFile (BufferedFile): pass def _data_in_prefetch_requests(self, offset, size): - k = [i for i in self._prefetch_reads if i[0] <= offset] + k = [self._prefetch_extents[i] for i in self._prefetch_extents if self._prefetch_extents[i][0] <= offset] if len(k) == 0: return False k.sort(lambda x, y: cmp(x[0], y[0])) @@ -440,7 +441,6 @@ class SFTPFile (BufferedFile): def _start_prefetch(self, chunks): self._prefetching = True self._prefetch_done = False - self._prefetch_reads.extend(chunks) t = threading.Thread(target=self._prefetch_thread, args=(chunks,)) t.setDaemon(True) @@ -450,9 +450,11 @@ class SFTPFile (BufferedFile): # do these read requests in a temporary thread because there may be # a lot of them, so it may block. for offset, length in chunks: - self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length)) + with self._prefetch_lock: + num = self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length)) + self._prefetch_extents[num] = (offset, length) - def _async_response(self, t, msg): + def _async_response(self, t, msg, num): if t == CMD_STATUS: # save exception and re-raise it on next file operation try: @@ -463,10 +465,12 @@ class SFTPFile (BufferedFile): if t != CMD_DATA: raise SFTPError('Expected data') data = msg.get_string() - offset, length = self._prefetch_reads.pop(0) - self._prefetch_data[offset] = data - if len(self._prefetch_reads) == 0: - self._prefetch_done = True + with self._prefetch_lock: + offset,length = self._prefetch_extents[num] + self._prefetch_data[offset] = data + del self._prefetch_extents[num] + if len(self._prefetch_extents) == 0: + self._prefetch_done = True def _check_exception(self): "if there's a saved exception, raise & clear it"