Использование asyncio.Queue для параллельного потока производитель-потребитель в Python3 - PullRequest
0 голосов
/ 08 марта 2019

Я хочу сделать некоторые вычисления для строк огромного файла, и я считаю, что чтение строк занимает менее 5% времени, вычисление каждой строки занимает 95%.

Тогда я нахожу asyncio.Queue кажетсяхороший реферат для моих нужд.Однако для достижения параллели необходимо использовать concurrent.futures.ThreadPoolExecutor или ProcessPoolExecutor.Без них asyncio является однопоточным.

Я не могу найти пример, чтобы знать, как его написать.Большинство примеров печатает только в потребителе, без возврата каких-либо значений.

Не могли бы вы написать пример для меня?

Для демонстрации производитель поставил в очередь много строк из 10 чисел, 5 потребителейразделите строку и вычислите сумму и среднее для одной строки, а main () суммирует суммы и средние значения, чтобы напечатать два результата в конце.


Пример по умолчанию - однопоточный.Просто измените await asyncio.sleep(sleep_for) на for i in range(10000000):pass.

И тогда только «рабочий-0» появился.% CPU всегда меньше 100%.

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()
        print(f'{name} has Begin')

        # Sleep for the "sleep_for" seconds.
        #await asyncio.sleep(sleep_for)
        for i in range(10000000):pass

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

Кроме того, я надеюсь, что код обратно совместим с Python 3.5.3, так как PyPy выглядит быстрее.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...