Мы вызываем определенные лямбды через оператора Airflow. В операторе выполняется вызов lambda.invoke
в библиотеке boto3 (через AwsLambdaHook
).
Однако одна из лямбд имеет длительное время работы (10-15 минут). Первоначально оператор получил исключение времени ожидания boto3 из-за того, что время ожидания по умолчанию на invoke
составляет 60 секунд.
Поэтому мы расширили AwsLambdaHook
, чтобы разрешить передачу конфигурации клиенту boto
(код для справки):
from airflow.contrib.hooks.aws_hook import AwsHook
class AwsLambdaHook(AwsHook):
"""
Interact with AWS Lambda
:param function_name: AWS Lambda Function Name
:type function_name: str
:param region_name: AWS Region Name (example: us-west-2)
:type region_name: str
:param log_type: Tail Invocation Request
:type log_type: str
:param qualifier: AWS Lambda Function Version or Alias Name
:type qualifier: str
:param invocation_type: AWS Lambda Invocation Type (RequestResponse, Event etc)
:type invocation_type: str
:param config: Config to pass to the lambda connection
:type config: boto3.Config
"""
def __init__(
self,
function_name,
region_name=None,
log_type="None",
qualifier="$LATEST",
invocation_type="RequestResponse",
config=None,
*args,
**kwargs
):
self.function_name = function_name
self.region_name = region_name
self.log_type = log_type
self.invocation_type = invocation_type
self.qualifier = qualifier
self.config = config
super(AwsLambdaHook, self).__init__(*args, **kwargs)
def get_conn(self):
self.conn = self.get_client_type("lambda", self.region_name, self.config)
return self.conn
def invoke_lambda(self, payload):
"""
Invoke Lambda Function
"""
awslambda_conn = self.get_conn()
response = awslambda_conn.invoke(
FunctionName=self.function_name,
InvocationType=self.invocation_type,
LogType=self.log_type,
Payload=payload,
Qualifier=self.qualifier,
)
return response
И мы передаем следующую конфигурацию:
from botocore.client import Config
config_dict = {"connect_timeout": 5, "read_timeout": 900}
config = Config(**config_dict)
Теперь оператор больше не получает исключение времени ожидания boto3. Однако, это закончилось тем, что мы достигли нашего тайм-аута выполнения задачи Airflow (настроенный на 1 час). В журналах я видел следующее:
[2019-10-25 07:52:05,172] {aws_lambda_operator.py:114} INFO - AWS Lambda: invoking Lambdas20
[2019-10-25 07:52:05,316] {logging_mixin.py:95} INFO - [[34m2019-10-25 07:52:05,316[0m] {[34mconnectionpool.py:[0m735} INFO[0m - Starting new HTTPS connection (1): lambda.ap-southeast-1.amazonaws.com[0m
[2019-10-25 08:07:07,097] {logging_mixin.py:95} INFO - [[34m2019-10-25 08:07:07,097[0m] {[34mconnectionpool.py:[0m735} INFO[0m - Starting new HTTPS connection (2): lambda.ap-southeast-1.amazonaws.com[0m
[2019-10-25 08:22:08,489] {logging_mixin.py:95} INFO - [[34m2019-10-25 08:22:08,489[0m] {[34mconnectionpool.py:[0m735} INFO[0m - Starting new HTTPS connection (3): lambda.ap-southeast-1.amazonaws.com[0m
[2019-10-25 08:37:11,402] {logging_mixin.py:95} INFO - [[34m2019-10-25 08:37:11,402[0m] {[34mconnectionpool.py:[0m735} INFO[0m - Starting new HTTPS connection (4): lambda.ap-southeast-1.amazonaws.com[0m
[2019-10-25 08:52:05,172] {logging_mixin.py:95} INFO - [[34m2019-10-25 08:52:05,172[0m] {[34mtime
Кажется, что ответа от AWS никогда не бывает. Лямбда уже завершена, хотя. Тот же самый лямбда-код (просто работающий с меньшим количеством данных и работающий в другом регионе) отлично работает с лямбда-оператором, поэтому я уверен, что проблема не в лямбда-выражении.
Теперь у меня есть 2 вопроса:
Кто-нибудь испытывал подобное поведение, прежде чем вызывать лямбду раньше? Каково было разрешение?
Существует ли альтернативный подход, в котором мы можем асинхронно опрашивать результат? Я посмотрел в boto3
документации, но выглядело так, как будто invoke_async
устарело.