Длительная лямбда-ожидание ожидания ответа в Airflow - PullRequest
0 голосов
/ 25 октября 2019

Мы вызываем определенные лямбды через оператора Airflow. В операторе выполняется вызов lambda.invoke в библиотеке boto3 (через AwsLambdaHook).

Однако одна из лямбд имеет длительное время работы (10-15 минут). Первоначально оператор получил исключение времени ожидания boto3 из-за того, что время ожидания по умолчанию на invoke составляет 60 секунд.

Поэтому мы расширили AwsLambdaHook, чтобы разрешить передачу конфигурации клиенту boto(код для справки):

from airflow.contrib.hooks.aws_hook import AwsHook


class AwsLambdaHook(AwsHook):
    """
    Interact with AWS Lambda

    :param function_name: AWS Lambda Function Name
    :type function_name: str
    :param region_name: AWS Region Name (example: us-west-2)
    :type region_name: str
    :param log_type: Tail Invocation Request
    :type log_type: str
    :param qualifier: AWS Lambda Function Version or Alias Name
    :type qualifier: str
    :param invocation_type: AWS Lambda Invocation Type (RequestResponse, Event etc)
    :type invocation_type: str
    :param config: Config to pass to the lambda connection
    :type config: boto3.Config
    """

    def __init__(
        self,
        function_name,
        region_name=None,
        log_type="None",
        qualifier="$LATEST",
        invocation_type="RequestResponse",
        config=None,
        *args,
        **kwargs
    ):
        self.function_name = function_name
        self.region_name = region_name
        self.log_type = log_type
        self.invocation_type = invocation_type
        self.qualifier = qualifier
        self.config = config
        super(AwsLambdaHook, self).__init__(*args, **kwargs)

    def get_conn(self):
        self.conn = self.get_client_type("lambda", self.region_name, self.config)
        return self.conn

    def invoke_lambda(self, payload):
        """
        Invoke Lambda Function
        """

        awslambda_conn = self.get_conn()

        response = awslambda_conn.invoke(
            FunctionName=self.function_name,
            InvocationType=self.invocation_type,
            LogType=self.log_type,
            Payload=payload,
            Qualifier=self.qualifier,
        )

        return response

И мы передаем следующую конфигурацию:

from botocore.client import Config

config_dict = {"connect_timeout": 5, "read_timeout": 900}
config = Config(**config_dict)

Теперь оператор больше не получает исключение времени ожидания boto3. Однако, это закончилось тем, что мы достигли нашего тайм-аута выполнения задачи Airflow (настроенный на 1 час). В журналах я видел следующее:

[2019-10-25 07:52:05,172] {aws_lambda_operator.py:114} INFO - AWS Lambda: invoking Lambdas20
[2019-10-25 07:52:05,316] {logging_mixin.py:95} INFO - [[34m2019-10-25 07:52:05,316[0m] {[34mconnectionpool.py:[0m735} INFO[0m - Starting new HTTPS connection (1): lambda.ap-southeast-1.amazonaws.com[0m
[2019-10-25 08:07:07,097] {logging_mixin.py:95} INFO - [[34m2019-10-25 08:07:07,097[0m] {[34mconnectionpool.py:[0m735} INFO[0m - Starting new HTTPS connection (2): lambda.ap-southeast-1.amazonaws.com[0m
[2019-10-25 08:22:08,489] {logging_mixin.py:95} INFO - [[34m2019-10-25 08:22:08,489[0m] {[34mconnectionpool.py:[0m735} INFO[0m - Starting new HTTPS connection (3): lambda.ap-southeast-1.amazonaws.com[0m
[2019-10-25 08:37:11,402] {logging_mixin.py:95} INFO - [[34m2019-10-25 08:37:11,402[0m] {[34mconnectionpool.py:[0m735} INFO[0m - Starting new HTTPS connection (4): lambda.ap-southeast-1.amazonaws.com[0m
[2019-10-25 08:52:05,172] {logging_mixin.py:95} INFO - [[34m2019-10-25 08:52:05,172[0m] {[34mtime

Кажется, что ответа от AWS никогда не бывает. Лямбда уже завершена, хотя. Тот же самый лямбда-код (просто работающий с меньшим количеством данных и работающий в другом регионе) отлично работает с лямбда-оператором, поэтому я уверен, что проблема не в лямбда-выражении.

Теперь у меня есть 2 вопроса:

  1. Кто-нибудь испытывал подобное поведение, прежде чем вызывать лямбду раньше? Каково было разрешение?

  2. Существует ли альтернативный подход, в котором мы можем асинхронно опрашивать результат? Я посмотрел в boto3 документации, но выглядело так, как будто invoke_async устарело.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...