Развертывание преобразованного ooz ie DAG в Google Composer Airflow: нет модуля с именем 'o2a' - PullRequest
0 голосов
/ 30 апреля 2020

Я использую google ooz ie для преобразования потока воздуха для преобразования некоторых рабочих процессов ooz ie, которые выполняются на AWS EMR. Удалось получить первую версию, но когда я пытаюсь загрузить DAG, airflow выдает ошибку:

Broken DAG: нет модуля с именем 'o2a'

Я попытался развернуть пакет pypi o2a, используя команду

gcloud composer environments update composer-name --update-pypi-packages-from-file requirements.txt --location location

и из облачной консоли Google. И то, и другое не удалось.

needs.txt

o2a==1.0.1

Вот код

    from airflow import models
    from airflow.operators.subdag_operator import SubDagOperator
    from airflow.utils import dates
    from o2a.o2a_libs import functions
    from airflow.models import Variable

    import subdag_validation
    import subdag_generate_reports

    CONFIG = {}

    JOB_PROPS = {

    }

    dag_config = Variable.get("coordinator", deserialize_json=True)
    cdrPeriod = dag_config["cdrPeriod"]

    TASK_MAP = {"validation": ["validation"], "generate_reports": ["generate_reports"] }

    TEMPLATE_ENV = {**CONFIG, **JOB_PROPS, "functions": functions, "task_map": TASK_MAP}

    with models.DAG(
        "workflow_coordinator",
        schedule_interval=None,  # Change to suit your needs
        start_date=dates.days_ago(0),  # Change to suit your needs
        user_defined_macros=TEMPLATE_ENV,
    ) as dag:

        validation = SubDagOperator(
            task_id="validation",
            trigger_rule="one_success",
            subdag=subdag_validation.sub_dag(dag.dag_id, "validation", dag.start_date, dag.schedule_interval),
        )

        generate_reports = SubDagOperator(
            task_id="generate_reports",
            trigger_rule="one_success",
            subdag=subdag_generate_reports.sub_dag(dag.dag_id, "generate_reports", dag.start_date, dag.schedule_interval,
            {
                "cdrPeriod": "{{cdrPeriod}}"
            }),
        )

        validation.set_downstream(generate_reports)

1 Ответ

0 голосов
/ 04 мая 2020

В документах o2a есть раздел, в котором рассказывается, как развертывать o2a:

https://github.com/GoogleCloudPlatform/oozie-to-airflow#the -o2a-library

С не удалось запустить из-за другой зависимости : lark-parser Только что установлен с помощью диспетчера пакетов pypi для Composer сделал свое дело.

...