Я принял код из библиотеки janus . Я хочу добавить две функции в существующий код. Во-первых, это пакетная обработка, которая работает сама по себе. Во-вторых, повторная попытка положить элементы обратно в очередь. Проблема в том, что блок if
, который я добавил для повторного ввода элементов обратно в очередь, продолжает блокировать код. Что мне здесь не хватает в асинхронных очередях?
import asyncio
import janus
loop = asyncio.get_event_loop()
queue = janus.Queue(maxsize=10, loop=loop)
def threaded(sync_q):
for i in range(100):
sync_q.put(i)
print(f"Put {i} into queue")
sync_q.put(None) # queue end signal
sync_q.join()
async def async_batch_coro(async_q, batch_size=3):
while True:
batch = []
for _ in range(batch_size):
val = await async_q.get()
if val is not None:
batch.append(val)
else:
async_q.task_done()
break
for i in batch:
# If I remove the `if` block here, code works just fine with batching
if i % 11 == 0:
await async_q.put(i + 1)
print(f"Put {i} back into queue")
async_q.task_done()
print(f"Received: {batch}")
if len(batch) < batch_size:
break
fut = loop.run_in_executor(None, threaded, queue.sync_q)
loop.run_until_complete(async_batch_coro(queue.async_q))
loop.run_until_complete(fut)