Как игнорировать сбои в задачах Luigi, вызванных выполнением другой задачи () - PullRequest
0 голосов
/ 28 ноября 2018

Рассмотрим следующие задачи:

import luigi


class YieldFailTaskInBatches(luigi.Task):
    def run(self):
        for i in range(5):
            yield [
                FailTask(i, j)
                for j in range(2)
            ]


class YieldAllFailTasksAtOnce(luigi.Task):
    def run(self):
        yield [
            FailTask(i, j)
            for j in range(2)
            for i in range(5)
        ]

class FailTask(luigi.Task):
    i = luigi.IntParameter()
    j = luigi.IntParameter()

    def run(self):
        print("i: %d, j: %d" % (self.i, self.j))
        if self.j > 0:
            raise Exception("i: %d, j: %d" % (self.i, self.j))

Сбой FailTask, если j > 0.YieldFailTaskInBatches выдает FailTask несколько раз внутри цикла for, тогда как YieldAllFailTasksAtOnce возвращает все задачи в массиве.

Если я запускаю YieldFailTaskInBatches, Луиджи запускает задачи, полученные в первом цикле, итак как один из них терпит неудачу (i = 0, j = 1), Луиджи не уступает остальным.Если я запускаю YieldAllFailTasksAtOnce, Луиджи выполняет все задачи, как ожидалось.

Мой вопрос таков: как я могу сказать Луиджи продолжать выполнение оставшихся задач на YieldFailTasksInBatches, даже если некоторые задачи не были выполнены?Возможно ли это вообще?

Причина, по которой я спрашиваю, состоит в том, что у меня есть около ~ 400 тыс. Задач для запуска. Я не хочу запускать их все сразу, так как это заставит Луиджи тратить слишком многомного времени для создания требований каждой задачи (они могут иметь от 1 до 400 требований). Мое текущее решение состоит в том, чтобы выдавать их партиями, по несколько штук за раз, но затем, если какой-либо из них не удался, задача останавливается, а остальные не приносятся.

Похоже, что эта проблема могла бы решить эту проблему, если бы она была реализована, но мне интересно, есть ли другой способ.

1 Ответ

0 голосов
/ 05 декабря 2018

Это очень хакерский, но он должен делать то, что вы хотите:

class YieldAll(luigi.Task):
    def run(self):
        errors = list()
        for i in range(5):
            for j in range(2):
                try:
                    FailTask(i, j).run()
                except Exception as e:
                    errors.append(e)

        if errors:
            raise ValueError(f' all traceback: {errors}')

class FailTask(luigi.Task):
    i = luigi.IntParameter()
    j = luigi.IntParameter()

    def run(self):
        print("i: %d, j: %d" % (self.i, self.j))
        if self.j > 0:
            raise Exception("i: %d, j: %d" % (self.i, self.j))

, так что в основном вы запускаете задачу вне контекста luigi.если вы не выведете цель, luigi никогда не узнает, была ли она запущена или нет.

единственная задача, которую знает luigi - это YieldAll.Если какой-либо из YieldAll создает ошибку, код перехватит ее и установит для задачи YieldAll статус ошибки.

...