Неудовлетворенная зависимость Луги в MongoRangeTarget - PullRequest
0 голосов
/ 29 сентября 2019

У меня есть три задачи, каждая из которых имеет MongoRangeTarget, и вторая задача возвращает невыполненную зависимость, хотя она успешно выполняется и приводит к сбою третьих задач.

Сценарий ниже:

import luigi
from luigi.contrib.mongodb import MongoRangeTarget

class TaskAInsertTarget(MongoRangeTarget):
    def __init__(self, client: pymongo.MongoClient, db: str, collection: str,
                 ids: list, field: str):
        super().__init__(client, db, collection, ids, field)
        self.ids = ids

    def exists(self):
        return not self.get_empty_ids()

    def write(self, values):
        self.get_collection().insert_many(values)

    def get_empty_ids(self):
        return set(self.ids) - set(existing_ids_in_collection)

class TaskA(luigi.Task):
    def run(self):
        self.output().write(values)

    def output(luigi.Task):
        return TaskAInsertTarget(client, database, collection, ids, field)

class TaskBInsertTarget(MongoRangeTarget):
    def __init__(self, client: pymongo.MongoClient, db: str, collection: str,
                 ids: list, field: str):
        super().__init__(client, db, collection, ids, field)
        self.ids = ids

    def exists(self):
        return not self.get_empty_ids()

    def write(self, values):
        # query to update all rows with matching id
        self.get_collection().update_many(values) # matching with ids 

    def get_empty_ids(self):
        return set(self.ids) - set(existing_ids_in_collection)

class TaskB(luigi.Task):
    def requires(self):
        return TaskA()

    def run(self):
        self.output().write(values)

    def output(luigi.Task):
        return TaskBInsertTarget(client, database, collection, ids, field)

class TaskCInsertTarget(MongoRangeTarget):
    def __init__(self, client: pymongo.MongoClient, db: str, collection: str,
                 ids: list, field: str):
        super().__init__(client, db, collection, ids, field)
        self.ids = ids

    def exists(self):
        return not self.get_empty_ids()

    def write(self, values):
        # query to update all rows with matching id
        self.get_collection().update_many(values) # matching with ids 

    def get_empty_ids(self):
        return set(self.ids) - set(existing_ids_in_collection)

class TaskC(luigi.Task):
    def requires(self):
        return TaskB()

    def run(self):
        self.output().write(values)

    def output(luigi.Task):
        return TaskBInsertTarget(client, database, collection, ids, field)

Это общий сценарий.Когда я запускаю весь конвейер, принимая TaskC в качестве точки входа, TaskA проходит / завершеноTaskB выполняется успешно, но происходит сбой TaskC, поскольку существует ошибка зависимостей Unfulfilled для TaskB.

Но, когда я запускаю его отдельно, как TaskA <- TaskB (точка входа), он проходит.Точно так же и для TaskA <- TaskC. </p>

Это меня очень беспокоило.Кто-нибудь может дать мне несколько идей о том, почему это может происходить?

Заранее спасибо.

...