Luigi - передайте переменную как вывод между задачами - PullRequest
0 голосов
/ 09 мая 2019

Я знаю, что могу передавать переменные между задачами, сохраняя их в файл и затем читая его в следующей задаче.Однако я не хочу генерировать так много документов в своем проекте, поэтому я пытаюсь передать его непосредственно как переменную.

Мой код выглядит так:

class FilterSpam(luigi.Task):
    time_slice = luigi.parameter.DateMinuteParameter(interval=30, default=datetime.datetime.today())

    filtered_tweets = []

    def requires(self):
        return Streaming(time_slice=self.time_slice)

    def run(self):
        with self.input().open('r') as infile:
            reader = csv.DictReader(infile, delimiter='\t')
            tweets = list(reader)
            self.filtered_tweets, spam = filter_spam(tweets, 0.7)

        with open('data/results/detected_spam.txt', 'a') as spam_file:
            for tweet in spam:    
                spam_json = json.dumps(tweet, ensure_ascii=False)
                spam_file.write(spam_json+'\n')

    def output(self):
        return self.filtered_tweets

class LemmatizeTweets(luigi.Task):
    time_slice = luigi.parameter.DateMinuteParameter(interval=30, default=datetime.datetime.today())

    def requires(self):
        return FilterSpam(time_slice=self.time_slice)

    def run():
        filtered_tweets = self.input() # Lista de diccionarios

        lemmatized_tweets = lemmatize(filtered_tweets)

        with self.output().open('w') as outfile:
            for tweet in lemmatized_tweets:
                tweet_json = json.dumps(tweet, ensure_ascii=False)
                outfile.write(tweet_json+'\n')

    def output(self):
        return luigi.LocalTarget('data/processed/{}.csv'.format(self.time_slice))

, где filtered_tweets - список диктов.

Можно ли передать эту переменную в задачу LemmatizeTweetsбез необходимости сохранять его в файл?Если нет, то какой будет лучший способ сохранить значения?В рассоле .txt с json-объектом в каждой строке ...?

...