Воздушный поток: доступ к полю шаблона из исходной задачи - PullRequest
0 голосов
/ 09 мая 2018

У меня есть две задачи, одна из которых является пользовательским оператором, где у него есть одно поле шаблона (snapshot_date_str), и оно установит поле в «xcom», а другой оператор - S3Sensor, а для bucket_key требуется поле шаблона, заданное в первом задании.

Определение Дага:

SNAPSHOT_DATE = datetime.now().date()
S3_BUCKET = 'test-s3'
TENANT = 'test'

dag = DAG('template_fields_dag',
          default_args=default_args,
          schedule_interval='@hourly',
          concurrency=1,
          catchup=False)

t1 = ContextInitOperator(task_id='set_context', snapshot_date=SNAPSHOT_DATE, tenant=TENANT, dag=dag)

file_task = S3KeySensor(task_id="s3_file_sensor",
                        aws_conn_id='s3_connection',
                        bucket_key='test/{{ snapshot_date_str }}/abc.csv',
                        bucket_name=S3_BUCKET,
                        wildcard_match=True,
                        poke_interval=10,
                        timeout=60,
                        dag=dag)
t1 >> file_task

И мой пользовательский ContextInitOperator устанавливает поле шаблона snapshot_date_str в xcom.

class ContextInitOperator(BaseOperator):

    template_fields = ('snapshot_date_str',)

    @apply_defaults
    def __init__(
            self,
            snapshot_date,
            *args, **kwargs):
        super(ContextInitOperator, self).__init__(*args, **kwargs)
        self.snapshot_date_str = snapshot_date.strftime('%Y-%m-%d')

    def execute(self, context):
        context['task_instance'].xcom_push(key='snapshot_date_str', value=self.snapshot_date_str)

bucket_key требует snapshot_date_str в пути.

Мне пока не нравится Python и Airflow, я что-то упускаю? Любая помощь будет принята с благодарностью.

Ответы [ 2 ]

0 голосов
/ 09 мая 2018

Если это только для получения отформатированной даты, Airflow может помочь вам в этом. В зависимости от ваших потребностей вы можете использовать следующие предопределенные переменные:

bucket_key='test/{{ ds }}/abc.csv',

на сегодня

bucket_key='test/{{ yesterday_ds }}/abc.csv',

за вчерашний день и

bucket_key='test/{{ tomorrow_ds }}/abc.csv',

на завтра. Посмотреть все используемые макросы можно здесь: https://airflow.apache.org/code.html#macros

Это означает, что ваш ContextInitOperator может быть удален.

bucket_key также является шаблонным полем, как видно из исходного файла (https://airflow.incubator.apache.org/_modules/airflow/operators/sensors.html), поэтому использование переменных Jinja будет работать.

Обработка даты в Airflow выполняется немного по-другому, поэтому вам, возможно, придется поэкспериментировать, чтобы получить нужный результат (от https://airflow.incubator.apache.org/scheduler.html):

Обратите внимание, что если вы запустите группу обеспечения доступности баз данных с параметром schedule_interval, равным одному дню, запуск с отметкой 2016-01-01 будет запущен вскоре после 2016-01-01T23: 59. Другими словами, экземпляр задания запускается после окончания периода, который он охватывает.

0 голосов
/ 09 мая 2018

из документации , возможно, вам придется что-то сделать в строках

bucket_key="test/{{ task_instance.xcom_pull(task_ids='set_context', key='snapshot_date_str') }}/abc.csv"
...