Передача данных между таможенными операторами воздушного потока - PullRequest
0 голосов
/ 26 апреля 2020

Я создаю свои собственные пользовательские операторы в потоке воздуха и хотел бы использовать выходные данные одного оператора в качестве входных данных другого оператора. В настоящее время я сохраняю выходные данные в s3 и считываю их из s3 в следующем операторе, что представляется эффективным способом. Я наткнулся на следующий пост Воздушный поток и передача данных между операторами , но я не мог понять это совершенно ясно. Я был бы очень признателен, если бы кто-нибудь дал мне хороший пример передачи данных между операторами.

class SFScoreDataOperator(BaseOperator):
    @apply_defaults
    def __init__(self, aws_key, aws_secret,
                 model_version='2020-04-v2',
                 *args, **kwargs):
        self.__version__ = model_version
        self.aws_key = aws_key
        self.aws_secret = aws_secret
        super(SFScoreDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        SuccessFactorData(aws_key=self.aws_key, aws_secret=self.aws_secret, to_random_shuffle=False).get_data()


class SFScoreGetTrainOperator(BaseOperator):
    @apply_defaults
    def __init__(self, aws_key, aws_secret,
                 model_version='2020-04-v2',
                 *args, **kwargs):
        self.__version__ = model_version
        self.aws_key = aws_key
        self.aws_secret = aws_secret
        super(SFScoreGetTrainOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        CM = SuccessFactorTrain(aws_key=self.aws_key, aws_secret=self.aws_secret)
        df_to_predict = CM.fetch_predict_data()
        df_train_test = CM.fetch_final_data()
        CM.train_test(final_df_for_train_test=df_train_test, segment_type=None)

class SuccessFactorDataPluginV2(AirflowPlugin):
    name = 'success_factor_data_plugin_v2'
    operators = [SFScoreDataOperator]


class SuccessFactorTrainPluginV2(AirflowPlugin):
    name = 'success_factor_train_plugin_v2'
    operators = [SFScoreGetTrainOperator]

Метод класса get_data в классе SFScoreDataOperator выводит две таблицы, которые являются входными данными для метода train_test в SFScoreGetTrainOperator, а метод train_test выводит 3 переменные, которые будут входными данными для следующего пользовательского оператора. Не все выходы имеют формат CSV, поэтому я не могу записать их в s3. Я прочитал документацию по XCom, но не уверен, как ее реализовать, поэтому буду очень признателен, если смогу получить pu sh. Спасибо!

...