serialize outgoing requests (duh) -- when prefetching multiple files, there may be several threads pumping out read requests
This commit is contained in:
parent
2d06308fd9
commit
62a83e7d03
|
@ -22,6 +22,7 @@ Client-mode SFTP support.
|
||||||
|
|
||||||
import errno
|
import errno
|
||||||
import os
|
import os
|
||||||
|
import threading
|
||||||
import weakref
|
import weakref
|
||||||
from paramiko.sftp import *
|
from paramiko.sftp import *
|
||||||
from paramiko.sftp_attr import SFTPAttributes
|
from paramiko.sftp_attr import SFTPAttributes
|
||||||
|
@ -57,6 +58,8 @@ class SFTPClient (BaseSFTP):
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
self.ultra_debug = False
|
self.ultra_debug = False
|
||||||
self.request_number = 1
|
self.request_number = 1
|
||||||
|
# lock for request_number
|
||||||
|
self._lock = threading.Lock()
|
||||||
self._cwd = None
|
self._cwd = None
|
||||||
# request # -> SFTPFile
|
# request # -> SFTPFile
|
||||||
self._expecting = weakref.WeakValueDictionary()
|
self._expecting = weakref.WeakValueDictionary()
|
||||||
|
@ -523,23 +526,28 @@ class SFTPClient (BaseSFTP):
|
||||||
return self._read_response(num)
|
return self._read_response(num)
|
||||||
|
|
||||||
def _async_request(self, fileobj, t, *arg):
|
def _async_request(self, fileobj, t, *arg):
|
||||||
msg = Message()
|
# this method may be called from other threads (prefetch)
|
||||||
msg.add_int(self.request_number)
|
self._lock.acquire()
|
||||||
for item in arg:
|
try:
|
||||||
if type(item) is int:
|
msg = Message()
|
||||||
msg.add_int(item)
|
msg.add_int(self.request_number)
|
||||||
elif type(item) is long:
|
for item in arg:
|
||||||
msg.add_int64(item)
|
if type(item) is int:
|
||||||
elif type(item) is str:
|
msg.add_int(item)
|
||||||
msg.add_string(item)
|
elif type(item) is long:
|
||||||
elif type(item) is SFTPAttributes:
|
msg.add_int64(item)
|
||||||
item._pack(msg)
|
elif type(item) is str:
|
||||||
else:
|
msg.add_string(item)
|
||||||
raise Exception('unknown type for %r type %r' % (item, type(item)))
|
elif type(item) is SFTPAttributes:
|
||||||
self._send_packet(t, str(msg))
|
item._pack(msg)
|
||||||
num = self.request_number
|
else:
|
||||||
self._expecting[num] = fileobj
|
raise Exception('unknown type for %r type %r' % (item, type(item)))
|
||||||
self.request_number += 1
|
num = self.request_number
|
||||||
|
self._expecting[num] = fileobj
|
||||||
|
self._send_packet(t, str(msg))
|
||||||
|
self.request_number += 1
|
||||||
|
finally:
|
||||||
|
self._lock.release()
|
||||||
return num
|
return num
|
||||||
|
|
||||||
def _read_response(self, waitfor=None):
|
def _read_response(self, waitfor=None):
|
||||||
|
@ -550,6 +558,9 @@ class SFTPClient (BaseSFTP):
|
||||||
if num not in self._expecting:
|
if num not in self._expecting:
|
||||||
# might be response for a file that was closed before responses came back
|
# might be response for a file that was closed before responses came back
|
||||||
self._log(DEBUG, 'Unexpected response #%d' % (num,))
|
self._log(DEBUG, 'Unexpected response #%d' % (num,))
|
||||||
|
if waitfor is None:
|
||||||
|
# just doing a single check
|
||||||
|
return
|
||||||
continue
|
continue
|
||||||
fileobj = self._expecting[num]
|
fileobj = self._expecting[num]
|
||||||
del self._expecting[num]
|
del self._expecting[num]
|
||||||
|
|
Loading…
Reference in New Issue