У меня есть django-viewflow рабочий процесс, который включает в себя трехсторонний Split () для обработки электронной почты, SMS и т. Д. Поскольку для выполнения каждого из этих действий может потребоваться длительный период времени, я представляюкаждая из 3-х разделенных ветвей в виде пары узлов:
- Обычный узел Handler (), который порождает задание Celery.
- Пользовательский узел ожидания сельдерея.
Пользовательский узел выглядит следующим образом:
class CeleryEvent(mixins.TaskDescriptionViewMixin,
mixins.NextNodeMixin, mixins.DetailViewMixin,
mixins.UndoViewMixin,
mixins.CancelViewMixin, Event):
....
activation_class = derived-from-AbstractJobActivation
setting task_type = "somestring"
Вызов кода Viewflow при завершении задания Celery следует модели другого вопроса и конкретно включает в себязамок заложен там.Как правило, результат работает отлично.Однако время от времени я получаю это исключение из join.py:py1*601**
tasks = flow_class.task_class._default_manager.filter(
flow_task=flow_task,
process=process,
status=STATUS.STARTED)
if len(tasks) > 1:
raise FlowRuntimeError('More than one join instance for process found')
Viewflow 1.3.0 * 3 ветви присоединяются так:
close_join = flow.Join(wait_all=True). \
Next(this.alert_devops)
Я немногосбитый с толку причиной этого, как при проверке после ошибки, комбинация process и flow_task для close_join
в состоянии STARTED происходит дважды.Я задаюсь вопросом, может ли что-то, что я делаю, могло быть причиной проблемы.Насколько я знаю, ни один из моего кода на самом деле не пишет в эту таблицу напрямую.
Я отмечаю, что в таблице задач нет есть unique_together('process', 'flow_task')
, что, как я думаю может быть связано с тем, что цикл Viewflow вызовет многократное нажатие на одну и ту же flow_task.Поскольку в моем коде нет циклов (пока), я думаю, что было бы неплохо временно добавить такое ограничение;хотя бы тогда создатель нелегального государства станет точкой отказа?
Возможно ли, что взятие блокировки небезопасно для всех процессов?Поскольку Celery выполняет этот фрагмент кода на нескольких процессах в машине, это может объяснить проблему?
lock = self.flow_class.lock_impl(self.flow_class.instance)
with lock(self.flow_class, task.process_id):
#
# Re-acquire the task under a lock (see the StackOverflow thread).
#
task = self.flow_class.task_class._default_manager.get(pk=task.pk)
activation = self.activation_class()
activation.initialize(self, task)
activation.start()
activation.done()