Настройка AWS Lambda для параллельных вычислений для Dynamodb Streams - PullRequest
1 голос
/ 28 марта 2019

У меня есть flask на EC2 и python 3.6 AWS Lambda архитектура.Когда ответ приходит к flask, новый элемент добавляется к dynamoDB, что вызывает Lambda, который запускает некоторый процесс с новым добавленным элементом.По какой-то странной причине он не обрабатывает триггеры параллельно, запускает новую лямбда-функцию для каждого триггера, а обрабатывает их один за другим.

Я пытался установить максимальное значение concurrency, но это не сработало.

Мне нужно получить результат как можно быстрее, и я не управляю никакими процессами масштабирования самостоятельно.Таким образом, триггеры должны обрабатываться параллельно, а не один за другим, как сейчас.

Ответы [ 3 ]

0 голосов
/ 01 апреля 2019

Количество параллельных лямбд контролируется количеством осколков, в которые вы пишете, в DynamodB.

Amazon DynamoDB, AWS Lambda опрашивает ваш поток и вызывает вашу функцию Lambda. Когда ваша лямбда-функция ограничена, лямбда пытается обработать ограниченный пакет записей до истечения срока действия данных. Этот период времени может быть до семи дней для Amazon Kinesis. Задушенный запрос обрабатывается как блокировка для каждого шарда, и Лямбда не читает никаких новых записей с осколка, пока ограниченный пакет записей либо истекает, либо завершается успешно. Если в потоке более одного осколка, Лямбда продолжает вызывать нерегулируемые осколки до тех пор, пока один из них не пройдет.

источник

Это сделано для того, чтобы контролировать, что the events are processed in order они были сделаны на DynamodB. Но количество осколков не контролируется вами напрямую.

Теперь лучшее, что вы можете сделать, это

  1. установить большее значение Batch size в лямбда-функции. Сделав это, вы получите несколько событий в одной и той же лямбде. Вы можете иметь параллелизм в лямбда-функции, чтобы обрабатывать их все вместе. но это будет иметь очевидные недостатки, например, что если вы не сможете обработать их все до истечения времени лямбда Вы должны будете убедиться, что код является потокобезопасным.
0 голосов
/ 02 апреля 2019

Возможно, запись в DynamoDB блокирует параллелизм в этом случае.

Альтернативная архитектура для быстрой и очень масштабируемой обработки элементов: добавляйте элементы в корзину S3 в виде файлов. Затем триггер на S3 ведро запустит лямбду. Новый файл - новая лямбда, таким образом, только параллелизм лямбды ограничил бы количество лямбд, которые у вас есть параллельно.

0 голосов
/ 28 марта 2019
  1. Если вы разрабатываете лямбда-функцию с Python, параллелизм не приходит по умолчанию. Lambda поддерживает Python 2.7 и Python 3.6, оба из которых имеют многопроцессорные и многопоточные модули.
  2. С другой стороны, вы можете использовать multiprocessing.Pipe вместо multiprocessing.Queue, чтобы выполнить то, что вам нужно, без каких-либо ошибок во время выполнения функции Lambda.

Пожалуйста, обратитесь по ссылке ниже для получения более подробной информации об исходном коде для параллельного выполнения:

https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/

Также вы можете ссылаться на код ниже:

import time
import multiprocessing

region_maps = {
        "eu-west-1": {
            "dynamodb":"dynamodb.eu-west-1.amazonaws.com"
        },
        "us-east-1": {
            "dynamodb":"dynamodb.us-east-1.amazonaws.com"
        },
        "us-east-2": {
            "dynamodb": "dynamodb.us-east-2.amazonaws.com"
        }
    }

def multiprocessing_func(region):
    time.sleep(1)
    endpoint = region_maps[region]['dynamodb']
    print('endpoint for {} is {}'.format(region, endpoint))

def lambda_handler(event, context):
    starttime = time.time()
    processes = []
    regions = ['us-east-1', 'us-east-2', 'eu-west-1']
    for region in regions:
        p = multiprocessing.Process(target=multiprocessing_func, args=(region,))
        processes.append(p)
        p.start()

    for process in processes:
        process.join()

    output = 'That took {} seconds'.format(time.time() - starttime)
    print(output)
    return output

Надеюсь, это поможет.

...