Поток Python с FTP-сервера на сервер Flask для загрузки - PullRequest
0 голосов
/ 25 июня 2018

У меня есть приложение Python Flask, которое получает запрос на загрузку файла с удаленного FTP-сервера. Я использовал BytesIO для сохранения содержимого файла, загруженного с FTP-сервера, используя retrbinary:

import os

from flask import Flask, request, send_file
from ftplib import FTP
from io import BytesIO

app = Flask(__name__)

@app.route('/')
def hello_world():
    return 'Hello, World!'

@app.route('/download_content', methods=['GET'])
def download_content():
    filepath = request.args.get("filepath").strip()
    f = FTP(my_server)
    f.login(my_username, my_password)
    b = BytesIO()
    f.retrbinary("RETR " + filepath, b.write)
    b.seek(0)
    return send_file(b, attachment_filename=os.path.basename(filepath))

app.run("localhost", port=8080)

Проблема здесь в том, что при достижении маршрута download_content сначала содержимое файла попадает в объект BytesIO, а затем отправляется на внешний интерфейс для загрузки.

Как я могу транслировать файл на веб-интерфейс во время его загрузки с FTP-сервера? Я не могу дождаться загрузки файла целиком в BytesIO объекте, а затем выполнить send_file, так как это может быть как неэффективно, так и требует больше времени.

Я читал, что send_file Flask принимает объект generator, но как я могу сделать объекты BytesIO yield до send_file кусками?

1 Ответ

0 голосов
/ 11 июля 2018

Похоже, вам нужно настроить рабочий поток для управления загрузкой с retrbinary

Я сделал быстрый Гист для этого, так как мы столкнулись с той же проблемой. Кажется, этот метод работает.

https://gist.github.com/Richard-Mathie/ffecf414553f8ca4c56eb5b06e791b6f

class FTPDownloader(object):
  def __init__(self, host, user, password, timeout=0.01):
    self.ftp = FTP(host)
    self.ftp.login(user, password)
    self.timeout = timeout

  def getBytes(self, filename):
    print("getBytes")
    self.ftp.retrbinary("RETR {}".format(filename) , self.bytes.put)
    self.bytes.join()   # wait for all blocks in the queue to be processed
    self.finished.set() # mark streaming as finished

  def sendBytes(self):
    while not self.finished.is_set():
      try:
        yield self.bytes.get(timeout=self.timeout)
          self.bytes.task_done()
      except Empty:
        self.finished.wait(self.timeout)
    self.worker.join()

  def download(self, filename):
    self.bytes = Queue()
    self.finished = Event()
    self.worker = Thread(target=self.getBytes, args=(filename,))
    self.worker.start()
    return self.sendBytes()

Вероятно, следует добавить некоторые таймауты и логику для обработки тайм-аутов соединений, но это базовая форма.

Объяснение

Очереди не гарантируют, что рабочий процесс getBytes завершился, когда они пусты, поэтому вы должны иметь семафор / Событие, чтобы указать генератору sendBytes, когда рабочий закончил. Однако мне нужно подождать, пока все блоки в очереди будут обработаны в первую очередь, следовательно, self.bytes.join() до завершения установки.

Интересно, может ли кто-нибудь придумать более элегантный способ сделать это.

...