Как вызвать серию AWS лямбда с использованием воздушного потока. У меня три лямбды, вторая лямбда должна вызываться исходя из статуса первой лямбды - PullRequest
0 голосов
/ 30 октября 2019
"""
functions to invoke the aws lambda
"""
def lambda1(ds,**kwargs):
    lambda_client = boto3.client('lambda', 
                             region_name='us-east-1',
                             aws_access_key_id='',
                             aws_secret_access_key='')
    response_1 = lambda_client.invoke(FunctionName='lambda1',InvocationType='RequestResponse')
    print ('Response--->' , response_1)

def lambda2(ds,**kwargs):
    lambda_client = boto3.client('lambda', 
                             region_name='us-east-1',
                             aws_access_key_id='',
                             aws_secret_access_key='')
    response_2 = lambda_client.invoke(FunctionName='lambda2',InvocationType='RequestResponse')
    print ('Response--->' , response_2)

def lambda3(ds,**kwargs):
    lambda_client = boto3.client('lambda', 
                             region_name='us-east-1',
                             aws_access_key_id='',
                             aws_secret_access_key='')
    response_3=lambda_client.invoke(FunctionName='lambda3',InvocationType='RequestResponse')
    print ('Response--->' , response_3)
"""
Default arguments 
"""
default_args = {
    'owner': 'Prabhu',
    'depends_on_past': False,
    'start_date': datetime(2019,10,23),
    'email': ['@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

dag = DAG(
    'invocation_lambda',
    default_args=default_args,
    description='invoke a lambda in dev aws instance'
)
"""
 invoke a lambda in  aws instance
"""

start_operator = DummyOperator(task_id='Begin_execution',  dag=dag)

invoke_lambda1 = PythonOperator(
    task_id="lambda1",
    python_callable=lambda1,
    provide_context=True,
    dag=dag
)

invoke_lambda2 = PythonOperator(
    task_id="lambda2",
    python_callable=lambda2,
    provide_context=True,
    dag=dag
)

invoke_lambda3 = PythonOperator(
    task_id="lambda3",
    python_callable=lambda3,
    provide_context=True,
    dag=dag
)

end_operator = DummyOperator(task_id='stop_execution',  dag=dag)

"""
Orchestrating  the task
"""
start_operator >> invoke_lambda1 >> invoke_lambda2 >> invoke_lambda3 >> 
end_operator


    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime
    import boto3, json

    ""
    functions
    """
    def lambda1(ds,**kwargs):
        lambda_client = boto3.client('lambda', 
                                 region_name='us-east-1',
                                 aws_access_key_id='',
                                 aws_secret_access_key='')
        response_1 = lambda_client.invoke(FunctionName='lambda1',InvocationType='RequestResponse')
        print ('Response--->' , response_1)

    def lambda2(ds,**kwargs):
        lambda_client = boto3.client('lambda', 
                                 region_name='us-east-1',
                                 aws_access_key_id='',
                                 aws_secret_access_key='')
        response_2 = lambda_client.invoke(FunctionName='lambda2',InvocationType='RequestResponse')
        print ('Response--->' , response_2)

    def lambda3(ds,**kwargs):
        lambda_client = boto3.client('lambda', 
                                 region_name='us-east-1',
                                 aws_access_key_id='',
                                 aws_secret_access_key='')
        response_3 = 



    lambda_client.invoke(FunctionName='lambda3',InvocationType='RequestResponse')
        print ('Response--->' , response_3)

    default_args = {
        'owner': 'Prabhu',
        'depends_on_past': False,
        'start_date': datetime(2019,10,23),
        'email': ['@gmail.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
    }

    dag = DAG(
        'invocation_lambda',
        default_args=default_args,
        description='invoke a lambda in dev aws instance'
    )
    """
     invoke a lambda in  aws instance
    """

    start_operator = DummyOperator(task_id='Begin_execution',  dag=dag)

    invoke_lambda1 = PythonOperator(
        task_id="lambda1",
        python_callable=lambda1,
        provide_context=True,
        dag=dag
    )

    invoke_lambda2 = PythonOperator(
        task_id="lambda2",
        python_callable=lambda2,
        provide_context=True,
        dag=dag
    )

    invoke_lambda3 = PythonOperator(
        task_id="lambda3",
        python_callable=lambda3,
        provide_context=True,
        dag=dag
    )

    end_operator = DummyOperator(task_id='stop_execution',  dag=dag)

    """
    Orchestrating  the task
    """
    start_operator >> invoke_lambda1 >> invoke_lambda2 >> invoke_lambda3 >> 
    end_operator

Мне нужно вызывать третью задачу только после того, как вторая задача (вызов и выполнение лямбды) успешно завершена

1 Ответ

0 голосов
/ 30 октября 2019

В настоящее время существует PR для GitHub Airflow с лямбда-оператором AWS: https://github.com/apache/airflow/pull/5914. Вы можете использовать этот оператор для вызова лямбда-выражения и ожидать его ответа (режимом вызова по умолчанию является ответ на запрос). Он имеет параметр check_success_function, который ожидает функцию, которая принимает ответ и возвращает логическое значение, указывающее успех или нет.

def check_success_function1(responser):
    """Function that is passed to the lambda operator to check for success.

    """

    validated = //whatever is expected from lambda1 in the response
    return validated

payload = { "data": "data" }
invoke_lambda1 = AwsLambdaInvokeFunctionOperator(
    task_id="invoke_lambda1",
    function_name="lambda1",
    region_name="eu-west-1",
    payload=json.dumps(payload),
    check_success_function=check_success_function1,
)

def check_success_function2(responser):
    """Function that is passed to the lambda operator to check for success.

    """

    validated = //whatever is expected from lambda2 in the response
    return validated

invoke_lambda2 = AwsLambdaInvokeFunctionOperator(
    task_id="invoke_lambda2",
    function_name="lambda2",
    region_name="eu-west-1",
    payload=json.dumps(payload),
    check_success_function=check_success_function2,
)

start_operator >> invoke_lambda1 >> invoke_lambda2 >> invoke_lambda3 >> end_operator 

Воздушный поток гарантирует, что в случае сбоя задачи invoke_lambda2 (например, из-заcheck_success_function терпит неудачу), что invoke_lambda3 не выполняется.

...