GCP Composer / Airflow не может распознать DAG после вызова функции - PullRequest
1 голос
/ 05 июня 2019

У меня есть функция, которая получает список таблиц из набора данных BigQuery:

def get_table_names(**kwargs):

    client = bigquery.Client()

    # get source tables

    source_tables = []

    for table in client.list_tables(
            Template('$project.$dataset').substitute(project=SOURCE_PROJECT, dataset=SOURCE_DATASET)):

        if table.table_id.startswith(TABLE_PREFIX):
            source_tables.append(table.table_id)

    logging.info(str(len(source_tables)) + ' tables scheduled to move')

    return source_tables

Я изначально вызывал эту функцию в рамках задачи типа PythonOperator и - хотя я не возвращалзначение - оно работало нормально и вышло из системы «524 таблицы запланированы для перемещения».

Я сейчас называю это как часть настройки dag, чтобы я мог создавать экземпляры задач для каждой таблицы (я не написалэтой части еще нет):

table_tasks = get_table_names()

Но как только я ее вызываю, веб-интерфейс Composer / Airflow перестает распознавать DAG - он все еще отображается, и если я нажимаю значок перезагрузки, я получаю обычное «свежее каксообщение «маргаритка», но если я пытаюсь войти в DAG, я получаю:

Кажется, что DAG «GA360_Replication» отсутствует

1 Ответ

0 голосов
/ 06 июня 2019

Наиболее вероятная причина отсутствия DAG - ошибка в коде, из-за которой планировщик не может подобрать DAG. Также вы можете проверить, есть ли 2 .py файла с одинаковым именем DAG. Я также видел, как это происходит, когда вы заменяете файл .py с другим именем, но с тем же именем DAG (даже если вы удаляете предыдущий файл .py). Это трудно устранить без проверки среды / журналов, но я думаю, что это наиболее вероятные сценарии. Не стесняйтесь связаться со службой поддержки , если проблема не устранена.

В любом случае, я создал этот DAG, который отлично работает в Composer 1.7.1 Airflow 1.10.2 и Python3. Читая вопрос и код, вы чувствуете, что хотите передать список в таблицы для следующей задачи, поэтому я добавил один, который просто печатает их, используя XCOM :

import datetime
import os
import airflow
from airflow import models
from airflow.operators import python_operator
from google.cloud import bigquery
import time
import logging</p>

<p>default_dag_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1)
}</p>

<p>with models.DAG(
        'test_table_xcom',
        default_args=default_dag_args, schedule_interval = "@daily") as dag:</p>

<code>TABLE_PREFIX = 'test'
SOURCE_PROJECT = <PROJECT>
SOURCE_DATASET =  <DATASET>

def get_table_names(**kwargs):

    client = bigquery.Client()
    source_tables = []


    dataset = '{}.{}'.format(SOURCE_PROJECT,SOURCE_DATASET)

    for table in client.list_tables(dataset):
        if table.table_id.startswith(TABLE_PREFIX):
            source_tables.append(table.table_id)

    logging.info('{} tables scheduled to move'.format(len(source_tables)))
    return source_tables

def print_tables(**kwargs):
    ti = kwargs['ti']
    tables_list = ti.xcom_pull(task_ids='list_tables')
    for table in tables_list:
        print(table)


listTables = python_operator.PythonOperator(task_id='list_tables',python_callable=get_table_names, provide_context=True)
tablePrint = python_operator.PythonOperator(task_id='print_tables',python_callable=print_tables, provide_context=True)

listTables >> tablePrint
</code>

И последнее, но не менее важное: обратите внимание, что Airflow не означает , что означает для самостоятельного выполнения операций ETL, но для их планирования. Использование XCOM не рекомендуется (как задокументировано *), поскольку оно может перегрузить БД (в данном случае Cloud SQL), которая работает под капотом Airflow / Composer. В этом конкретном случае, когда вы будете передавать список имен таблиц, я не думаю, что это будет проблемой, но лучше знать об этой рекомендации.

* если двум операторам необходимо обмениваться информацией, например, именем файла или небольшим объемом данных, вам следует рассмотреть возможность объединения их в один оператор.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...