Сельдерей, с какого каталога начинать - PullRequest
0 голосов
/ 24 мая 2019

Мне нужна помощь в отношении работников сельдерея. Я особенно не в состоянии понять, откуда (из какого каталога) нужно запускать команду celery worker и какова ее концепция и некоторые вещи, связанные с импортом.

Допустим, у меня есть следующая структура каталогов:

.
├── __init__.py
├── entry.py
├── state1
│   ├── __init__.py
│   ├── family1
│   │   ├── __init__.py
│   │   ├── task1.py
│   │   ├── task2.py
│   │   └── task3.py
│   └── family2
│       ├── __init__.py
│       └── task1.py
└── state2
    ├── __init__.py
    ├── family1
    │   ├── __init__.py
    │   ├── task1.py
    │   └── task2.py
    └── family2
        ├── __init__.py
        ├── task1.py
        └── task2.py

. в корне находится текущий рабочий каталог с именем project

каждый из taskn.py (task1.py, task2.py и т. Д.) - это отдельные задачи. Каждый файл задачи выглядит примерно так:

from celery import Celery
from celery.result import AsyncResult
from kombu import Queue

_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost')
CELERY_CONFIG = {
    'CELERY_DEFAULT_QUEUE': 'default',
    'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
    'CELERY_TASK_SERIALIZER': 'pickle',
    'CELERY_ACCEPT_CONTENT': ['json','pickle']
}

celapp.conf.update(**CELERY_CONFIG)

@celapp.task()
def t1():
    print("starting task")
    time.sleep(5)
    print("Finished task")

Ниже приводится содержание entry.py:

import json
from flask_cors import CORS
from flask import Flask, Response, render_template
from flask import request, jsonify, redirect
from functools import wraps
<what would be the import statement to import all the tasks>

_name_ = "project_x"
app     = Flask(_name_)

@app.route("/api1", methods=['POST'])
def api1():
    req = request.jsonify
    if not req:
        return jsonify(success=False, msg="Missing request parameters", code="1")
    else:
        param1 = req.get('p1')
        param2 = req.get('p2')
        tId = startTask()
        return jsonify(success="True", msg="All Good", taskId=tId)


def startTask():
    tId = "abcd123"
    created_task = state1.family1.task1.subtask(queue='q1')
    created_task.delay()
    return tId


if __name__ == '__main__':
    app.run(debug=True, host="192.168.1.7", port="4444")

entry.py - это приложение-колба, из которого будет запускаться api1, а затем, в зависимости от параметров, я захочу запустить конкретную задачу.

Теперь вот мои вопросы:

  1. что будет оператор import для импорта всех задач в файле entry.py
  2. откуда я начинаю рабочего. Я имею в виду, из какого каталога я должен запустить команду Celery -A <directory name> worker -l info и почему?
  3. Во многих примерах я видел, что существует четкое разделение между задачами и файлом CeleryApp. Может кто-нибудь предложить, как лучше организовать мои задачи, конфиги сельдерея и т. Д. И как два вышеуказанных вопроса будут соответствовать этой новой предложенной структуре?

Ответы [ 2 ]

1 голос
/ 24 мая 2019

Хорошо, надеюсь, это поможет. Я отвечу в обратном порядке, как вы просили.

Во многих примерах я видел, что существует четкое разделение между задачами и файл CeleryApp. Может ли кто-нибудь, пожалуйста, предложить, что будет лучший способ организовать мои задачи, и конфиги сельдерея и т. д. и как бы вышеуказанные 2 вопроса соответствуют этой новой предложенной структуре?

Первая проблема, которую я вижу с добавленными вами фрагментами, каждый из имеющихся у вас taskn.py имеет свой собственный экземпляр celery. Вам нужно поделиться этим экземпляром между каждым taskn.py. я рекомендую создать celery_app.py

my_app
├── __init__.py
├── entry.py
├── celery_app.py
│   ├── ...

В этом файле вы создадите экземпляр сельдерея

from celery import Celery
from celery.result import AsyncResult
from kombu import Queue

_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost')
CELERY_CONFIG = {
    'CELERY_DEFAULT_QUEUE': 'default',
    'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
    'CELERY_TASK_SERIALIZER': 'pickle',
    'CELERY_ACCEPT_CONTENT': ['json','pickle']
}

celapp.conf.update(**CELERY_CONFIG)
celery_app.conf.imports = [
    'state1.family1.task1',
    'my_app.state1.family1.task2',  # Or Maybe
    ...
]

Затем в каждом taskn.py вы можете импортировать этот экземпляр, и каждое задание будет зарегистрировано под тем же приложением сельдерея

from my_app.celery_app import celapp

@celapp.task()
def t1():
    print("starting task")
    time.sleep(5)
    print("Finished task")

откуда я начинаю рабочего. Я имею в виду, из какого каталога я должен запустить команду Celery -A рабочий -l info и почему?

Тогда вы должны легко позвонить Celery -A my_app.celery_app worker -l info, потому что ваш экземпляр сельдерея будет в модуле my_app, подмодуле celery_app

что будет оператор import для импорта всех задач в entry.py

Наконец, с entry.py вы можете сделать import state1.family1.task1 import t1 и вызвать t1.delay() или любую зарегистрированную задачу.

0 голосов
/ 01 июня 2019

Поэтому, следуя совету @Patricio, казалось, что это действительно была ошибка импорта.Моя новая структура каталогов выглядит следующим образом:

.
├── __init__.py
├── celeryConfig
│   ├── __init__.py
│   └── celeryApp.py
├── entry.py
├── state1
│   ├── __init__.py
│   ├── family1
│   │   ├── __init__.py
│   │   ├── task1.py
│   │   ├── task2.py
│   │   └── task3.py
│   └── family2
│       ├── __init__.py
│       └── task1.py
└── state2
    ├── __init__.py
    ├── family1
    │   ├── __init__.py
    │   ├── task1.py
    │   └── task2.py
    └── family2
        ├── __init__.py
        ├── task1.py
        └── task2.py

, тогда как содержимое celeryConfig/celeryApp.py выглядит следующим образом:

from celery import Celery
from celery.result import AsyncResult
from kombu import Queue

_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost', include=['state1.family1.task1'])
CELERY_CONFIG = {
    'CELERY_DEFAULT_QUEUE': 'default',
    'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
    'CELERY_TASK_SERIALIZER': 'pickle',
    'CELERY_ACCEPT_CONTENT': ['json','pickle']
}

celapp.conf.update(**CELERY_CONFIG)

, а содержимое taskn.py выглядит примерно так:

from celeryConfig.celeryApp import celapp
import time

@celapp.task()
def t1():
    print("starting task")
    time.sleep(5)
    print("Finished task")

, в то время как entry.py остается, как есть, с одним изменением, как показано ниже:

from state1.family1.task1 import t1

А теперь, когда сельдерей запускается как: celery -A celeryConfig.celeryApp worker -l infoиз корневого каталога project все работает нормально.В качестве результата вышеприведенной команды я получаю сообщение

.
.
.
[tasks]
  . state1.family1.task1.t1

.
.
.

, указывающее, что сельдерей запущен правильно и что задача действительно была зарегистрирована.Итак, теперь, чтобы зарегистрировать все задачи, я могу прочитать каталог / каталоги и динамически создать список include в celeryApp.py.(Опубликуем больше об этом, как только сделаем)

Спасибо @ Патрисио

...