Несколько параллельных лямбда-вызовов AWS - PullRequest
0 голосов
/ 26 февраля 2019

Я пытаюсь выполнить несколько вызовов AWS Lambda с python 3.7.2 и пакетом aiobotocore.Вот мой код.

import asyncio
import aiobotocore


async def invoke(payload, session):
    async with session.create_client('lambda', region_name='us-east-1') as client:
        return await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)


def generate_invocations(payloads, session):
    for payload in payloads:
        yield invoke(payload, session)


def invoke_all(payloads):
    loop = asyncio.get_event_loop()

    async def wrapped():
        session = aiobotocore.get_session(loop=loop)
        invocations = generate_invocations(payloads, session)
        return await asyncio.gather(*invocations)

    return loop.run_until_complete(wrapped())


def main():
    payloads_list = []  # MY PAYLOADS LIST 
    lambda_responses = invoke_all(payloads_list)
    print(lambda_responses)


if __name__ == '__main__':
    main()

Код работает очень быстро (для 10 полезных нагрузок в течение 1 секунды вместо 15 с использованием вызовов лямбда-клиента boto3), но у меня есть две проблемы:

1) Элементы в lambda_responses включаютКлюч полезной нагрузки, значение которого имеет тип aiobotocore.response.StreamingBody.Когда я пытаюсь использовать value.read (), я получаю «объект сопрограммы StreamingBody.read» и думаю, что в моем коде есть некоторые проблемы.Я могу получить желаемый ответ через «json.loads (json.loads (r ['Payload'] ._ buffer.pop ()) ['body'])", но как правильно его получить.

2) В редких случаях «Полезная нагрузка» в одном из ответов имеет пустой буфер.Как я могу гарантировать, что функция invoke_all возвращает непустые ответы?Это правильное использование aiobotocore?

Я новичок в Python 3 и асинхронной функциональности.Вдохновленный примерами aiobotocore документации и Мэтью Маркуса blog .

Спасибо!

1 Ответ

0 голосов
/ 27 февраля 2019

1) Элементы в lambda_responses включают ключ «Payload», значение которого имеет тип aiobotocore.response.StreamingBody.Когда я пытаюсь использовать value.read (), я получаю «объект сопрограммы StreamingBody.read»

Это означает, что ожидание сопрограммы read() ожидается, что вы должны делать, находясь в цикле событий.Например, вы можете изменить сопрограмму invoke так, чтобы она также читала ответ:

async def invoke(payload, session):
    async with session.create_client('lambda', region_name='us-east-1') as client:
        resp = await client.invoke(FunctionName='MY_FUNCTION', Payload=payload)
        payload = await r['Payload'].read()
        return payload  # or assemble a dict with relevant parts

2) В редких случаях «Полезная нагрузка» в одном из ответов имеет пустой буфер.

Вероятно, это связано с тем, что вы обращаетесь к буферу до фактического чтения содержимого.В некоторых случаях информация поступает достаточно скоро, чтобы вы все равно нашли ее во внутреннем буфере, но иногда вам приходится ждать ее.Использование открытого метода, такого как read(), гарантирует, что вы правильно используете API.Свойство _buffer, с другой стороны, начинается с подчеркивания, которое означает, что это деталь реализации, к которой нет прямого доступа.

...