Задания застряли в очереди из сельдерея - PullRequest
1 голос
/ 04 февраля 2020

Я пытался реализовать очередь задач с помощью очереди Redis, которая располагалась поверх Redis. Я избавился от этого и пошел к Celery поверх RabbitMQ, основываясь на проблемах, которые у меня были, как описано здесь: Блокировка очереди Redis

Я ссылаюсь на приведенный выше (без ответа) ТАК вопрос, как я полагаю эти две проблемы достаточно похожи, чтобы быть потенциально связанными - будь то код или настройка с моей стороны.

Я могу отправлять задачи в свою очередь Celery и вижу, что они находятся там, либо вызывая rabbitmqctl list_queues внутри моего Кролика docker контейнера bash или по телефону

>>> add_nums.delay(2,3)

<AsyncResult: 197315b1-e18b-4945-bf0a-cc6b6b829bfb>

>>> result = add_nums.AsyncResult( 197315b1-e18b-4945-bf0a-cc6b6b829bfb)

где

>>> result.status 'PENDING'

независимо от того, сколько раз я проверял.

Я пытался добавить ignore_result=True в вызове декоратора, но это не имеет никакого эффекта.

мой рабочий класс:

./workerA.py
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger( __name__)

# Celery configuration
CELERY_BROKER_URL = 'amqp://***:***@rabbit:5672/' #where the asterisks indicate user, pwd
CELERY_RESULT_BACKEND = 'rpc://'

# Initialize celery
celery = Celery( 'workerA', 
        broker=CELERY_BROKER_URL,
        backend=CELERY_RESULT_BACKEND)

@celery.task( ignore_result=True)
def add_nums( a, b):
    logger.info( f'{ a+b=}')
    return a+b

Мой основной:

./app.py
import logging
from flask.logging import default_handler
from workerA import add_nums
from workerB import sub_nums
from flask import (
        Flask,
        request,
        jsonify,
    )

logger = logging.getLogger( )
logger.addHandler( default_handler)
logger.setLevel( logging.INFO)

app = Flask( __name__)

@app.route( '/')
def index():
    return 'hello world!'

@app.route( '/add')
def add():
    logger.info( 'in add method')
    first_num, second_num = ( 1,2)

    logger.info( f' { first_num=}')

    result = add_nums.delay( first_num, second_num)
    logger.info( f' {result=}')
    logger.info( f' {result.state=}')

    return jsonify({ 'result': result.result}), 200

@app.route( '/subtract')
def subtract():
    logger.info( 'in sub method')
    first_num, second_num = ( 1,2)

    result = sub_nums.delay( first_num, second_num)
    logger.info( f' {result=}')

    return jsonify( {'result': result.result}), 200

if __name__ == '__main__':
    app.run( debug=True)

Вызов result.get( timeout=n) всегда приводит к таймауту, независимо от того, как установлено n: короче говоря, эти очереди не выполняются.

Для полноты моего docker -compose.yml:

version: "3"
services:
  web:
    build:
      context: .
      dockerfile: Dockerfile
    restart: always
    ports:
      - 5000:5000
    command: python ./app.py -h 0.0.0.0
    depends_on:
      - rabbit
    volumes:
      - .:/app
  rabbit:
    hostname: rabbit
    image: rabbitmq:management
    environment:
      - RABBITMQ_DEFAULT_USER=***
      - RABBITMQ_DEFAULT_PASS=***
    ports:
      - "5673:5672"
      - "15672:15672"
  worker_1:
    build:
      context: .
    hostname: worker_1
    entrypoint: celery
    command: -A workerA worker --loglevel=info -Q workerA
    volumes:
      - .:/app
    links:
      - rabbit
    depends_on:
      - rabbit
  worker_2:
    build:
      context: .
    hostname: worker_2
    entrypoint: celery
    command: -A workerB worker --loglevel=info -Q workerB
    volumes:
      - .:/app
    links:
      - rabbit
    depends_on:
      - rabbit

и моего документа kerfile:

FROM python:3

ADD requirements.txt /app/requirements.txt

WORKDIR /app/

RUN pip install -r requirements.txt

EXPOSE 5000

Я использую Docker Рабочий стол для Ма c 2.2.0.0 и мой OSX 10.15.2 (Catalina)

любая помощь в этом вопросе будет с благодарностью. Эти проблемы с очередью стали для меня серьезным препятствием

1 Ответ

1 голос
/ 05 февраля 2020

Может показаться, что причина этой проблемы в том, что нет настроенного бэкэнда для хранения результатов. Создание экземпляра объекта Celery с помощью Celery (..., backend = 'rp c: //'), по-видимому, не делает ничего, кроме отключения ошибки "NotImplementedError: Бэкэнд не настроен", который вы могли бы получить в противном случае. Я считаю, что документация в этом смысле вводит в заблуждение.

Доступ к пробной версии Redis для производительности. У меня также есть Elasticsearch и MongoDB, используемые в других местах для моего приложения, на которые я мог бы ориентироваться, но мне больше нравится Redis. Получит результаты, когда это будет сделано, после обеда.

...