Asyncio: очередь заблокирована на пут - PullRequest
0 голосов
/ 22 июня 2019

Я принял код из библиотеки janus . Я хочу добавить две функции в существующий код. Во-первых, это пакетная обработка, которая работает сама по себе. Во-вторых, повторная попытка положить элементы обратно в очередь. Проблема в том, что блок if, который я добавил для повторного ввода элементов обратно в очередь, продолжает блокировать код. Что мне здесь не хватает в асинхронных очередях?

import asyncio
import janus

loop = asyncio.get_event_loop()
queue = janus.Queue(maxsize=10, loop=loop)


def threaded(sync_q):
    for i in range(100):
        sync_q.put(i)
        print(f"Put {i} into queue")
    sync_q.put(None)  # queue end signal
    sync_q.join()


async def async_batch_coro(async_q, batch_size=3):
    while True:
        batch = []
        for _ in range(batch_size):
            val = await async_q.get()
            if val is not None:
                batch.append(val)
            else:
                async_q.task_done()
                break

        for i in batch:
            # If I remove the `if` block here, code works just fine with batching
            if i % 11 == 0:
                await async_q.put(i + 1)
                print(f"Put {i} back into queue")
            async_q.task_done()
        print(f"Received: {batch}")
        if len(batch) < batch_size:
            break


fut = loop.run_in_executor(None, threaded, queue.sync_q)
loop.run_until_complete(async_batch_coro(queue.async_q))
loop.run_until_complete(fut)
...