Как получить идентификатор задачи из сельдерея в случае запланированных задач (бить) - PullRequest
1 голос
/ 27 июня 2019

Чтобы получить доступ к информации о задаче сельдерея, мне нужен task_id.Когда задача сельдерея запускается вручную, я легко могу получить идентификатор этой задачи с помощью task.id (и записать его в БД или сделать что-то еще).Если я использую сельдерей, который периодически отправляет задания работнику, это кажется невозможным.

Итак, мой вопрос: как получить идентификатор задачи в тот момент, когда удар отправляет задачу работнику сельдерея?

В тот момент, когда рабочий получает задание, консоль показывает идентификатор задачи.Поэтому меня беспокоит то, что в момент, когда задание отправлено рабочим в такт, у него нет идентификатора задачи.

Ручной случай для получения task_id:

task = tasks.LongRunningTask.delay(username_from_formTargetsLaden, password_from_formTargetsLaden, url_from_formTargetsLaden)

task_id = task.id

Возможно, некоторые из васесть идея?

1 Ответ

0 голосов
/ 27 июня 2019

Я нашел ответ на эту маленькую проблему:

Если вам нужен идентификатор задачи, которая была первоначально отправлена ​​в такт, вы можете просто добавить функцию проверки для вашего (запланированного) работника задача.

Настройка периодической задачи

Это расписание, которое «напоминает» сельдерея каждый день в 11:08 (UTC), чтобы запуститьзадача.

@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    test = sender.add_periodic_task(crontab(minute=8, hour=11), CheckLists.s(app.config['USR'], app.config['PWD']))

Задача, которая должна выполняться периодически

Это запланированная задача, которая будет выполняться сельдереем после того, как рабочий получил «напоминание» от удара.

@celery.task(bind=True)
def CheckLists(self, arg1, arg2):
    #get task_id von scheduled Task 'Check-List'
    i = inspect()
    activetasks = i.active()
    list_of_tasks = {'activetasks': activetasks}
    task_id = list_of_tasks['activetasks']['celery@DESKTOP-XXXXX'][0]['id']   #adapt this section depending on environment (local, webserver, etc...)
    task_type = "CHECK_LISTS"
    task_id_to_db = Tasks(task_id, task_type)
    db.session.add(task_id_to_db)
    db.session.commit()

    long_runnning_task 
    [...more task relevant code here...] 

Итак, я использую app.control.inspect, который позволяет вам проверять работающих работников.Он использует команды дистанционного управления под капотом.С i.active() вы получите словарь, который вы можете легко разобрать.

Пока я не найду никакой документации, как проще получить task_id из периодической задачи, я буду придерживаться этогорешение.

После сохранения идентификатора задачи вы можете легко опрашивать состояние задачи и т. д., например, с помощью AJAX.

Надеюсь, что это поможет вам, ребята:)

...