Asyncio задача, которая добавляет свои копии в цикл - PullRequest
0 голосов
/ 11 июня 2018

Я довольно новичок в Asyncio, и я не могу понять это сам в настоящее время и буду очень признателен за любую помощь.

Вариант использования следующий:

  • Есть веб-сервис, который мне нужен для извлечения данных из этих дросселей и черных списков, когда к нему отправляется слишком много запросов.
  • Мне нужно сделать большое количество запросов к этому веб-сервису для данных
  • Веб-служба отправляет данные постраничным способом, т. Е. Когда данных слишком много для данного запроса, необходимо выполнить последующие запросы, чтобы получить больше страниц.
  • Требуется ли больше страницизвлеченные данные можно выяснить, изучив ответ на конкретный запрос.
  • Как только данные получены на стороне клиента, их необходимо записать на диск

Итак, на мой взгляд,установка может быть следующей: - Подготовлен начальный список запросов, которые необходимо сделать. - Семафор контролирует, сколько запросов делается за единицу времени для управления дросселированием.- Все начальные запросы добавляются в цикл.- Когда получен ответ, для сохранения данных отправляется отдельная сопрограмма (или, может быть, поток?).Я не хочу, чтобы упорство блокировало получение большего количества данных.- Когда ответ получен, он проверяется, чтобы узнать, нужно ли запрашивать больше страниц для получения полных данных.Если требуется больше страниц, тогда в цикл добавляется еще одна задача для извлечения следующей страницы.

Я написал некоторый код для минимального примера, который должен обеспечить основу для того, чего я пытаюсь достичь:

import asyncio
import time 
import datetime
from random import random

sema = asyncio.Semaphore(2)

async def my_worker():
    async with sema:
        print("{}".format(datetime.datetime.now()))
        print("I'm going to fetch some data")
        result = await data_fetcher()

        print("I'm going to save data to disk")
        await write_result_to_disk(result)

        if random() > 0.5:
            print("I need to create and run a new worker here to fetch more data")

async def data_fetcher():
    await asyncio.sleep(3)
    return "Bla bla bla"

async def write_result_to_disk(result):
    await asyncio.sleep(3)
    print(result)

blah = [my_worker(), my_worker(), my_worker(), my_worker(), my_worker()]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*blah))
print("All Workers Completed")
loop.close()

Это похоже на правильную настройку семафора и запуск рабочих, но оставляет несколько вопросов без ответа:

  • Во-первых, как мне динамически добавить больше работников (для полученияпоследующие страницы) в цикл?
  • Как я могу обработать постоянство, чтобы оно не блокировало выборку данных?
  • Предполагая, что несколько страниц должны оказаться в одном файле, как я могубезопасно собрать все данные из этих запросов, объединить их, а затем сохранить, не блокируя другие запросы на выборку данных?

Заранее благодарен за любую помощь!

1 Ответ

0 голосов
/ 11 июня 2018

Прежде всего, как мне динамически добавлять в цикл больше рабочих (для извлечения последующих страниц)?

Вы можете ставить новые сопрограммы в цикл событий с помощью asyncio.ensure_future.

Как я могу обработать постоянство, чтобы оно не блокировало выборку данных?

Если вы говорите о записи в базу данныхЕсть библиотеки для этого.Если вы говорите о записи в файл, то это сложно - локальный файловый ввод-вывод почти всегда блокируется, поэтому вам придется делегировать работу отдельному потоку.К счастью, asyncio предоставляет для этого помощника: loop.run_in_executor.

Если предположить, что несколько страниц должны оказаться в одном файле, как я могу безопасно собрать все данные изэти запросы, объединить их, а затем сохранить, не блокируя другие запросы на выборку данных?

Это начинает выходить за пределы хорошего вопроса для SO.Для этого вам следует ознакомиться с различными шаблонами параллелизма.

...