Рассмотрим следующие задачи:
import luigi
class YieldFailTaskInBatches(luigi.Task):
def run(self):
for i in range(5):
yield [
FailTask(i, j)
for j in range(2)
]
class YieldAllFailTasksAtOnce(luigi.Task):
def run(self):
yield [
FailTask(i, j)
for j in range(2)
for i in range(5)
]
class FailTask(luigi.Task):
i = luigi.IntParameter()
j = luigi.IntParameter()
def run(self):
print("i: %d, j: %d" % (self.i, self.j))
if self.j > 0:
raise Exception("i: %d, j: %d" % (self.i, self.j))
Сбой FailTask
, если j > 0
.YieldFailTaskInBatches
выдает FailTask
несколько раз внутри цикла for, тогда как YieldAllFailTasksAtOnce
возвращает все задачи в массиве.
Если я запускаю YieldFailTaskInBatches
, Луиджи запускает задачи, полученные в первом цикле, итак как один из них терпит неудачу (i = 0, j = 1
), Луиджи не уступает остальным.Если я запускаю YieldAllFailTasksAtOnce
, Луиджи выполняет все задачи, как ожидалось.
Мой вопрос таков: как я могу сказать Луиджи продолжать выполнение оставшихся задач на YieldFailTasksInBatches
, даже если некоторые задачи не были выполнены?Возможно ли это вообще?
Причина, по которой я спрашиваю, состоит в том, что у меня есть около ~ 400 тыс. Задач для запуска. Я не хочу запускать их все сразу, так как это заставит Луиджи тратить слишком многомного времени для создания требований каждой задачи (они могут иметь от 1 до 400 требований). Мое текущее решение состоит в том, чтобы выдавать их партиями, по несколько штук за раз, но затем, если какой-либо из них не удался, задача останавливается, а остальные не приносятся.
Похоже, что эта проблема могла бы решить эту проблему, если бы она была реализована, но мне интересно, есть ли другой способ.