У меня есть три задачи, каждая из которых имеет MongoRangeTarget, и вторая задача возвращает невыполненную зависимость, хотя она успешно выполняется и приводит к сбою третьих задач.
Сценарий ниже:
import luigi
from luigi.contrib.mongodb import MongoRangeTarget
class TaskAInsertTarget(MongoRangeTarget):
def __init__(self, client: pymongo.MongoClient, db: str, collection: str,
ids: list, field: str):
super().__init__(client, db, collection, ids, field)
self.ids = ids
def exists(self):
return not self.get_empty_ids()
def write(self, values):
self.get_collection().insert_many(values)
def get_empty_ids(self):
return set(self.ids) - set(existing_ids_in_collection)
class TaskA(luigi.Task):
def run(self):
self.output().write(values)
def output(luigi.Task):
return TaskAInsertTarget(client, database, collection, ids, field)
class TaskBInsertTarget(MongoRangeTarget):
def __init__(self, client: pymongo.MongoClient, db: str, collection: str,
ids: list, field: str):
super().__init__(client, db, collection, ids, field)
self.ids = ids
def exists(self):
return not self.get_empty_ids()
def write(self, values):
# query to update all rows with matching id
self.get_collection().update_many(values) # matching with ids
def get_empty_ids(self):
return set(self.ids) - set(existing_ids_in_collection)
class TaskB(luigi.Task):
def requires(self):
return TaskA()
def run(self):
self.output().write(values)
def output(luigi.Task):
return TaskBInsertTarget(client, database, collection, ids, field)
class TaskCInsertTarget(MongoRangeTarget):
def __init__(self, client: pymongo.MongoClient, db: str, collection: str,
ids: list, field: str):
super().__init__(client, db, collection, ids, field)
self.ids = ids
def exists(self):
return not self.get_empty_ids()
def write(self, values):
# query to update all rows with matching id
self.get_collection().update_many(values) # matching with ids
def get_empty_ids(self):
return set(self.ids) - set(existing_ids_in_collection)
class TaskC(luigi.Task):
def requires(self):
return TaskB()
def run(self):
self.output().write(values)
def output(luigi.Task):
return TaskBInsertTarget(client, database, collection, ids, field)
Это общий сценарий.Когда я запускаю весь конвейер, принимая TaskC в качестве точки входа, TaskA проходит / завершеноTaskB выполняется успешно, но происходит сбой TaskC, поскольку существует ошибка зависимостей Unfulfilled для TaskB.
Но, когда я запускаю его отдельно, как TaskA <- TaskB (точка входа), он проходит.Точно так же и для TaskA <- TaskC. </p>
Это меня очень беспокоило.Кто-нибудь может дать мне несколько идей о том, почему это может происходить?
Заранее спасибо.