Наиболее вероятная причина отсутствия DAG - ошибка в коде, из-за которой планировщик не может подобрать DAG. Также вы можете проверить, есть ли 2 .py файла с одинаковым именем DAG. Я также видел, как это происходит, когда вы заменяете файл .py с другим именем, но с тем же именем DAG (даже если вы удаляете предыдущий файл .py). Это трудно устранить без проверки среды / журналов, но я думаю, что это наиболее вероятные сценарии. Не стесняйтесь связаться со службой поддержки , если проблема не устранена.
В любом случае, я создал этот DAG, который отлично работает в Composer 1.7.1 Airflow 1.10.2 и Python3. Читая вопрос и код, вы чувствуете, что хотите передать список в таблицы для следующей задачи, поэтому я добавил один, который просто печатает их, используя XCOM :
import datetime
import os
import airflow
from airflow import models
from airflow.operators import python_operator
from google.cloud import bigquery
import time
import logging</p>
<p>default_dag_args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(1)
}</p>
<p>with models.DAG(
'test_table_xcom',
default_args=default_dag_args, schedule_interval = "@daily") as dag:</p>
<code>TABLE_PREFIX = 'test'
SOURCE_PROJECT = <PROJECT>
SOURCE_DATASET = <DATASET>
def get_table_names(**kwargs):
client = bigquery.Client()
source_tables = []
dataset = '{}.{}'.format(SOURCE_PROJECT,SOURCE_DATASET)
for table in client.list_tables(dataset):
if table.table_id.startswith(TABLE_PREFIX):
source_tables.append(table.table_id)
logging.info('{} tables scheduled to move'.format(len(source_tables)))
return source_tables
def print_tables(**kwargs):
ti = kwargs['ti']
tables_list = ti.xcom_pull(task_ids='list_tables')
for table in tables_list:
print(table)
listTables = python_operator.PythonOperator(task_id='list_tables',python_callable=get_table_names, provide_context=True)
tablePrint = python_operator.PythonOperator(task_id='print_tables',python_callable=print_tables, provide_context=True)
listTables >> tablePrint
</code>
И последнее, но не менее важное: обратите внимание, что Airflow не означает , что означает для самостоятельного выполнения операций ETL, но для их планирования. Использование XCOM не рекомендуется (как задокументировано *), поскольку оно может перегрузить БД (в данном случае Cloud SQL), которая работает под капотом Airflow / Composer. В этом конкретном случае, когда вы будете передавать список имен таблиц, я не думаю, что это будет проблемой, но лучше знать об этой рекомендации.
* если двум операторам необходимо обмениваться информацией, например, именем файла или небольшим объемом данных, вам следует рассмотреть возможность объединения их в один оператор.