Я пытаюсь загрузить файл (ы) из Интернета из витого приложения. Я хотел бы сделать это, используя запросы из-за других функций, которые он предоставляет напрямую или имеет хорошо поддерживаемые библиотеки (повторные попытки, прокси, контроль кэша и т. Д.). Я открыт для решения только для крученого, у которого нет этих функций, но я все равно не могу его найти.
Ожидается, что файлы будут достаточно большими и будут загружаться при медленных соединениях. Поэтому я использую интерфейс запросов stream=True
и iter_content ответа. Более или менее полный фрагмент кода приведен в конце этого вопроса. Точкой входа для этого будет функция http_download
, вызываемая с помощью url
, dst
для записи файла и callback
и необязательного errback
для обработки неудачной загрузки. Я удалил некоторый код, связанный с подготовкой места назначения (создание папок и т. Д.) И код для закрытия сеанса во время выхода из реактора, но я думаю, что он все еще должен работать как есть.
Этот код работает. Файл загружен, витой реактор продолжает работать. Однако у меня, похоже, есть проблема с этим битом кода:
def _stream_download(r, f):
for chunk in r.iter_content(chunk_size=128):
f.write(chunk)
yield None
cooperative_dl = cooperate(_stream_download(response, filehandle))
Поскольку iter_content
возвращается только тогда, когда у него есть чанк для возврата, реактор обрабатывает чанк, запускает другие биты кода, затем возвращается к ожиданию следующего чанка, вместо того, чтобы занимать себя обновлением анимации ожидания вращения в графическом интерфейсе. (код на самом деле не размещен здесь).
Вот вопрос -
- Есть ли способ заставить работать этот генератор таким образом, чтобы он давал управление, когда сам генератор не готов что-то выдавать? Я наткнулся на несколько документов на
twisted.flow
, которые казались подходящими, но, похоже, это не превратило их в искаженные или более не существует сегодня. Этот вопрос может быть прочитан независимо от специфики, то есть в отношении любого генератора произвольной блокировки, или может быть прочитан в непосредственном контексте вопроса.
- Есть ли способ заставить вас загружать файлы асинхронно, используя что-то вроде полнофункциональных запросов? Есть ли существующий витой модуль, который просто делает это, и который я могу просто использовать?
- Каким будет базовый подход к такой проблеме с витыми, независимо от функций http, которые я хочу использовать из запросов. Давайте предположим, что я готов отказаться от них или иным образом реализовать их. Как я могу загрузить файл асинхронно по HTTP.
import os
import re
from functools import partial
from six.moves.urllib.parse import urlparse
from requests import HTTPError
from twisted.internet.task import cooperate
from txrequests import Session
class HttpClientMixin(object):
def __init__(self, *args, **kwargs):
self._http_session = None
def http_download(self, url, dst, callback, errback=None, **kwargs):
dst = os.path.abspath(dst)
# Log request
deferred_response = self.http_session.get(url, stream=True, **kwargs)
deferred_response.addCallback(self._http_check_response)
deferred_response.addCallbacks(
partial(self._http_download, destination=dst, callback=callback),
partial(self._http_error_handler, url=url, errback=errback)
)
def _http_download(self, response, destination=None, callback=None):
def _stream_download(r, f):
for chunk in r.iter_content(chunk_size=128):
f.write(chunk)
yield None
def _rollback(r, f, d):
if r:
r.close()
if f:
f.close()
if os.path.exists(d):
os.remove(d)
filehandle = open(destination, 'wb')
cooperative_dl = cooperate(_stream_download(response, filehandle))
cooperative_dl.whenDone().addCallback(lambda _: response.close)
cooperative_dl.whenDone().addCallback(lambda _: filehandle.close)
cooperative_dl.whenDone().addCallback(
partial(callback, url=response.url, destination=destination)
)
cooperative_dl.whenDone().addErrback(
partial(_rollback, r=response, f=filehandle, d=destination)
)
def _http_error_handler(self, failure, url=None, errback=None):
failure.trap(HTTPError)
# Log error message
if errback:
errback(failure)
@staticmethod
def _http_check_response(response):
response.raise_for_status()
return response
@property
def http_session(self):
if not self._http_session:
# Log session start
self._http_session = Session()
return self._http_session