Как ждать, пока работа не будет выполнена или файл обновляется в потоке воздуха - PullRequest
2 голосов
/ 18 февраля 2020

Я пытаюсь использовать apache -airflow с облаком Google- composer, чтобы запланировать пакетную обработку, которая приводит к обучению модели на платформе Google ai. Мне не удалось использовать операторы воздушного потока, как я объясняю в этом вопросе , не удалось указать master_type в MLEngineTrainingOperator

С помощью командной строки мне удалось успешно запустить задание. Поэтому теперь моя задача - интегрировать эту команду в поток воздуха.

Используя BashOperator, я могу обучить модель, но мне нужно дождаться завершения задания, прежде чем создавать версию и устанавливать ее по умолчанию. Эта группа обеспечения доступности баз данных создает версию перед выполнением задания

    bash_command_train = "gcloud ai-platform jobs submit training training_job_name " \
                         "--packages=gs://path/to/the/package.tar.gz " \
                         "--python-version=3.5 --region=europe-west1 --runtime-version=1.14" \
                         " --module-name=trainer.train --scale-tier=CUSTOM --master-machine-type=n1-highmem-16"
    bash_train_operator = BashOperator(task_id='train_with_bash_command',
                                       bash_command=bash_command_train,
                                       dag=dag,)



    ...
    create_version_op = MLEngineVersionOperator(
        task_id='create_version',
        project_id=PROJECT,
        model_name=MODEL_NAME,
        version={
            'name': version_name,
            'deploymentUri': export_uri,
            'runtimeVersion': RUNTIME_VERSION,
            'pythonVersion': '3.5',
            'framework': 'SCIKIT_LEARN',
        },
        operation='create')

    set_version_default_op = MLEngineVersionOperator(
        task_id='set_version_as_default',
        project_id=PROJECT,
        model_name=MODEL_NAME,
        version={'name': version_name},
        operation='set_default')

    # Ordering the tasks
    bash_train_operator >> create_version_op >> set_version_default_op

Результат обучения - обновление файла в хранилище Gcloud. Поэтому я ищу оператора или датчик, который будет ждать обновления этого файла, я заметил GoogleCloudStorageObjectUpdatedSensor, но я не знаю, как заставить его повторить, пока этот файл не будет обновлен. Другим решением было бы проверить выполнение задания, но я не могу найти, как это сделать.

Любая помощь будет принята с благодарностью.

1 Ответ

2 голосов
/ 19 февраля 2020

Документация Google Cloud для флага --stream-logs:

"Блокировать до завершения задания и потоковую передачу журналов во время выполнения задания."

Добавить этот флаг на bash_command_train и я думаю, что это должно решить вашу проблему. Команда должна быть отменена только после завершения задания, тогда Airflow отметит ее как успешную. Это также позволит вам отслеживать журналы ваших тренировочных заданий в Airflow.

...