"""
functions to invoke the aws lambda
"""
def lambda1(ds, **kwargs):
    """Invoke the ``lambda1`` AWS Lambda synchronously and fail if it failed.

    The function is called with ``InvocationType='RequestResponse'``, so the
    call blocks until the Lambda has finished. The response is printed and
    then inspected: an error raised by the Lambda's own code is reported in
    the response rather than as a Python exception, so it is converted into
    one here.

    Args:
        ds: Not used by this function. Presumably the Airflow date stamp
            passed in because the task sets ``provide_context=True`` --
            verify against the operator definition.
        **kwargs: Any further keyword arguments; accepted and ignored.

    Raises:
        RuntimeError: If the response carries a ``FunctionError`` or its
            ``StatusCode`` is not 200.
    """
    # NOTE(review): the credentials are hard-coded (blank in this file).
    # Consider the default boto3 credential chain instead -- confirm first.
    lambda_client = boto3.client(
        'lambda',
        region_name='us-east-1',
        aws_access_key_id='',
        aws_secret_access_key='',
    )
    response_1 = lambda_client.invoke(
        FunctionName='lambda1',
        InvocationType='RequestResponse',
    )
    print('Response--->', response_1)

    # Nothing is returned on purpose: the response holds a streaming payload
    # object, which is not a plain serialisable value.
    function_error = response_1.get('FunctionError')
    status_code = response_1.get('StatusCode')
    if function_error or status_code != 200:
        payload = response_1.get('Payload')
        detail = ''
        if payload is not None:
            detail = payload.read().decode('utf-8', 'replace')
        raise RuntimeError(
            "Lambda 'lambda1' failed (StatusCode=%s, FunctionError=%s): %s"
            % (status_code, function_error, detail)
        )
def lambda2(ds, **kwargs):
    """Invoke the ``lambda2`` AWS Lambda synchronously and fail if it failed.

    The function is called with ``InvocationType='RequestResponse'``, so the
    call blocks until the Lambda has finished. The response is printed and
    then inspected: an error raised by the Lambda's own code is reported in
    the response rather than as a Python exception, so it is converted into
    one here. Raising is what lets a caller tell a failed run from a
    successful one.

    Args:
        ds: Not used by this function. Presumably the Airflow date stamp
            passed in because the task sets ``provide_context=True`` --
            verify against the operator definition.
        **kwargs: Any further keyword arguments; accepted and ignored.

    Raises:
        RuntimeError: If the response carries a ``FunctionError`` or its
            ``StatusCode`` is not 200.
    """
    # NOTE(review): the credentials are hard-coded (blank in this file).
    # Consider the default boto3 credential chain instead -- confirm first.
    lambda_client = boto3.client(
        'lambda',
        region_name='us-east-1',
        aws_access_key_id='',
        aws_secret_access_key='',
    )
    response_2 = lambda_client.invoke(
        FunctionName='lambda2',
        InvocationType='RequestResponse',
    )
    print('Response--->', response_2)

    # Nothing is returned on purpose: the response holds a streaming payload
    # object, which is not a plain serialisable value.
    function_error = response_2.get('FunctionError')
    status_code = response_2.get('StatusCode')
    if function_error or status_code != 200:
        payload = response_2.get('Payload')
        detail = ''
        if payload is not None:
            detail = payload.read().decode('utf-8', 'replace')
        raise RuntimeError(
            "Lambda 'lambda2' failed (StatusCode=%s, FunctionError=%s): %s"
            % (status_code, function_error, detail)
        )
def lambda3(ds, **kwargs):
    """Invoke the ``lambda3`` AWS Lambda synchronously and fail if it failed.

    The function is called with ``InvocationType='RequestResponse'``, so the
    call blocks until the Lambda has finished. The response is printed and
    then inspected: an error raised by the Lambda's own code is reported in
    the response rather than as a Python exception, so it is converted into
    one here.

    Args:
        ds: Not used by this function. Presumably the Airflow date stamp
            passed in because the task sets ``provide_context=True`` --
            verify against the operator definition.
        **kwargs: Any further keyword arguments; accepted and ignored.

    Raises:
        RuntimeError: If the response carries a ``FunctionError`` or its
            ``StatusCode`` is not 200.
    """
    # NOTE(review): the credentials are hard-coded (blank in this file).
    # Consider the default boto3 credential chain instead -- confirm first.
    lambda_client = boto3.client(
        'lambda',
        region_name='us-east-1',
        aws_access_key_id='',
        aws_secret_access_key='',
    )
    response_3 = lambda_client.invoke(
        FunctionName='lambda3',
        InvocationType='RequestResponse',
    )
    print('Response--->', response_3)

    # Nothing is returned on purpose: the response holds a streaming payload
    # object, which is not a plain serialisable value.
    function_error = response_3.get('FunctionError')
    status_code = response_3.get('StatusCode')
    if function_error or status_code != 200:
        payload = response_3.get('Payload')
        detail = ''
        if payload is not None:
            detail = payload.read().decode('utf-8', 'replace')
        raise RuntimeError(
            "Lambda 'lambda3' failed (StatusCode=%s, FunctionError=%s): %s"
            % (status_code, function_error, detail)
        )
"""
Default arguments
"""
# Settings handed to the DAG constructor through its ``default_args`` parameter.
default_args = dict(
    owner='Prabhu',
    depends_on_past=False,
    start_date=datetime(2019, 10, 23),
    email=['@gmail.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
)
# The DAG object that every task below is attached to via ``dag=dag``.
dag = DAG(
    dag_id='invocation_lambda',
    description='invoke a lambda in dev aws instance',
    default_args=default_args,
)
"""
invoke a lambda in aws instance
"""
# Marker task at the head of the chain.
start_operator = DummyOperator(dag=dag, task_id='Begin_execution')

# One PythonOperator per Lambda; each one calls the function of the same name.
invoke_lambda1 = PythonOperator(
    dag=dag, task_id="lambda1", provide_context=True, python_callable=lambda1,
)
invoke_lambda2 = PythonOperator(
    dag=dag, task_id="lambda2", provide_context=True, python_callable=lambda2,
)
invoke_lambda3 = PythonOperator(
    dag=dag, task_id="lambda3", provide_context=True, python_callable=lambda3,
)

# Marker task at the tail of the chain.
end_operator = DummyOperator(dag=dag, task_id='stop_execution')
"""
Orchestrating the task
"""
# Run the tasks strictly one after another. The chain is parenthesised so it
# can span several lines; a line ending in a bare ``>>`` is a syntax error.
# NOTE(review): with Airflow's default trigger rule (all_success) a task starts
# only after its upstream task succeeded, so invoke_lambda3 waits for a
# successful invoke_lambda2 -- this relies on the callable raising on failure.
(
    start_operator
    >> invoke_lambda1
    >> invoke_lambda2
    >> invoke_lambda3
    >> end_operator
)
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import boto3, json
""
functions
"""
def lambda1(ds, **kwargs):
    """Invoke the ``lambda1`` AWS Lambda synchronously and fail if it failed.

    The function is called with ``InvocationType='RequestResponse'``, so the
    call blocks until the Lambda has finished. The response is printed and
    then inspected: an error raised by the Lambda's own code is reported in
    the response rather than as a Python exception, so it is converted into
    one here.

    Args:
        ds: Not used by this function. Presumably the Airflow date stamp
            passed in because the task sets ``provide_context=True`` --
            verify against the operator definition.
        **kwargs: Any further keyword arguments; accepted and ignored.

    Raises:
        RuntimeError: If the response carries a ``FunctionError`` or its
            ``StatusCode`` is not 200.
    """
    # NOTE(review): the credentials are hard-coded (blank in this file).
    # Consider the default boto3 credential chain instead -- confirm first.
    lambda_client = boto3.client(
        'lambda',
        region_name='us-east-1',
        aws_access_key_id='',
        aws_secret_access_key='',
    )
    response_1 = lambda_client.invoke(
        FunctionName='lambda1',
        InvocationType='RequestResponse',
    )
    print('Response--->', response_1)

    # Nothing is returned on purpose: the response holds a streaming payload
    # object, which is not a plain serialisable value.
    function_error = response_1.get('FunctionError')
    status_code = response_1.get('StatusCode')
    if function_error or status_code != 200:
        payload = response_1.get('Payload')
        detail = ''
        if payload is not None:
            detail = payload.read().decode('utf-8', 'replace')
        raise RuntimeError(
            "Lambda 'lambda1' failed (StatusCode=%s, FunctionError=%s): %s"
            % (status_code, function_error, detail)
        )
def lambda2(ds, **kwargs):
    """Invoke the ``lambda2`` AWS Lambda synchronously and fail if it failed.

    The function is called with ``InvocationType='RequestResponse'``, so the
    call blocks until the Lambda has finished. The response is printed and
    then inspected: an error raised by the Lambda's own code is reported in
    the response rather than as a Python exception, so it is converted into
    one here. Raising is what lets a caller tell a failed run from a
    successful one.

    Args:
        ds: Not used by this function. Presumably the Airflow date stamp
            passed in because the task sets ``provide_context=True`` --
            verify against the operator definition.
        **kwargs: Any further keyword arguments; accepted and ignored.

    Raises:
        RuntimeError: If the response carries a ``FunctionError`` or its
            ``StatusCode`` is not 200.
    """
    # NOTE(review): the credentials are hard-coded (blank in this file).
    # Consider the default boto3 credential chain instead -- confirm first.
    lambda_client = boto3.client(
        'lambda',
        region_name='us-east-1',
        aws_access_key_id='',
        aws_secret_access_key='',
    )
    response_2 = lambda_client.invoke(
        FunctionName='lambda2',
        InvocationType='RequestResponse',
    )
    print('Response--->', response_2)

    # Nothing is returned on purpose: the response holds a streaming payload
    # object, which is not a plain serialisable value.
    function_error = response_2.get('FunctionError')
    status_code = response_2.get('StatusCode')
    if function_error or status_code != 200:
        payload = response_2.get('Payload')
        detail = ''
        if payload is not None:
            detail = payload.read().decode('utf-8', 'replace')
        raise RuntimeError(
            "Lambda 'lambda2' failed (StatusCode=%s, FunctionError=%s): %s"
            % (status_code, function_error, detail)
        )
def lambda3(ds, **kwargs):
    """Invoke the ``lambda3`` AWS Lambda synchronously and fail if it failed.

    The function is called with ``InvocationType='RequestResponse'``, so the
    call blocks until the Lambda has finished. The response is printed and
    then inspected: an error raised by the Lambda's own code is reported in
    the response rather than as a Python exception, so it is converted into
    one here.

    Args:
        ds: Not used by this function. Presumably the Airflow date stamp
            passed in because the task sets ``provide_context=True`` --
            verify against the operator definition.
        **kwargs: Any further keyword arguments; accepted and ignored.

    Raises:
        RuntimeError: If the response carries a ``FunctionError`` or its
            ``StatusCode`` is not 200.
    """
    # NOTE(review): the credentials are hard-coded (blank in this file).
    # Consider the default boto3 credential chain instead -- confirm first.
    lambda_client = boto3.client(
        'lambda',
        region_name='us-east-1',
        aws_access_key_id='',
        aws_secret_access_key='',
    )
    # The call sits inside its own parentheses so the assignment can span
    # lines; an assignment that ends at ``=`` is a syntax error.
    response_3 = lambda_client.invoke(
        FunctionName='lambda3',
        InvocationType='RequestResponse',
    )
    print('Response--->', response_3)

    # Nothing is returned on purpose: the response holds a streaming payload
    # object, which is not a plain serialisable value.
    function_error = response_3.get('FunctionError')
    status_code = response_3.get('StatusCode')
    if function_error or status_code != 200:
        payload = response_3.get('Payload')
        detail = ''
        if payload is not None:
            detail = payload.read().decode('utf-8', 'replace')
        raise RuntimeError(
            "Lambda 'lambda3' failed (StatusCode=%s, FunctionError=%s): %s"
            % (status_code, function_error, detail)
        )
# Settings handed to the DAG constructor through its ``default_args`` parameter.
default_args = dict(
    owner='Prabhu',
    depends_on_past=False,
    start_date=datetime(2019, 10, 23),
    email=['@gmail.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
)
# The DAG object that every task below is attached to via ``dag=dag``.
dag = DAG(
    dag_id='invocation_lambda',
    description='invoke a lambda in dev aws instance',
    default_args=default_args,
)
"""
invoke a lambda in aws instance
"""
# Marker task at the head of the chain.
start_operator = DummyOperator(dag=dag, task_id='Begin_execution')

# One PythonOperator per Lambda; each one calls the function of the same name.
invoke_lambda1 = PythonOperator(
    dag=dag, task_id="lambda1", provide_context=True, python_callable=lambda1,
)
invoke_lambda2 = PythonOperator(
    dag=dag, task_id="lambda2", provide_context=True, python_callable=lambda2,
)
invoke_lambda3 = PythonOperator(
    dag=dag, task_id="lambda3", provide_context=True, python_callable=lambda3,
)

# Marker task at the tail of the chain.
end_operator = DummyOperator(dag=dag, task_id='stop_execution')
"""
Orchestrating the task
"""
# Run the tasks strictly one after another. The chain is parenthesised so it
# can span several lines; a line ending in a bare ``>>`` is a syntax error.
# NOTE(review): with Airflow's default trigger rule (all_success) a task starts
# only after its upstream task succeeded, so invoke_lambda3 waits for a
# successful invoke_lambda2 -- this relies on the callable raising on failure.
(
    start_operator
    >> invoke_lambda1
    >> invoke_lambda2
    >> invoke_lambda3
    >> end_operator
)
Мне нужно, чтобы третья задача запускалась только после того, как вторая задача (вызов и выполнение лямбды) успешно завершится.