AWS Firehose - Где определение формата события для обработки в лямбде? - PullRequest
0 голосов
/ 18 марта 2020

Преобразование данных Amazon Kinesis Firehose не предоставляет никакой информации о формате данных событий, поступающих в лямбда-функцию из Firehose.

Как мы могли бы кодировать лямбда-функцию для выполнения преобразования без такой информации?

Ответы [ 2 ]

1 голос
/ 19 марта 2020

После долгих затрат времени:

enter image description here

Чтобы получить событие, прибывающее в Lambda от Firehose.

$ sam local generate-event kinesis kinesis-firehose
{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
  "region": "us-east-1",
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675700192471156785154",
      "approximateArrivalTimestamp": 1495072949453,
      "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
    }
  ]
}

Тестирование Firehose / Lambda

Тестирование Создание современного приложения на AWS - Модуль 5 Firehose и лямбда с CLI .

Проверка лямбды

aws lambda invoke --function-name ${FUNCTION_NAME} \
--qualifier ${FUNCTION_ALIAS} \
--payload file://./event.json \
response.json
  • событие. json
{
  "records": [
    {
      "recordId": "1",
      "data": "'eyJ1c2VySWQiOiAiY3VycmVudFVzZXJJZCIsICJteXNmaXRJZCI6ICI0ZTUzOTIwYy01MDVhLTRhOTAtYTY5NC1iOTMwMDc5MWYwYWUifQ=='"
    }
  ]
}

Результат Лямбда-лог.

START RequestId: e15a50f9-20a5-48ce-9942-9681291910fe Version: 13
{'records': [{'recordId': '1', 'data': "'eyJ1c2VySWQiOiAiY3VycmVudFVzZXJJZCIsICJteXNmaXRJZCI6ICI0ZTUzOTIwYy01MDVhLTRhOTAtYTY5NC1iOTMwMDc5MWYwYWUifQ=='"}]}
Processing record: 1
{
    "userId": "currentUserId",
    "mysfitId": "4e53920c-505a-4a90-a694-b9300791f0ae",
    "goodevil": "Evil",
    "lawchaos": "Lawful",
    "species": "Chimera"
}
Successfully processed 1 records.

Проверка пожарного шланга + лямбда

echo "Testing Firehose put-record using --record file://./data.json"
aws firehose put-record --delivery-stream-name ${DELIVERY_STREAM_NAME} \
--record file://./data.json

echo "Testing put-record using  --record='{"Data": "{\"userId\": \"2\",\"mysfitId\": \"2b473002-36f8-4b87-954e-9a377e0ccbec\"}"}'"
# aws firehose put-record --delivery-stream-name mystream --record="{\"Data\":\"1\"}"
aws firehose put-record --delivery-stream-name "${DELIVERY_STREAM_NAME}" \
--record='{"Data": "{\"userId\": \"2\",\"mysfitId\": \"2b473002-36f8-4b87-954e-9a377e0ccbec\"}"}'

echo "Testing Firehose put-record using --cli-input-json"
aws firehose put-record \
--cli-input-json '
{
    "DeliveryStreamName": '\"${DELIVERY_STREAM_NAME}\"',
    "Record": {
        "Data": "{\"userId\": \"2\",\"mysfitId\": \"2b473002-36f8-4b87-954e-9a377e0ccbec\"}"
    }
}'

данных. json

{
    "Data":"{\"userId\": \"2\",\"mysfitId\": \"2b473002-36f8-4b87-954e-9a377e0ccbec\"}"
}

Результат

START RequestId: 94007e93-31d8-4da5-8231-c7cafa0d363a Version: 13
{'invocationId': '6bd3e736-2ad8-41d4-9485-a0aad1806990', 'deliveryStreamArn': 'arn:aws:firehose:us-east-2:200506027189:deliverystream/masa-ecs_monolith-firehose-extended-s3-firehose-click-stream', 'region': 'us-east-2', 'records': [{'recordId': '49605256299907973028537486643826326105740520545077690370000000', 'approximateArrivalTimestamp': 1584590301809, 'data': 'eyJ1c2VySWQiOiAiMiIsIm15c2ZpdElkIjogIjJiNDczMDAyLTM2ZjgtNGI4Ny05NTRlLTlhMzc3ZTBjY2JlYyJ9'}, {'recordId': '49605256299907973028537486643827535031560135311691350018000000', 'approximateArrivalTimestamp': 1584590303745, 'data': 'eyJ1c2VySWQiOiAiMiIsIm15c2ZpdElkIjogIjJiNDczMDAyLTM2ZjgtNGI4Ny05NTRlLTlhMzc3ZTBjY2JlYyJ9'}, {'recordId': '49605256299907973028537486643828743957379750009585532930000000', 'approximateArrivalTimestamp': 1584590305222, 'data': 'eyJ1c2VySWQiOiAiMiIsIm15c2ZpdElkIjogIjJiNDczMDAyLTM2ZjgtNGI4Ny05NTRlLTlhMzc3ZTBjY2JlYyJ9'}]}

Processing record: 49605256299907973028537486643826326105740520545077690370000000
{
    "userId": "2",
    "mysfitId": "2b473002-36f8-4b87-954e-9a377e0ccbec",
    "goodevil": "Neutral",
    "lawchaos": "Lawful",
    "species": "Cyclops"
}
Processing record: 49605256299907973028537486643827535031560135311691350018000000
{
    "userId": "2",
    "mysfitId": "2b473002-36f8-4b87-954e-9a377e0ccbec",
    "goodevil": "Neutral",
    "lawchaos": "Lawful",
    "species": "Cyclops"
}
Processing record: 49605256299907973028537486643828743957379750009585532930000000
{
    "userId": "2",
    "mysfitId": "2b473002-36f8-4b87-954e-9a377e0ccbec",
    "goodevil": "Neutral",
    "lawchaos": "Lawful",
    "species": "Cyclops"
}
Successfully processed 3 records.

Ссылки

Боюсь, что документ AWS Firehose так плохо написан, да не служит техническим документом.

Не тратить время на то, чтобы лично провести go через блоги и репозитории github, а не документ AWS Firehose.

Я надеюсь, AWS серьезно улучшит документ, так что нам не придется искать вокруг github, блоги, много экспериментируют.

1 голос
/ 18 марта 2020

Вот пример лямбда для python 3.7. Преобразование добавляет | между записями Firehose.

      import base64

      import json

      def lambda_handler(event, context):

          output = []

          print(json.dumps(event))

          for record in event['records']:

              print(record['recordId'])

              payload = base64.b64decode(record['data'])    

              output_record = {
                  'recordId': record['recordId'],
                  'result': 'Ok',
                  'data': base64.b64encode(payload + b'|').decode("utf-8")
              }

              output.append(output_record)

          return {'records': output}        

И пример пожарной части event (частичный вывод, как полный - это длинный путь к публикации:

{   "invocationId":"81087760-69e0-4e50-a12e-4fb46d05678a",
   "sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:850577719404:stream/a02e-kinesis-stream-MyKinesisStream-6JYA08YTEN6L",
   "deliveryStreamArn":"arn:aws:firehose:us-east-1:850577719404:deliverystream/a02f-firehose-MyFirehose-XHPEHGN8H2RX",
   "region":"us-east-1",
   "records":[      {         "recordId":"49605230427854536169624763988300178155600757073314316306000000",
         "approximateArrivalTimestamp":1584514759230,
         "data":"eyJtZXNzYWdlX2lkIjoxOTcsIm1zZ19ubyI6MjQxLCJhc2ciOiJhMDZlLUFTRy1jb25zdW1lcjEtTXlMYXVjaFRlbXBsYXRlU3RhY2stOTZHRVpZRTU0MkdFIn0=",
         "kinesisRecordMetadata":{
            "sequenceNumber":"49605230427854536169624763988300178155600757073314316306",
            "subsequenceNumber":0,
            "partitionKey":"fadff67a-6803-4db5-8bed-4fcbcb0ed5db",
            "shardId":"shardId-000000000001",
            "approximateArrivalTimestamp":1584514759230

}

},
      {         "recordId":"49605230427854536169624763988301387081420371702489022482000000",
         "approximateArrivalTimestamp":1584514759230,
         "data":"eyJtZXNzYWdlX2lkIjoxOTcsIm1zZ19ubyI6MjQyLCJhc2ciOiJhMDZlLUFTRy1jb25zdW1lcjEtTXlMYXVjaFRlbXBsYXRlU3RhY2stOTZHRVpZRTU0MkdFIn0=",
         "kinesisRecordMetadata":{
            "sequenceNumber":"49605230427854536169624763988301387081420371702489022482",
            "subsequenceNumber":0,
            "partitionKey":"ca681b9d-476e-4bf0-a193-9d67ac7df51b",
            "shardId":"shardId-000000000001",
            "approximateArrivalTimestamp":1584514759230

}

},
      {         "recordId":"49605230427854536169624763988302596007239986331663728658000000",
         "approximateArrivalTimestamp":1584514759230,
         "data":"eyJtZXNzYWdlX2lkIjoxOTcsIm1zZ19ubyI6MjQ1LCJhc2ciOiJhMDZlLUFTRy1jb25zdW1lcjEtTXlMYXVjaFRlbXBsYXRlU3RhY2stOTZHRVpZRTU0MkdFIn0=",
         "kinesisRecordMetadata":{
            "sequenceNumber":"49605230427854536169624763988302596007239986331663728658",
            "subsequenceNumber":0,
            "partitionKey":"ef73dafa-43b1-4b4f-bd57-e3fe56077c96",
            "shardId":"shardId-000000000001",
            "approximateArrivalTimestamp":1584514759230

}

},
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...