Я создаю свои собственные пользовательские операторы в потоке воздуха и хотел бы использовать выходные данные одного оператора в качестве входных данных другого оператора. В настоящее время я сохраняю выходные данные в s3 и считываю их из s3 в следующем операторе, что представляется эффективным способом. Я наткнулся на следующий пост Воздушный поток и передача данных между операторами , но я не мог понять это совершенно ясно. Я был бы очень признателен, если бы кто-нибудь дал мне хороший пример передачи данных между операторами.
class SFScoreDataOperator(BaseOperator):
@apply_defaults
def __init__(self, aws_key, aws_secret,
model_version='2020-04-v2',
*args, **kwargs):
self.__version__ = model_version
self.aws_key = aws_key
self.aws_secret = aws_secret
super(SFScoreDataOperator, self).__init__(*args, **kwargs)
def execute(self, context):
SuccessFactorData(aws_key=self.aws_key, aws_secret=self.aws_secret, to_random_shuffle=False).get_data()
class SFScoreGetTrainOperator(BaseOperator):
@apply_defaults
def __init__(self, aws_key, aws_secret,
model_version='2020-04-v2',
*args, **kwargs):
self.__version__ = model_version
self.aws_key = aws_key
self.aws_secret = aws_secret
super(SFScoreGetTrainOperator, self).__init__(*args, **kwargs)
def execute(self, context):
CM = SuccessFactorTrain(aws_key=self.aws_key, aws_secret=self.aws_secret)
df_to_predict = CM.fetch_predict_data()
df_train_test = CM.fetch_final_data()
CM.train_test(final_df_for_train_test=df_train_test, segment_type=None)
class SuccessFactorDataPluginV2(AirflowPlugin):
name = 'success_factor_data_plugin_v2'
operators = [SFScoreDataOperator]
class SuccessFactorTrainPluginV2(AirflowPlugin):
name = 'success_factor_train_plugin_v2'
operators = [SFScoreGetTrainOperator]
Метод класса get_data в классе SFScoreDataOperator выводит две таблицы, которые являются входными данными для метода train_test в SFScoreGetTrainOperator, а метод train_test выводит 3 переменные, которые будут входными данными для следующего пользовательского оператора. Не все выходы имеют формат CSV, поэтому я не могу записать их в s3. Я прочитал документацию по XCom, но не уверен, как ее реализовать, поэтому буду очень признателен, если смогу получить pu sh. Спасибо!