Мой код:
import luigi
import pickle
from datetime import datetime
class QueryTwitterTrend(luigi.ExternalTask):
date = luigi.DateMinuteParameter(default=datetime.now())
country_code = luigi.Parameter(default='usa')
def requires(self):
return []
def output(self, **kwargs):
kwargs.setdefault('loc', 'dummy')
return luigi.LocalTarget(
"data/trends/trends_{}_{}.csv".format(self.date.strftime('%m%d_%Y_%H%M'), kwargs['loc']))
def run(self):
from retrieve_trends import run as retrieve_trends
import pandas as pd
args_dict = {
'location': [self.country_code]
}
df_container = retrieve_trends(args_dict)
f = self.output(loc=self.country_code).open('w')
df_container[self.country_code].to_csv(f, sep=',', encoding='utf-8')
f.close()
class TrendsTaskWrapper(luigi.WrapperTask):
def requires(self):
locations = [
'usa-nyc',
'usa-lax',
'usa-chi',
'usa-dal',
'usa-hou',
'usa-wdc',
'usa-mia',
'usa-phi',
'usa-atl',
'usa-bos',
'usa-phx',
'usa-sfo',
'usa-det',
'usa-sea',
]
for loc in locations:
yield QueryTwitterTrend(country_code=loc)
Сводка выполнения Луиджи:
===== Luigi Execution Summary =====
Scheduled 15 tasks of which:
* 14 ran successfully:
- 14 QueryTwitterTrend(date=2019-04-27T1955, country_code=usa-atl) ...
* 1 failed:
- 1 TrendsTaskWrapper()
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
Трассировка:
RuntimeError: Unfulfilled dependencies at run time:QueryTwitterTrend_usa_nyc_2019_04_27T1955_c4a2592db0, QueryTwitterTrend_usa_lax_2019_04_27T1955_e2676d1bef..
В luigi daemon
я получаю UPSTREAM_ERROR
..
Я не знаю точно, что является причиной этой ошибки.Все мои QueryTwitterTrend
задачи работают отлично, с точки зрения создания локального файла, и говорят, что они выполнены в среде Luigi.Проблема в том, что TaskWrapper
читается как ошибка.
Что я делаю, чтобы этого не произошло и чтобы все зависимости отслеживались.Скоро у меня будет больше задач, которые зависят от QueryTwitterTrend.