Цепочка нескольких задач в Луиджи - PullRequest
0 голосов
/ 08 января 2020

У меня есть одна задача Luigi, которая настроена на выборку различных данных на основе переданных ей параметров, и еще одна, предназначенная для получения этих файлов и отправки их на наш набор данных. Моя проблема заключается в том, что я не уверен, как запланировать несколько версий первой задачи по очереди, чтобы впоследствии их можно было отправить на канал данных. Вот мой код:

class ListStep(luigi.Task):

    listId = luigi.IntParameter()
    startDate = luigi.Parameter(None)
    endDate = luigi.Parameter(None)
    logPath = luigi.Parameter(None)
    messagePath = luigi.Parameter(None)
    summaryPath = luigi.Parameter(None)
    contactPath = luigi.Parameter(None)
    logFile = luigi.BoolParameter(True)
    endpoint = luigi.Parameter()
    subscribed = luigi.BoolParameter(True)
    fileSuffix = luigi.Parameter(None)

    def output(self):
        today =  datetime.datetime.now()
        todayName = today.strftime("%m%d%y")
        pipelineNameLog = "./pipelinelog/pipelinelog_{}_{}_{}.csv".format(client.listId, self.endpoint, todayName)
        return luigi.LocalTarget(pipelineNameLog)

    def run(self):
        client = ListkWriter(client_id, client_secret, listId = self.listId, logPath = self.logPath, contactPath = self.contactPath, messagePath = self.messagePath, summaryPath = self.summaryPath)

        if self.endpoint == "message":
            filesList = client.getMessages(startDate = self.startDate, endDate = self.endDate, log = self.logFile, fileSuffix = self.fileSuffix)
        elif self.endpoint == "contacts":
            filesList = client.getContacts(startDate = self.startDate, endDate = self.endDate,  log = self.logFile, fileSuffix = self.fileSuffix, subscribed = self.subscribed)
        elif self.endpoint == "summary":
            filesList = client.getSummary(startDate = self.startDate, endDate = self.endDate, log = self.logFile, fileSuffix = self.fileSuffix)

        with self.output().open('w') as outfile:
            for val in filesList:
                outfile.write(",".join([val, datetime.datetime.now().strftime("%H:%M:%S")]))
                outfile.write("\n")

class Transfer(luigi.Task):

    step = luigi.TaskParameter()
    uploadPath = luigi.Parameter()

    def requires(self):
        return self.step

    def output(self):
        today =  datetime.datetime.now()
        todayName = today.strftime("%m%d%y")
        pipelineNameLog = "./pipelinelog/pipelinelog_{}_{}_{}.csv".format(client.listId, self.step.endpoint, todayName)
        return luigi.LocalTarget(pipelineNameLog)

    def run(self):
        containerClient = ContainerClient.from_connection_string(storageCreds, 'datalake')
        with self.input().open('r') as infile:
            fileUpload = infile.readlines()

        for file in fileUpload:
            pathFile = file.split(",")[0]
            fileName = ntpath.basename(pathFile)
            uploadName = self.uploadPath + fileName
            with open(pathFile, 'rb') as f:
                containerClient.upload_blob(uploadName, f)

path = "/test/"

mMessage = ListStep(endpoint = "message", startDate = startDate, listId = listDict['m'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
gMessage = ListStep(endpoint = "message", startDate = startDate, listId = listDict['g'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
mSummary = ListStep(endpoint = "summary", startDate = startDate, listId = listDict['m'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
gSummary = ListStep(endpoint = "summary", startDate = startDate, listId = listDict['g'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)

luigi.run([Transfer(step = mMessage, uploadPath = path), Transfer(step = gMessage, uploadPath = path),
           Transfer(step = mSummary, uploadPath = path), Transfer(step = gSummary, uploadPath = path)],
          local_scheduler = True)

Чего я хочу добиться, так это того, чтобы после выборки данных с помощью задачи ListStep (с указанными параметрами) их можно было отправлять на набор данных с помощью задачи Transfer. После того, как первый набор операций выполнен, я хочу, чтобы он перешел к следующему набору объектов. Моя цель не в том, чтобы все это работало параллельно, а скорее в последовательности. Когда я выполняю этот сценарий, кажется, что планировщик считает только 5 задач, а не 8. Кроме того, он переходит к выполнению задачи mMessage, затем задачи gMessage, не выполняя задачу Transfer. Позднее происходит сбой из-за ошибки FileExistError и выводится следующая сводка:

===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 2 ran successfully:
    - 2 ListrakStep(...)
* 2 failed:
    - 2 ListrakStep(...)
* 1 were left pending, among these:
    * 1 was not granted run permission by the scheduler:
        - 1 Transfer(step=ListrakStep, destination=datalake, uploadPath=test/)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

1 Ответ

0 голосов
/ 03 февраля 2020

Во-первых, если у вас нет задач, требующих других задач, вы не можете гарантировать, что задачи будут выполняться в определенном порядке. Возможно, вы читали о системе приоритетов в документах, но она определяет только то, что назначается работникам, а не когда работники выполнят свои задачи. Один из способов упорядочить задачу - использовать параметр задачи и создать последовательность, подобную этой:

class SequenceTask(luigi.Task):
    prev_task = luigi.TaskParameter()

    def requires(self):
        return self.prev_task
    ...

a = SequenceTask(...)
b = SequenceTask(prev_task=a, ...)
c = SequenceTask(prev_task=b, ...)

Вы действительно делаете это в вашем TransferTask (хотя я думаю, что вместо этого вам нужно ListStep TransferTask

Я не верю, что планировщик обнаружил только 5 задач. Похоже, что запланировано 5, но затем не дало разрешения на выполнение первой задачи Transfer.

Примечание: вывод в ListStep неверен и будет вызывает у вас проблемы в будущем. Вам необходимо вычислить дату и время вне функции output, в противном случае при повторном вызове output даты будут другими, поэтому задача никогда не будет выполнена.

...