AWS лямбда-преобразование в шланг: Python - PullRequest
0 голосов
/ 02 августа 2020

Я хочу преобразовать данные потока AWS kinesis с помощью лямбда-функции, а затем доставить на S3 с помощью шланга AWS. Однако я всегда сталкивался с этой проблемой: errorCode ":" Lambda.FunctionError "," errorMessage ":" Функция Lambda была успешно запущена, но вернула результат ошибки ".

Это lambda_function.


import base64
import json

def lambda_handler(event, context):
    output = []
    for record in event['Records']:
        # your own business logic.
        json_object = {"name": "this is a test"}
        output_record = {
            'recordId': record['eventID'], # is this the problem? I used sequenceNumber, it is not right. 
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['Records'])))
    return {'records': output}

Здесь был размещен соответствующий вопрос. Лямбда-преобразование Kinesis Firehose . Но, похоже, формат данных кинезиса отличается от того, что у меня есть. Заметил, что события, которые я получил похожи на следующие: это Capital Records , а не records . И нет recordId , но это eventID .

{
    'Records': [
        {
            'kinesis': {
                'kinesisSchemaVersion': '1.0', 
                'partitionKey': '1', 
                'sequenceNumber': '49603262076998903856573762341186472148109820820203765762', 
                'data':'eyJwcm9wIjogIjc5IiwgInRpbWVzdGFtcCI6ICIxNTk2MzE0MjM0IiwgInRoaW5nX2lkIjogImFhLWJiIn0=', 
                'approximateArrivalTimestamp': 1596314234.567
            }, 
            'eventSource': 'aws:kinesis', 
            'eventVersion': '1.0', 
            'eventID': 'shardId-000000000000:49603262076998903856573762341186472148109820820203765762', 
            'eventName': 'aws:kinesis:record', 
            'invokeIdentityArn':'xxx', 
            'awsRegion': 'us-east-1', 
            'eventSourceARN': 'xxx'
        }
    ]
}

1 Ответ

1 голос
/ 02 августа 2020

Это зависит от того, как вы настроили конвейер Kinesis, Firehose и Lambda.

Если ваш поток Kinesis запускает Lambda для доставки данных в Firehose, то вас заинтересует событие записи Kinesis. Оформить заказ Использование AWS Lambda с Amazon Kinesis . Пример события ниже

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}

Другой настройкой может быть Firehose, опрашивающий поток Kinesis. Кроме того, мы получаем гибкость в настройке преобразования Lambda для Firehose ( Amazon Kinesis Data Firehose Data Transformation ). В этом примере настройки событие будет выглядеть следующим образом ( Использование AWS Lambda с Amazon Kinesis Data Firehose )

{
  "invocationId": "invoked123",
  "deliveryStreamArn": "aws:lambda:events",
  "region": "us-west-2",
  "records": [
    {
      "data": "SGVsbG8gV29ybGQ=",
      "recordId": "record1",
      "approximateArrivalTimestamp": 1510772160000,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000000",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
        "approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z",
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
        "subsequenceNumber": ""
      }
    },
    {
      "data": "SGVsbG8gV29ybGQ=",
      "recordId": "record2",
      "approximateArrivalTimestamp": 151077216000,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000001",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
        "approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z",
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
        "subsequenceNumber": ""
      }
    }
  ]
}
  1. Лямбда-преобразование Kinesis Firehose вопрос, похоже, касается второго типа настройки.
  2. Ваш конвейер данных, похоже, использует первый тип настройки.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...