Python: асинхронно обрабатывать задачи, которые являются результатом других асинхронных задач - PullRequest
0 голосов
/ 14 января 2019

Я пытаюсь получить данные всех транзакций для нескольких адресов из API. Каждый адрес может иметь несколько страниц транзакций, которые я узнаю только тогда, когда запрашиваю первую страницу.

У меня есть методы api.get_address_data(address, page) и api.get_transaction_data(tx).

Синхронный код для того, что я хочу сделать, будет выглядеть так:

def all_transaction_data(addresses):
    for address in addresses:
        data = api.get_address_data(address, page=0)
        transactions = data.transactions
        for n in range(1, data.total_pages):
            next_page = api.get_address_data(address, page=n)
            transactions += next_page.transactions
        for tx in data.transactions:
            yield api.get_transaction_data(tx)

Меня не волнует порядок полученных транзакций (мне придется изменить их порядок, когда все они будут готовы). Я могу разместить все данные в памяти, но это очень мало запросов, поэтому я бы хотел делать как можно больше параллельно.

Каков наилучший способ сделать это? Я играл с asyncio (вызовы API находятся под моим контролем, поэтому я могу преобразовать их в async), но у меня проблемы с чередованием слоев: мое лучшее решение - сначала выбрать все адреса, затем перечислить все страницы и наконец получить все транзакции в одной большой партии. Мне бы хотелось, чтобы каждый шаг обработки планировался немедленно, когда соответствующие входные данные готовы, и результаты собираются в один большой список (или выводятся из одного генератора).

Кажется, мне нужна какая-то очередь задач с открытым концом, где задача "get-address" извлекает данные и ставит в очередь кучу задач "get-pages", которые, в свою очередь, ставят в очередь задачи "get -action", и только потом они собираются в список результатов?

Можно ли это сделать с помощью asyncio? Будет ли что-то вроде gevent более подходящим, или, может быть, просто ThreadPoolExecutor? Есть ли лучший подход, чем я описал до сих пор?

Обратите внимание, что я хотел бы избежать инверсии потока управления или, по крайней мере, скрыть это как деталь реализации. То есть, вызывающий этот код должен иметь возможность просто позвонить for tx in all_transaction_data() или в худшем случае async for.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...