Похоже, вам нужно настроить рабочий поток для управления загрузкой с retrbinary
Я сделал быстрый Гист для этого, так как мы столкнулись с той же проблемой. Кажется, этот метод работает.
https://gist.github.com/Richard-Mathie/ffecf414553f8ca4c56eb5b06e791b6f
class FTPDownloader(object):
def __init__(self, host, user, password, timeout=0.01):
self.ftp = FTP(host)
self.ftp.login(user, password)
self.timeout = timeout
def getBytes(self, filename):
print("getBytes")
self.ftp.retrbinary("RETR {}".format(filename) , self.bytes.put)
self.bytes.join() # wait for all blocks in the queue to be processed
self.finished.set() # mark streaming as finished
def sendBytes(self):
while not self.finished.is_set():
try:
yield self.bytes.get(timeout=self.timeout)
self.bytes.task_done()
except Empty:
self.finished.wait(self.timeout)
self.worker.join()
def download(self, filename):
self.bytes = Queue()
self.finished = Event()
self.worker = Thread(target=self.getBytes, args=(filename,))
self.worker.start()
return self.sendBytes()
Вероятно, следует добавить некоторые таймауты и логику для обработки тайм-аутов соединений, но это базовая форма.
Объяснение
Очереди не гарантируют, что рабочий процесс getBytes
завершился, когда они пусты, поэтому вы должны иметь семафор / Событие, чтобы указать генератору sendBytes
, когда рабочий закончил. Однако мне нужно подождать, пока все блоки в очереди будут обработаны в первую очередь, следовательно, self.bytes.join()
до завершения установки.
Интересно, может ли кто-нибудь придумать более элегантный способ сделать это.