Повторите потерянные или не выполненные задачи (сельдерей, Django и RabbitMQ) - PullRequest
15 голосов
/ 17 марта 2011

Есть ли способ определить, потеряно ли какое-либо задание, и повторить его?


Я думаю, что причиной пропажи может быть ошибка диспетчера или сбой рабочего потока.

Я планировал повторить их, но я не уверен, как определить, какие задачи нужно удалить?

А как сделать этот процесс автоматически? Могу ли я использовать свой собственный планировщик, который будет создавать новые задачи?

Редактировать: я нашел из документации, что RabbitMQ никогда не теряет задачи, но что происходит, когда рабочий поток падает в середине выполнения задачи?

1 Ответ

29 голосов
/ 17 марта 2011

Вам нужно установить

CELERY_ACKS_LATE = True

Позднее подтверждение означает, что сообщения задачи будут подтверждены после ее выполнения, а не только до этого, что является поведением по умолчанию,Таким образом, если рабочий сбой, у кролика MQ все равно будет сообщение.

Очевидно, что полного сбоя (Кролик + рабочие) в то же время нет способа восстановить задачу, кроме случаев, когда вы ведете журналирование.о начале и завершении задачи.Лично я пишу в mongodb строку каждый раз, когда запускается задача, и другую, когда задача завершается (независимо формирует результат), таким образом я могу узнать, какая задача была прервана, анализируя журналы mongo.

Выэто можно легко сделать, переопределив методы __call__ и after_return базового класса задач сельдерея.

Ниже вы увидите фрагмент моего кода, который использует класс taskLogger в качестве диспетчера контекста (с точкой входа и выхода)).Класс taskLogger просто записывает строку, содержащую информацию о задаче в экземпляре mongodb.

def __call__(self, *args, **kwargs):
    """In celery task this function call the run method, here you can
    set some environment variable before the run of the task"""

    #Inizialize context managers    

    self.taskLogger = TaskLogger(args, kwargs)
    self.taskLogger.__enter__()

    return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
    #exit point for context managers
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)

Надеюсь, это может помочь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...