Я должен сгенерировать даг. Я сохранил файлы схемы таблицы json на GCP bucket .
Файлы в корзине GCP, связанные с composer , будут переназначены в / home / airflow / gcs / dags / .
Если я определю метод чтения файла json, после создания дамба, все пойдет нормально.
Но если я хочу сгенерировать некоторый «общий код» (для помещения его в мою библиотеку), я не могу получить доступ к FileSystem с помощью кода из библиотеки, в частности, я не могу использовать библиотеку Python json.
Странно то, что я определяю метод на этапе создания dag, но вызываю его только после создания dag!
Чтобы завершить обсуждение, у меня нет проблем, если код в библиотеке использует только объекты памяти.
У меня есть эта проблема, когда я работаю с потоком воздуха (1.9 на драйвере GCP от композитора)
Это моя внешняя библиотека:
lib/
__init__.py
bb_airflow_utils.py
во внешней библиотеке
def load_json_file(fname):
#per far sì che il dag la veda
with open(fname, 'r') as f:
d = json.load(f)
return d
по основному сценарию
from lib.bb_airflow_utils import *
ROOT_PATH = 'home/airflow/gcs/dags'
IDCLI = 'goofy'
...
...
with DAG(dag_id=dag_name, default_args=dag_args) as dag:
filepath = path.join(ROOT_PATH, '{}-todwh.json'.format(IDCLI))
get_data = load_json_file(filepath)
.....
task_dummy_start = DummyOperator(task_id='task_{}_start'.format(dag_name), dag=dag)
.....
Поток воздуха игнорируется оператором и пользовательский интерфейс говорит, что у dag нет SLA