Как запустить задачу pyto operator boto3 AWS-glue в воздушном потоке, основываясь на успешном завершении другой задачи AWS-glue в Airflow? - PullRequest
3 голосов
/ 28 мая 2019

В моем скрипте Ariflow есть несколько задач с использованием оператора python для запуска склеивания с использованием boto3. Хотя поток зависит от каждой задачи в Ariflow. Последовательные задачи не ожидают успешного завершения предыдущей задачи.

Следующая задача задания склеивания вызывается сразу после вызова предыдущей задачи склеивания. В конце, несмотря на то, что Airflow выглядит успешно завершенным, задания на клей все еще выполняются в течение нескольких минут.

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime, timedelta
    from airflow.operators.sensors import TimeDeltaSensor
    import boto3
    import json

    default_args = {
    'owner': 'SAM',
    'start_date': datetime(2019, 5, 27)
     }


    glue_client = boto3.client('glue', region_name='us-east-1')

    def run_customer_job():
        glue_client.start_job_run(JobName='customer')  


    def run_product_job():
        glue_client.start_job_run(JobName='product')



    with DAG('dataload', default_args=default_args, schedule_interval="0 15 * * *") as dag:
        task1 = PythonOperator(task_id='task1',                                              
                               python_callable=run_customer_job)
        task2 = PythonOperator(task_id='task2',                                                          
                               python_callable=run_product_job)



    task1 >> task2 

Ответы [ 2 ]

1 голос
/ 29 мая 2019
0 голосов
/ 28 мая 2019

Я бы создал цикл состояния после вызова start_job_run, чтобы ваша задача не завершилась до тех пор, пока не выполнится задание Glue.

Быстрый и простой пример (вам потребуется обработка для неудачных задач и т. Д.):

job = glue_client.start_job_run(JobName='customer')

while True:
  status = glue.get_job_run(JobName=job['Name'], RunId=job['JobRunId'])
  if status['JobRun']['JobRunState'] == 'SUCCEEDED':
    break

  time.sleep(10)
...