У меня есть две задачи, одна из которых является пользовательским оператором, где у него есть одно поле шаблона (snapshot_date_str
), и оно установит поле в «xcom», а другой оператор - S3Sensor
, а для bucket_key
требуется поле шаблона, заданное в первом задании.
Определение Дага:
SNAPSHOT_DATE = datetime.now().date()
S3_BUCKET = 'test-s3'
TENANT = 'test'
dag = DAG('template_fields_dag',
default_args=default_args,
schedule_interval='@hourly',
concurrency=1,
catchup=False)
t1 = ContextInitOperator(task_id='set_context', snapshot_date=SNAPSHOT_DATE, tenant=TENANT, dag=dag)
file_task = S3KeySensor(task_id="s3_file_sensor",
aws_conn_id='s3_connection',
bucket_key='test/{{ snapshot_date_str }}/abc.csv',
bucket_name=S3_BUCKET,
wildcard_match=True,
poke_interval=10,
timeout=60,
dag=dag)
t1 >> file_task
И мой пользовательский ContextInitOperator
устанавливает поле шаблона snapshot_date_str
в xcom.
class ContextInitOperator(BaseOperator):
template_fields = ('snapshot_date_str',)
@apply_defaults
def __init__(
self,
snapshot_date,
*args, **kwargs):
super(ContextInitOperator, self).__init__(*args, **kwargs)
self.snapshot_date_str = snapshot_date.strftime('%Y-%m-%d')
def execute(self, context):
context['task_instance'].xcom_push(key='snapshot_date_str', value=self.snapshot_date_str)
bucket_key
требует snapshot_date_str
в пути.
Мне пока не нравится Python и Airflow, я что-то упускаю? Любая помощь будет принята с благодарностью.