У меня есть одна задача Luigi, которая настроена на выборку различных данных на основе переданных ей параметров, и еще одна, предназначенная для получения этих файлов и отправки их на наш набор данных. Моя проблема заключается в том, что я не уверен, как запланировать несколько версий первой задачи по очереди, чтобы впоследствии их можно было отправить на канал данных. Вот мой код:
class ListStep(luigi.Task):
listId = luigi.IntParameter()
startDate = luigi.Parameter(None)
endDate = luigi.Parameter(None)
logPath = luigi.Parameter(None)
messagePath = luigi.Parameter(None)
summaryPath = luigi.Parameter(None)
contactPath = luigi.Parameter(None)
logFile = luigi.BoolParameter(True)
endpoint = luigi.Parameter()
subscribed = luigi.BoolParameter(True)
fileSuffix = luigi.Parameter(None)
def output(self):
today = datetime.datetime.now()
todayName = today.strftime("%m%d%y")
pipelineNameLog = "./pipelinelog/pipelinelog_{}_{}_{}.csv".format(client.listId, self.endpoint, todayName)
return luigi.LocalTarget(pipelineNameLog)
def run(self):
client = ListkWriter(client_id, client_secret, listId = self.listId, logPath = self.logPath, contactPath = self.contactPath, messagePath = self.messagePath, summaryPath = self.summaryPath)
if self.endpoint == "message":
filesList = client.getMessages(startDate = self.startDate, endDate = self.endDate, log = self.logFile, fileSuffix = self.fileSuffix)
elif self.endpoint == "contacts":
filesList = client.getContacts(startDate = self.startDate, endDate = self.endDate, log = self.logFile, fileSuffix = self.fileSuffix, subscribed = self.subscribed)
elif self.endpoint == "summary":
filesList = client.getSummary(startDate = self.startDate, endDate = self.endDate, log = self.logFile, fileSuffix = self.fileSuffix)
with self.output().open('w') as outfile:
for val in filesList:
outfile.write(",".join([val, datetime.datetime.now().strftime("%H:%M:%S")]))
outfile.write("\n")
class Transfer(luigi.Task):
step = luigi.TaskParameter()
uploadPath = luigi.Parameter()
def requires(self):
return self.step
def output(self):
today = datetime.datetime.now()
todayName = today.strftime("%m%d%y")
pipelineNameLog = "./pipelinelog/pipelinelog_{}_{}_{}.csv".format(client.listId, self.step.endpoint, todayName)
return luigi.LocalTarget(pipelineNameLog)
def run(self):
containerClient = ContainerClient.from_connection_string(storageCreds, 'datalake')
with self.input().open('r') as infile:
fileUpload = infile.readlines()
for file in fileUpload:
pathFile = file.split(",")[0]
fileName = ntpath.basename(pathFile)
uploadName = self.uploadPath + fileName
with open(pathFile, 'rb') as f:
containerClient.upload_blob(uploadName, f)
path = "/test/"
mMessage = ListStep(endpoint = "message", startDate = startDate, listId = listDict['m'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
gMessage = ListStep(endpoint = "message", startDate = startDate, listId = listDict['g'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
mSummary = ListStep(endpoint = "summary", startDate = startDate, listId = listDict['m'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
gSummary = ListStep(endpoint = "summary", startDate = startDate, listId = listDict['g'], summaryPath = path, logPath = path, messagePath = path, contactPath = path)
luigi.run([Transfer(step = mMessage, uploadPath = path), Transfer(step = gMessage, uploadPath = path),
Transfer(step = mSummary, uploadPath = path), Transfer(step = gSummary, uploadPath = path)],
local_scheduler = True)
Чего я хочу добиться, так это того, чтобы после выборки данных с помощью задачи ListStep (с указанными параметрами) их можно было отправлять на набор данных с помощью задачи Transfer. После того, как первый набор операций выполнен, я хочу, чтобы он перешел к следующему набору объектов. Моя цель не в том, чтобы все это работало параллельно, а скорее в последовательности. Когда я выполняю этот сценарий, кажется, что планировщик считает только 5 задач, а не 8. Кроме того, он переходит к выполнению задачи mMessage, затем задачи gMessage, не выполняя задачу Transfer. Позднее происходит сбой из-за ошибки FileExistError и выводится следующая сводка:
===== Luigi Execution Summary =====
Scheduled 5 tasks of which:
* 2 ran successfully:
- 2 ListrakStep(...)
* 2 failed:
- 2 ListrakStep(...)
* 1 were left pending, among these:
* 1 was not granted run permission by the scheduler:
- 1 Transfer(step=ListrakStep, destination=datalake, uploadPath=test/)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====