Я работаю над созданием моего первого конвейера Luigi, и в настоящее время я тестирую задачи по отдельности, прежде чем строить свои зависимости.Во время тестирования я использую версию следующего основного метода для создания задачи:
if __name__ == "__main__":
headers = dict()
headers["Content-Type"] = "application/json"
headers["Accept"] = "application/json"
luigi.build[(CSVValidator(jsonfile = '/sample_input/sample_csv.json',
docfile = None,
error_limit = 2,
order_fields = 3,
output_file = 'validation_is_us.txt',
header = headers)])
luigi.run()
Вот так выглядит мой csv_validator:
class CSVValidator(luigi.Task):
jsonfile = luigi.Parameter()
docfile = luigi.Parameter()
error_limit = luigi.Parameter()
order_fields = luigi.Parameter()
output_file = luigi.Parameter()
header = luigi.DictParameter()
def output(self):
return luigi.LocalTarget(self.output_file + "/csv_validator_data_%s.txt" % time.time())
def run(self):
output_file = self.output().open('w')
files = {}
data = {}
files["jsonfile"] = open(self.jsonfile, 'rb')
files["docfile"] = open(self.docfile, 'rb')
data["error_limit"] = self.error_limit
data["order_fields"] = self.order_fields
r = requests.post(*****~~~~~*****~~~~~,
headers=headers,
data=data, files=files)
task_response = r.text.encode(encoding="UTF-8")
print type(task_response)
print(task_response)
jsontaskdata = json.loads(task_response)
json.dump(jsontaskdata, output_file)
print("validated")
output_file.close()
Эта задача, однако,на самом деле никогда не бегать.Вместо этого центральный планировщик luigi утверждает, что эта задача уже выполнена:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 1 complete ones were encountered:
- 1 CSVValidator(...)
* 1 ran successfully:
- 1 Downloader(...)
Этот прогресс выглядит :), потому что не было ни неудачных задач, ни отсутствующих зависимостей
Другие задачи, которые я создал, Загрузчик дляНапример, выполняйте успешно каждый раз.Что определяет завершенную задачу здесь?Я не понимаю, что это значит.
Спасибо за ваше время!