Как собрать результаты задачи в Трио? - PullRequest
0 голосов
/ 05 октября 2018

Я написал скрипт, который использует детскую и модуль asks для циклического прохождения и вызова API на основе переменных цикла.Я получаю ответы, но не знаю, как вернуть данные, как если бы вы использовали asyncio.

У меня также есть вопрос об ограничении API до 5 в секунду.

from datetime import datetime
import asks
import time
import trio

asks.init("trio")
s = asks.Session(connections=4)

async def main():
    start_time = time.time()

    api_key = 'API-KEY'
    org_id = 'ORG-ID'
    networkIds = ['id1','id2','idn']

    url = 'https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600'
    headers = {'X-Cisco-Meraki-API-Key': api_key, 'Content-Type': 'application/json'}

    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers)

    print("Total time:", time.time() - start_time)



async def fetch(url, headers):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)




if __name__ == "__main__":
    trio.run(main)

Когда язапустите nursery.start_soon (fetch ...), я печатаю данные в fetch, но как мне вернуть данные?Я не видел ничего похожего на функцию asyncio.gather (* tasks).

Кроме того, я могу ограничить количество сеансов до 1-4, что помогает опуститься ниже предела 5 API в секунду, ноИнтересно, есть ли встроенный способ гарантировать, что в любую секунду вызывается не более 5 API?

Ответы [ 4 ]

0 голосов
/ 05 июня 2019

Исходя из этого ответа , вы можете определить следующую функцию:

async def gather(*tasks):

    async def collect(index, task, results):
        task_func, *task_args = task
        results[index] = await task_func(*task_args)

    results = {}
    async with trio.open_nursery() as nursery:
        for index, task in enumerate(tasks):
            nursery.start_soon(collect, index, task, results)
    return [results[i] for i in range(len(tasks))]

Затем вы можете использовать трио точно так же, как и асинхронное, просто пропатчив трио (добавив сборку).функция):

import trio
trio.gather = gather

Вот практический пример:

async def child(x):
    print(f"Child sleeping {x}")
    await trio.sleep(x)
    return 2*x

async def parent():
    tasks = [(child, t) for t in range(3)]
    return await trio.gather(*tasks)

print("results:", trio.run(parent))
0 голосов
/ 05 октября 2018

Когда я запускаю nursery.start_soon (fetch ...), я печатаю данные в fetch, но как мне вернуть данные?Я не видел ничего похожего на функцию asyncio.gather (* tasks).

Вы задаете два разных вопроса, поэтому я просто отвечу на этот.Матиас уже ответил на другой ваш вопрос.

Когда вы звоните start_soon(), вы просите Трио запустить задание в фоновом режиме, а затем продолжайте.Вот почему Trio может запускать fetch() одновременно несколько раз.Но поскольку Trio продолжает работать, нет способа «вернуть» результат, как это обычно делает функция Python.куда бы он вообще вернулся?

Вы можете использовать очередь, чтобы задачи fetch() могли отправлять результаты другой задаче для дополнительной обработки.

Чтобы создать очередь:

response_queue = trio.Queue()

Когда вы запускаете задачи извлечения, передайте очередь в качестве аргумента и отправьте sentintel в очередь, когда закончите:

async with trio.open_nursery() as nursery:
    for i in networkIds:
        nursery.start_soon(fetch, url.format(i), headers)
await response_queue.put(None)

После загрузки URL-адреса поместите ответ в очередь.:

async def fetch(url, headers, response_queue):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    # Add responses to queue
    await response_queue.put(response)
    print("Finished: ", url, len(response.content), response.status_code)

С учетом вышеуказанных изменений ваши задачи извлечения будут помещать ответы в очередь.Теперь вам нужно прочитать ответы из очереди, чтобы вы могли обработать их.Для этого вы можете добавить новую функцию:

async def process(response_queue):
    async for response in response_queue:
        if response is None:
            break
        # Do whatever processing you want here.

Эту функцию процесса следует запускать как фоновую задачу, прежде чем запускать любые задачи извлечения, чтобы она обрабатывала ответы сразу после их получения.

Подробнее см. В разделе Синхронизация и обмен данными между задачами в документации Trio.

0 голосов
/ 06 декабря 2018

Технически, trio.Queue устарела в трио 0,9.Его заменили на trio.open_memory_channel.

Краткий пример:

sender, receiver = trio.open_memory_channel(len(networkIds)
async with trio.open_nursery() as nursery:
    for i in networkIds:
        nursery.start_soon(fetch, sender, url.format(i), headers)

async for value in receiver:
    # Do your job here
    pass

А в вашей функции fetch вы должны вызывать async sender.send(value) где-нибудь.

0 голосов
/ 05 октября 2018

Возвращение данных: передайте networkID и dict задачам fetch:

async def main():
    …
    results = {}
    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers, results, i)
    ## results are available here

async def fetch(url, headers, results, i):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)
    results[i] = response

Альтернативно, создайте trio.Queue, к которому вы put получите результаты;Ваша основная задача может затем прочитать результаты из очереди.

Ограничение API: создайте trio.Queue(10) и запустите задачу следующим образом:

async def limiter(queue):
    while True:
        await trio.sleep(0.2)
        await queue.put(None)

Передайте эту очередь на fetch, в качестве другого аргумента, и вызывайте await limit_queue.get() перед каждым вызовом API.

...