поток воздуха: как я могу поместить метод для чтения файла JSON в локальной библиотеке - PullRequest
2 голосов
/ 01 апреля 2019

Я должен сгенерировать даг. Я сохранил файлы схемы таблицы json на GCP bucket . Файлы в корзине GCP, связанные с composer , будут переназначены в / home / airflow / gcs / dags / . Если я определю метод чтения файла json, после создания дамба, все пойдет нормально. Но если я хочу сгенерировать некоторый «общий код» (для помещения его в мою библиотеку), я не могу получить доступ к FileSystem с помощью кода из библиотеки, в частности, я не могу использовать библиотеку Python json.

Странно то, что я определяю метод на этапе создания dag, но вызываю его только после создания dag!

Чтобы завершить обсуждение, у меня нет проблем, если код в библиотеке использует только объекты памяти.

У меня есть эта проблема, когда я работаю с потоком воздуха (1.9 на драйвере GCP от композитора)

Это моя внешняя библиотека:

lib/
    __init__.py
    bb_airflow_utils.py

во внешней библиотеке

def load_json_file(fname):
    #per far sì che il dag la veda
    with open(fname, 'r') as f:
        d = json.load(f)
    return d

по основному сценарию


from lib.bb_airflow_utils import *
ROOT_PATH = 'home/airflow/gcs/dags'
IDCLI = 'goofy'
...
...
with DAG(dag_id=dag_name, default_args=dag_args) as dag:
    filepath = path.join(ROOT_PATH, '{}-todwh.json'.format(IDCLI))
    get_data = load_json_file(filepath)
    .....
    task_dummy_start = DummyOperator(task_id='task_{}_start'.format(dag_name), dag=dag)
    .....

Поток воздуха игнорируется оператором и пользовательский интерфейс говорит, что у dag нет SLA

1 Ответ

1 голос
/ 02 апреля 2019

Посмотрите на https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies#install-local.

Вы можете поместить общий код в отдельный файл и поместить его в отдельную папку, как показано в примере ниже.

Поместите зависимости в подкаталог в папку dags/. Чтобы импортировать модуль из подкаталога, каждый подкаталог в пути к модулю должен содержать __init__.py файл маркера пакета.

В этом примере зависимость - coin_module.py:

.
dags/
  use_local_deps.py  # A DAG file.
  dependencies/
    __init__.py
    coin_module.py

Импорт зависимости из файла определения DAG.

Например:

from dependencies import coin_module
...