Оптимальная архитектура для асинхронной программы с asyncio и aiohttp - PullRequest
0 голосов
/ 28 июня 2018

Я пытаюсь понять, как лучше всего построить программу, выполнив следующее:

Рассмотрим несколько анализов. Каждый анализ запрашивает данные из нескольких источников данных (API REST). В каждом анализе, когда все данные собираются из источников данных, данные проверяются на одно или несколько условий. Если эти условия выполнены, другой запрос делается к другому источнику данных.

Цель состоит в том, чтобы собрать данные для всех асинхронных анализов, проверить условия для каждого анализа, запросить, выполнены ли условия, а затем повторить. Таким образом, следующие требования:

  1. Данные проверяются для условий после . Все данные собираются в конкретном анализе, а не после того, как данные собраны во всех анализах.
  2. Если условия соблюдены, запрос делается первым делом, а не после того, как условия проверены для всех анализов.
  3. Получение данных -> проверка условий -> возможно, цикл запроса чего-либо запланирован для запуска каждые X минут или часов.

Я сделал следующее демо:

import asyncio
import random


async def get_data(list_of_data_calls):
    tasks = []
    for l in list_of_data_calls:
        tasks.append(asyncio.ensure_future(custom_sleep(l)))
    return await asyncio.gather(*tasks)


async def custom_sleep(time):
    await asyncio.sleep(time)
    return random.randint(0, 100)


async def analysis1_wrapper():
    while True:
        print("Getting data for analysis 1")
        res = await get_data([5, 3])
        print("Data collected for analysis 1")
        for integer in res:
            if integer > 80:
                print("Condition analysis 1 met")
            else:
                print("Condition analysis 1 not met")
        await asyncio.sleep(10)


async def analysis2_wrapper():
    while True:
        print("Getting data for analysis 2")
        res = await get_data([5, 3])
        print("Data collected for analysis 2")
        for integer in res:
            if integer > 50:
                print("Condition analysis 2 met")
            else:
                print("Condition analysis 2 not met")
        await asyncio.sleep(10)


loop = asyncio.get_event_loop()
tasks = analysis1_wrapper(), analysis2_wrapper()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

Это дает следующий вывод:

Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 not met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met
Getting data for analysis 2
Getting data for analysis 1
Data collected for analysis 2
Condition analysis 2 not met
Condition analysis 2 not met
Data collected for analysis 1
Condition analysis 1 not met
Condition analysis 1 not met

Кажется, это работает, как я хочу. Однако из-за моего ограниченного опыта работы с asyncio и aiohttp я не уверен, является ли это хорошим способом сделать это. Я хочу иметь возможность добавлять шаги в конвейер в будущем, например, делать что-то на основе логики вокруг запроса, если выполняются условия. Кроме того, он должен масштабироваться до многих анализов без потери слишком большой скорости.

1 Ответ

0 голосов
/ 29 июня 2018

Да, это в основном так. Несколько вещей для размышления:

  1. Лимит параллелизма.

Хотя у вас может быть неограниченное количество одновременных задач, но по мере увеличения параллелизма время отклика каждой задачи также увеличивается, при этом пропускная способность перестает расти в какой-то момент или даже уменьшается. Поскольку существует только один основной поток для выполнения всего, обратные вызовы должны стоять в очереди, когда их слишком много, даже если сетевой ответ пришел несколько миллисекунд назад. Чтобы сбалансировать это, вам обычно требуется семафор для управления максимальным параллелизмом для производительности.

  1. Операции с интенсивным использованием процессора.

Ваш код не показывался, но беспокоит то, что проверка состояния может быть интенсивной ЦП. В этом случае вы должны отложить задачу в пул потоков (без проблемы GIL) или подпроцессах (если проблема с GIL) по двум причинам: 1. Остановить заблокированный основной поток от нарушения параллелизма. 2. Используйте несколько процессоров более эффективно.

  1. Управление задачами.

Ваш текущий код спит 10 секунд в цикле для каждого анализа. Это затрудняет постепенное отключение анализаторов, не говоря уже о масштабировании на лету. Идеальной моделью может быть шаблон производитель-потребитель , в котором вы создаете задачи с каким-либо управлением в очередь , а группа рабочих извлекает задачи из очереди и работает с ними одновременно.

...