Сбой оболочки восходящего потока задач luigi при полной обработке других задач - PullRequest
0 голосов
/ 28 апреля 2019

Мой код:

import luigi
import pickle
from datetime import datetime


class QueryTwitterTrend(luigi.ExternalTask):

    date = luigi.DateMinuteParameter(default=datetime.now())
    country_code = luigi.Parameter(default='usa')

    def requires(self):
        return []

    def output(self, **kwargs):
        kwargs.setdefault('loc', 'dummy')

        return luigi.LocalTarget(
            "data/trends/trends_{}_{}.csv".format(self.date.strftime('%m%d_%Y_%H%M'), kwargs['loc']))

    def run(self):
        from retrieve_trends import run as retrieve_trends
        import pandas as pd

        args_dict = {
            'location': [self.country_code]
        }

        df_container = retrieve_trends(args_dict)
        f = self.output(loc=self.country_code).open('w')
        df_container[self.country_code].to_csv(f, sep=',', encoding='utf-8')
        f.close()


class TrendsTaskWrapper(luigi.WrapperTask):

    def requires(self):
        locations = [
            'usa-nyc',
            'usa-lax',
            'usa-chi',
            'usa-dal',
            'usa-hou',
            'usa-wdc',
            'usa-mia',
            'usa-phi',
            'usa-atl',
            'usa-bos',
            'usa-phx',
            'usa-sfo',
            'usa-det',
            'usa-sea',
        ]

        for loc in locations:
            yield QueryTwitterTrend(country_code=loc)

Сводка выполнения Луиджи:

===== Luigi Execution Summary =====

Scheduled 15 tasks of which:
* 14 ran successfully:
    - 14 QueryTwitterTrend(date=2019-04-27T1955, country_code=usa-atl) ...
* 1 failed:
    - 1 TrendsTaskWrapper()

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

Трассировка:

RuntimeError: Unfulfilled dependencies at run time:QueryTwitterTrend_usa_nyc_2019_04_27T1955_c4a2592db0, QueryTwitterTrend_usa_lax_2019_04_27T1955_e2676d1bef..

В luigi daemon я получаю UPSTREAM_ERROR..

enter image description here

Я не знаю точно, что является причиной этой ошибки.Все мои QueryTwitterTrend задачи работают отлично, с точки зрения создания локального файла, и говорят, что они выполнены в среде Luigi.Проблема в том, что TaskWrapper читается как ошибка.

Что я делаю, чтобы этого не произошло и чтобы все зависимости отслеживались.Скоро у меня будет больше задач, которые зависят от QueryTwitterTrend.

...