allow prefetch + readv to occur at the same time (even though it will be really inefficient). instead of a moving pointer, use the prefetched buffers themselves as the record of what we've downloaded so far. break up large readv requests into chunks no larger than the max packet size. add 2 more unit tests to cover this.
parent 787b0b461d
commit c24db3e38c
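What the change enables, as a usage sketch. This is my illustration, not code from the commit; get_sftp() is the connection helper the test suite uses, hypothetical in this context:

    # prefetch() and readv() on the same open file: the combination this
    # commit makes legal.  modeled on test_7_prefetch_readv below.
    sftp = get_sftp()                 # hypothetical here; defined in the tests
    f = sftp.open('data.bin', 'r')
    f.prefetch()                      # start pulling the whole file in the background
    head = f.read(1024)               # served from the prefetch buffers as they arrive
    # readv() can now run while that prefetch is still in flight; its chunks
    # are split to MAX_REQUEST_SIZE and land in the same buffer map.
    for block in f.readv([(0, 512), (4096, 512)]):
        assert len(block) == 512
    f.close()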
paramiko/sftp_file.py
@@ -46,7 +46,6 @@ class SFTPFile (BufferedFile):
         self.pipelined = False
         self._prefetching = False
         self._prefetch_done = False
-        self._prefetch_so_far = 0
         self._prefetch_data = {}
         self._prefetch_reads = []
         self._saved_exception = None
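For orientation (annotation, not part of the diff): _prefetch_data is a sparse map from file offset to the bytes received for that offset, and _prefetch_reads is the FIFO of (offset, length) requests still outstanding. Illustrative shapes, with invented values:

    prefetch_data = {0: 'a' * 32768, 32768: 'b' * 32768}  # offset -> bytes received
    prefetch_reads = [(65536, 32768), (98304, 32768)]     # (offset, length) still in flight
    assert 65536 not in prefetch_data                     # not downloaded yet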
@@ -84,37 +83,45 @@ class SFTPFile (BufferedFile):
             # may have outlived the Transport connection
             pass
 
+    def _data_in_prefetch_buffers(self, offset):
+        """
+        if a block of data is present in the prefetch buffers, at the given
+        offset, return the offset of the relevant prefetch buffer. otherwise,
+        return None. this guarantees nothing about the number of bytes
+        collected in the prefetch buffer so far.
+        """
+        k = [i for i in self._prefetch_data.keys() if i <= offset]
+        if len(k) == 0:
+            return None
+        index = max(k)
+        buf_offset = offset - index
+        if buf_offset >= len(self._prefetch_data[index]):
+            # it's not here
+            return None
+        return index
+
     def _read_prefetch(self, size):
         """
         read data out of the prefetch buffer, if possible. if the data isn't
         in the buffer, return None. otherwise, behaves like a normal read.
         """
         # while not closed, and haven't fetched past the current position, and haven't reached EOF...
-        while (self._prefetch_so_far <= self._realpos) and not self._closed:
-            if self._prefetch_done:
-                return None
+        while not self._prefetch_done and not self._closed:
+            offset = self._data_in_prefetch_buffers(self._realpos)
+            if offset is not None:
+                break
             self.sftp._read_response()
         self._check_exception()
-        k = self._prefetch_data.keys()
-        if len(k) == 0:
+        if offset is None:
             self._prefetching = False
             return ''
-
-        # find largest offset < realpos
-        pos_list = [i for i in k if i <= self._realpos]
-        if len(pos_list) == 0:
-            return None
-        index = max(pos_list)
-        prefetch = self._prefetch_data[index]
-        del self._prefetch_data[index]
+        prefetch = self._prefetch_data[offset]
+        del self._prefetch_data[offset]
 
-        buf_offset = self._realpos - index
+        buf_offset = self._realpos - offset
         if buf_offset > 0:
-            self._prefetch_data[index] = prefetch[:buf_offset]
+            self._prefetch_data[offset] = prefetch[:buf_offset]
             prefetch = prefetch[buf_offset:]
-        if buf_offset >= len(prefetch):
-            # it's not here.
-            return None
         if size < len(prefetch):
             self._prefetch_data[self._realpos + size] = prefetch[size:]
             prefetch = prefetch[:size]
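The new lookup is plain interval arithmetic over that sparse buffer map. A standalone sketch of the same rule (my code, not the commit's), with a worked example:

    def data_in_buffers(buffers, offset):
        # buffers maps a file offset to the bytes downloaded at that offset;
        # return the key of the buffer covering offset, or None.
        candidates = [i for i in buffers if i <= offset]
        if not candidates:
            return None
        index = max(candidates)        # closest buffer starting at or before offset
        if offset - index >= len(buffers[index]):
            return None                # that buffer ends before reaching offset
        return index

    buffers = {0: 'abcdefgh', 100: 'xyz'}
    assert data_in_buffers(buffers, 5) == 0      # inside the first block
    assert data_in_buffers(buffers, 8) is None   # in the gap between the blocks
    assert data_in_buffers(buffers, 101) == 100  # inside the second block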
@@ -384,10 +391,22 @@ class SFTPFile (BufferedFile):
         # put the offsets in order, since we depend on that for determining
         # when the reads have finished.
         self.sftp._log(DEBUG, 'readv(%s, %r)' % (util.hexify(self.handle), chunks))
-        # FIXME: if prefetch() was already called (not readv), don't prefetch.
         ordered_chunks = list(chunks)
         ordered_chunks.sort(lambda x, y: cmp(x[0], y[0]))
-        self._start_prefetch(ordered_chunks)
+
+        # break up anything larger than the max read size
+        if len([size for offset, size in ordered_chunks if size > self.MAX_REQUEST_SIZE]) > 0:
+            read_chunks = []
+            for offset, size in ordered_chunks:
+                while size > 0:
+                    chunk_size = min(size, self.MAX_REQUEST_SIZE)
+                    read_chunks.append((offset, chunk_size))
+                    offset += chunk_size
+                    size -= chunk_size
+        else:
+            read_chunks = ordered_chunks
+
+        self._start_prefetch(read_chunks)
         # now we can just devolve to a bunch of read()s :)
         for x in chunks:
             self.seek(x[0])
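The splitting rule is easy to check in isolation. A minimal sketch of the same loop, parameterized on the maximum request size (SFTPFile.MAX_REQUEST_SIZE is 32768 in this codebase, if I read it right):

    def split_chunks(chunks, max_size):
        # break (offset, size) requests into pieces no larger than max_size
        out = []
        for offset, size in chunks:
            while size > 0:
                piece = min(size, max_size)
                out.append((offset, piece))
                offset += piece
                size -= piece
        return out

    # one 128KB request becomes four contiguous 32KB requests:
    assert split_chunks([(23 * 1024, 128 * 1024)], 32768) == \
        [(23552, 32768), (56320, 32768), (89088, 32768), (121856, 32768)]

test_8_large_readv below exercises exactly this case through the real readv().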
@@ -406,8 +425,6 @@ class SFTPFile (BufferedFile):
     def _start_prefetch(self, chunks):
         self._prefetching = True
         self._prefetch_done = False
-        self._prefetch_so_far = chunks[0][0]
-        self._prefetch_data = {}
         self._prefetch_reads.extend(chunks)
 
         t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
@@ -434,7 +451,6 @@ class SFTPFile (BufferedFile):
         offset, length = self._prefetch_reads.pop(0)
         assert length == len(data)
         self._prefetch_data[offset] = data
-        self._prefetch_so_far = offset + length
         if len(self._prefetch_reads) == 0:
             self._prefetch_done = True
 
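The pop(0)/assert pairing relies on the server answering READ requests in the order they were issued; with _prefetch_so_far gone, the buffer map itself is the record of progress. A toy simulation of that invariant (annotation, not paramiko code):

    pending = [(0, 4), (4, 4)]            # queued (offset, length), in request order
    buffers = {}
    for response in ['abcd', 'efgh']:     # replies arrive in the same order
        offset, length = pending.pop(0)   # head of the queue describes this block
        assert length == len(response)
        buffers[offset] = response
    assert buffers == {0: 'abcd', 4: 'efgh'}
    assert len(pending) == 0              # queue drained -> prefetch is done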
tests/test_sftp_big.py
@@ -273,7 +273,75 @@ class BigSFTPTest (unittest.TestCase):
         finally:
             sftp.remove('%s/hongry.txt' % FOLDER)
 
-    def test_7_big_file_big_buffer(self):
+    def test_7_prefetch_readv(self):
+        """
+        verify that prefetch and readv don't conflict with each other.
+        """
+        sftp = get_sftp()
+        kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+        try:
+            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
+            f.set_pipelined(True)
+            for n in range(1024):
+                f.write(kblob)
+                if n % 128 == 0:
+                    sys.stderr.write('.')
+            f.close()
+            sys.stderr.write(' ')
+
+            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+
+            f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+            f.prefetch()
+            data = f.read(1024)
+            self.assertEqual(data, kblob)
+
+            chunk_size = 793
+            base_offset = 512 * 1024
+            k2blob = kblob + kblob
+            chunks = [(base_offset + (chunk_size * i), chunk_size) for i in range(20)]
+            for data in f.readv(chunks):
+                offset = base_offset % 1024
+                self.assertEqual(chunk_size, len(data))
+                self.assertEqual(k2blob[offset:offset + chunk_size], data)
+                base_offset += chunk_size
+
+            f.close()
+            sys.stderr.write(' ')
+        finally:
+            sftp.remove('%s/hongry.txt' % FOLDER)
+
+    def test_8_large_readv(self):
+        """
+        verify that a very large readv is broken up correctly and still
+        returned as a single blob.
+        """
+        sftp = get_sftp()
+        kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+        try:
+            f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
+            f.set_pipelined(True)
+            for n in range(1024):
+                f.write(kblob)
+                if n % 128 == 0:
+                    sys.stderr.write('.')
+            f.close()
+            sys.stderr.write(' ')
+
+            self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+
+            f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+            data = list(f.readv([(23 * 1024, 128 * 1024)]))
+            self.assertEqual(1, len(data))
+            data = data[0]
+            self.assertEqual(128 * 1024, len(data))
+
+            f.close()
+            sys.stderr.write(' ')
+        finally:
+            sftp.remove('%s/hongry.txt' % FOLDER)
+
+    def test_9_big_file_big_buffer(self):
         """
         write a 1MB file, with no linefeeds, and a big buffer.
         """
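A quick sanity check on the tests' data layout (my arithmetic, not part of the commit): each kblob is 512 two-byte big-endian shorts, so 1024 writes produce exactly 1MB, and the twenty 793-byte readv chunks starting at 512KB end well inside the file:

    import struct

    kblob = b''.join([struct.pack('>H', n) for n in range(512)])
    assert len(kblob) == 1024                   # 512 shorts * 2 bytes each
    assert 1024 * len(kblob) == 1024 * 1024     # the test file is exactly 1MB
    assert 512 * 1024 + 20 * 793 < 1024 * 1024  # readv chunks stay inside the file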
@@ -288,7 +356,7 @@ class BigSFTPTest (unittest.TestCase):
         finally:
             sftp.remove('%s/hongry.txt' % FOLDER)
 
-    def test_8_big_file_renegotiate(self):
+    def test_A_big_file_renegotiate(self):
         """
         write a 1MB file, forcing key renegotiation in the middle.
         """