Как написать / прочитать отметку времени из переменной в потоке воздуха? - PullRequest
0 голосов
/ 13 сентября 2018

Я работаю с: EXEC_DATE = '{{ macros.ds_add(ds, 1) }}' Это дает мне дату выполнения, но не час.Я хочу, чтобы иметь возможность сохранить это значение как YYYY-MM-DD HH:MM в переменную с именем process_last_run.

В основном читать переменную в начале цикла и записывать в него в конце тега.Эта переменная указывает, какое было время работы последнего дага.

Как я могу это сделать?

Ответы [ 2 ]

0 голосов
/ 14 сентября 2018

Вы можете сделать это с помощью макроса execution_date.Однако следует помнить, что это плохо названная концепция в воздушном потоке.Это представляет начало запланированного интервала периода.Он не изменится в том же режиме, даже если задача будет перезапущена вручную.Он там для поддержки идемпотентных обновлений данных.Что, честно говоря, является оптимальным способом приблизиться к конвейерам данных.В вашем случае, хотя вы и сказали в другом месте, что ваш api для извлечения данных берет дату начала и предоставляет все данные до текущей даты, что не способствует идемпотентной обработке, хотя вы можете выбросить данные после указанного отрезания.

Таким образом, вместо этого вы можете просто взять дату после завершения обработки данных и сохранить ее на потом.Вы можете сохранить в переменной воздушного потока.Вы можете заметить, однако, что время, которое вы получите из команды date, показанной ниже, будет позже, чем в последний раз, когда данные, которые вы, возможно, получили из вашего api-вызова process_data для всех данных с даты начала.Поэтому может быть лучше, если ваш шаг обработки выводит фактическую последнюю дату и время данных, обработанных в качестве последней строки стандартного вывода (который захватывается BashOperator для xcom).

EG

from airflow.models import Variable, DAG
from datetime import datetime

def pyop_fun(**context):
  # You could have used execution_date here and in the next operator
  # to make the operator rerun safe.
  # date_string = context['execution_date'].strftime('%Y-%m-%d %H:%M')
  # But elsewhere you said your api is always giving you the up-to-the-minute data.
  # So maybe getting the date from the prior task would work better for you.
  Variable.set(
    'process_last_run',
    context['task_instance'].xcom_pull(task_ids='process_data')

with dag as DAG(…):
  pyop = PythonOperator(
    task_id='set_process_last_run',
    callable=pyop_fun,
    provide_context=True, …)
  shop = BashOperator(
    task_id='process_data',
    bash_command='''
      process_data "{{var.value.process_last_run}}";
      date -u +%Y-%m-%d\ %H:%M''',
    xcom_push=True, …)
  shop >> pyop

# Because the last output line of a BashOperator is pushed into xcom for that
# task id with the default key, it can be pulled by the PythonOperator and 
# stored in a variable.
0 голосов
/ 13 сентября 2018

В Jinja есть переменная {{ execution_date }}, которую можно использовать для получения даты выполнения текущего прогона DAG.

Дополнительная информация: Воздушный поток - макросы

Есливы хотите отслеживать что-то вроде времени начала или окончания выполнения или продолжительности (в секундах) определенного экземпляра задачи, эта информация хранится в модели TaskInstance.

class TaskInstance(Base, LoggingMixin):
    ...
    start_date = Column(UtcDateTime)
    end_date = Column(UtcDateTime)
    duration = Column(Float)

https://github.com/apache/incubator-airflow/blob/4c30d402c4cd57dc56a5d9dcfe642eadc77ec3ba/airflow/models.py#L877-L879

Кроме того, если вы хотите вычислить время выполнения всей группы обеспечения доступности баз данных, вы можете получить ее, запросив базу данных метаданных Airflow вокруг этих полей для определенного запуска группы доступности базы данных.

Если вы делаете этоуже в своем коде Python вы можете получить доступ к полю execution_date в самом экземпляре задачи вместо использования слоя шаблона.

Переменные

Вы можете записывать и читать из переменных Airflow вот так :

Variable.set('my_key', 'some value')
my_val = Variable.get('my_key')

Вы также можете выполнять операции CRUD с переменными с помощью CLI .

Статистика

Другойчто вы могли бы иметь в виду, есливы обнаруживаете, что работаете со статистикой, такой как длительность задачи, - это интеграция Airflow с StatsD, которая собирает метрики для самого Airflow во время выполнения.Вы можете использовать эти метрики в системе на основе push, такой как StatsD, или в системе на основе pull, такой как Prometheus / Grafana, используя statsd_exporter .

...