Что определяет «завершенную» задачу Луиджи? - PullRequest
0 голосов
/ 25 октября 2018

Я работаю над созданием моего первого конвейера Luigi, и в настоящее время я тестирую задачи по отдельности, прежде чем строить свои зависимости.Во время тестирования я использую версию следующего основного метода для создания задачи:

if __name__ == "__main__":

    headers = dict()
    headers["Content-Type"] = "application/json"
    headers["Accept"] = "application/json"

    luigi.build[(CSVValidator(jsonfile = '/sample_input/sample_csv.json',
                docfile = None,
                error_limit = 2,
                order_fields = 3,
                output_file = 'validation_is_us.txt',
                header = headers)])

    luigi.run()

Вот так выглядит мой csv_validator:

class CSVValidator(luigi.Task):
    jsonfile = luigi.Parameter()
    docfile = luigi.Parameter()
    error_limit = luigi.Parameter()
    order_fields = luigi.Parameter()
    output_file = luigi.Parameter()
    header = luigi.DictParameter()

    def output(self):
        return luigi.LocalTarget(self.output_file + "/csv_validator_data_%s.txt" % time.time())

    def run(self):
        output_file = self.output().open('w')
        files = {}
        data = {}
        files["jsonfile"] = open(self.jsonfile, 'rb')
        files["docfile"] = open(self.docfile, 'rb')
        data["error_limit"] = self.error_limit
        data["order_fields"] = self.order_fields
        r = requests.post(*****~~~~~*****~~~~~,
                      headers=headers,
                      data=data, files=files)
        task_response = r.text.encode(encoding="UTF-8")
        print type(task_response)
        print(task_response)
        jsontaskdata = json.loads(task_response)
        json.dump(jsontaskdata, output_file)
        print("validated")
        output_file.close()

Эта задача, однако,на самом деле никогда не бегать.Вместо этого центральный планировщик luigi утверждает, что эта задача уже выполнена:

===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 CSVValidator(...)
* 1 ran successfully:
    - 1 Downloader(...)

Этот прогресс выглядит :), потому что не было ни неудачных задач, ни отсутствующих зависимостей

Другие задачи, которые я создал, Загрузчик дляНапример, выполняйте успешно каждый раз.Что определяет завершенную задачу здесь?Я не понимаю, что это значит.

Спасибо за ваше время!

1 Ответ

0 голосов
/ 03 января 2019

Целевой объект, который возвращается методом вывода, определяет, завершена ли Задача или нет.

Объект может быть создан, если определенный выходной файл уже существует, или какое-то другое условие, включаяналичие внешних ресурсов.Например, в luigi.contrib.esindex.py (проверка на наличие) индекса ElasticSearch в некотором удаленном кластере создаст объект Target и сообщит вам, что задача (CopyIndex) выполнена.

Возможно, вы также захотитепосмотрите на этот ответ: https://stackoverflow.com/a/34638943/4125622

И это обсуждение: https://github.com/spotify/luigi/issues/595

...