Я использую Celery v4.4 в сочетании с asyncio из https://pypi.org/project/celery-pool-asyncio/. Все работает нормально, однако я хочу использовать группы сейчас, чтобы мгновенно поместить 100 задач в очередь из сельдерея. Я использовал пример документации сельдерея, а также попробовал этот Добавьте n задач в очередь сельдерея и дождитесь результатов . '' '
# Start recording time
begin = time.time()
future = []
for i in calcs:
future.append(await calculation_of_primes.delay(i))
result = []
for i in future:
result.append(await i.get())
# end recording time.
end = time.time()
print(f'run of seperate: {end-begin}')
"""UNTIL HERE IT WORKS FINE!"""
begin2 = time.time()
job = group(
calculation_of_primes.s(10000),
calculation_of_primes.s(10000),
calculation_of_primes.s(10000),
calculation_of_primes.s(10000),
calculation_of_primes.s(10000))
res = job.apply_async()
result = res.get()
end2 = time.time()
print(f'run of together: {end2-begin2}')
Первая часть работает нормально, но я получаю ошибки по второй части. Это ошибки:
-
File "/home/sam/Desktop/revolve/pycelery/speedtest.py", line 51, in run
result = res.get()
File "/home/sam/Desktop/revolve/.venv/lib/python3.7/site-packages/celery/result.py", line 703, in get
on_interval=on_interval,
File "/home/sam/Desktop/revolve/.venv/lib/python3.7/site-packages/celery/result.py", line 822, in join_native
on_message, on_interval):
File "/home/sam/Desktop/revolve/.venv/lib/python3.7/site-packages/celery/backends/asynchronous.py", line 144, in iter_native
for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs):
TypeError: 'async_generator' object is not iterable