Как создать таблицу BigQuery с уведомлением об отказе Airflow? - PullRequest
0 голосов
/ 24 марта 2020

У меня есть GAG Airflow на GCP composer, который запускается каждые 5 минут. Я хотел бы создать таблицу BigQuery, в которой будет время, когда DAG начнет работать, и флаг, определяющий, является ли он успешным или неудачным. Например, если группа обеспечения доступности баз данных работает в 2020-03-23 ​​02:30 и запуск завершается неудачно, таблица BigQuery будет иметь столбец времени с 2020-03-23 ​​02:30 и столбец с флагом 1. Если это успешный запуск, то в таблице будет столбец времени с 2020-03-23 ​​02:30 и столбец флага с 0. В таблицу будут добавлены новые строки.

Заранее спасибо

Ответы [ 2 ]

1 голос
/ 24 марта 2020

На основе решения @Enrique, вот мое окончательное решение.

def status_check(**kwargs):

        dag_id = 'dag_id'
        dag_runs = DagRun.find(dag_id=dag_id)

        import pandas as pd
        import pandas_gbq
        from google.cloud import bigquery

        arr = []
        arr1 = []

        for dag_run in dag_runs:
            arr.append(dag_run.state)
            arr1.append(dag_run.execution_date)

        data1 = {'dag_status': arr, 'time': arr1}

        df = pd.DataFrame(data1)

        project_name = "project_name"
        dataset = "Dataset"

        outputBQtableName = '{}'.format(dataset)+'.dag_status_tab'

        df.to_gbq(outputBQtableName, project_id=project_name, if_exists='replace', progress_bar=False, \
            table_schema= \
                [{'name': 'dag_status', 'type': 'STRING'}, \
                 {'name': 'time', 'type': 'TIMESTAMP'}])

        return None


Dag_status = PythonOperator(
        task_id='Dag_status',
        python_callable=status_check,
    )
0 голосов
/ 24 марта 2020

Вы можете list_dag_runs CLI, чтобы вывести список прогонов DAG для данного dag_id. Возвращаемая информация включает в себя состояние каждого прогона.

Другой вариант - получение информации с помощью кода python несколькими различными способами. Одним из таких способов, который я использовал в прошлом, является метод 'find' в airflow.models.dagrun.DagRun .

dag_id = 'my_dag'
dag_runs = DagRun.find(dag_id=dag_id)
for dag_run in dag_runs:
      print(dag_run.state)

Наконец, используйте оператор BigQuery для записи информации DAg в таблицу BigQuery. Вы можете найти пример использования BigQueryOperator здесь .

...