Как поместить записи в поток Kinesis из приложения NodeJS (лямбда) - PullRequest
0 голосов
/ 24 января 2019

У меня есть лямбда-функция, которая должна помещать записи в поток Kinesis. Я не получаю сообщение об ошибке (которое я распознаю), но сообщения, кажется, никогда не попадают в поток ...

Я знаю, что сам поток работает, так как я могу отправлять ему сообщения с помощью приложения aws cli. Я проверил имя потока и другие параметры, переданные в функцию putRecord ().

Я использую этот код для отправки записей в поток:

const params = {
      Data: payload,
      PartitionKey: partitionKey,
      StreamName: this.streamName,
    };
    const res = await this.awsKinesis.putRecord(params);

res большой сложный объект, но он содержит error: null ...

{
    "domain": null,
    "service": {
        "config": {
            "credentials": {
                "expired": false,
                "expireTime": null,
                "accessKeyId": "ASIAT5YUDX4OWH5FGFYE",
                "sessionToken": "FQoGZXIvYXdzEJr//////////wEaDFGkyxe1r9QSkkhSSyKKAgbrbB6ef77wtuCC4zIH3YB7C0xJPPoql1YtRGaxba5ZDSCwBBRSQ0cBeTPMmtdUqRGshdJjjLosON6QG0FGWdt3TNDrENxqFtxjrQAbCHXfIx3ARtnn6r2agZjXi9cGZhkdpvUMSIUpaC3ZC+E9wLLvkZyQBfTSsv6QdcoaKGqT8tJ9Px7Wp5BSV3Nw//NE0GtJwv0pXiQrb3c6p6GkETtAxBBVVgwJP1WYdF+kh+Gg24DxMPwwy66ayD6E7oZIWB4i7JaqMXHoDjf9D51bpWPUAVCKF9AVn3t4JiKFBVw7lFQC0m91N9HdcKLzGmjpvX4JJNzKwBA/D1TfALDsprrvU1u7r/RlyabzKIHtpeIF",
                "envPrefix": "AWS"
            },
            "credentialProvider": {
                "providers": [null, null, null, null]
            },
            "region": "eu-west-1",
            "logger": null,
            "apiVersions": {},
            "apiVersion": null,
            "endpoint": "kinesis.eu-west-1.amazonaws.com",
            "httpOptions": {
                "timeout": 120000
            },
            "maxRedirects": 10,
            "paramValidation": true,
            "sslEnabled": true,
            "s3ForcePathStyle": false,
            "s3BucketEndpoint": false,
            "s3DisableBodySigning": true,
            "computeChecksums": true,
            "convertResponseTypes": true,
            "correctClockSkew": false,
            "customUserAgent": null,
            "dynamoDbCrc32": true,
            "systemClockOffset": 0,
            "signatureVersion": "v4",
            "signatureCache": true,
            "retryDelayOptions": {},
            "useAccelerateEndpoint": false
        },
        "isGlobalEndpoint": false,
        "endpoint": {
            "protocol": "https:",
            "host": "kinesis.eu-west-1.amazonaws.com",
            "port": 443,
            "hostname": "kinesis.eu-west-1.amazonaws.com",
            "pathname": "/",
            "path": "/",
            "href": "https://kinesis.eu-west-1.amazonaws.com/"
        },
        "_clientId": 1
    },
    "operation": "putRecord",
    "params": {
        "Data": "<< THE MESSAGE >>",
        "PartitionKey": "c770e429-52e7-47c4-bcbc-497548ff9dee",
        "StreamName": "my-stream"
    },
    "httpRequest": {
        "method": "POST",
        "path": "/",
        "headers": {
            "User-Agent": "aws-sdk-nodejs/2.290.0 linux/v6.10.3 exec-env/AWS_Lambda_nodejs6.10"
        },
        "body": "",
        "endpoint": {
            "protocol": "https:",
            "host": "kinesis.eu-west-1.amazonaws.com",
            "port": 443,
            "hostname": "kinesis.eu-west-1.amazonaws.com",
            "pathname": "/",
            "path": "/",
            "href": "https://kinesis.eu-west-1.amazonaws.com/"
        },
        "region": "eu-west-1",
        "_userAgent": "aws-sdk-nodejs/2.290.0 linux/v6.10.3 exec-env/AWS_Lambda_nodejs6.10"
    },
    "startTime": "2019-01-24T08:25:42.574Z",
    "response": {
        "request": "~context",
        "data": null,
        "error": null,
        "retryCount": 0,
        "redirectCount": 0,
        "httpResponse": {
            "headers": {},
            "streaming": false,
            "stream": null
        },
        "maxRetries": 3,
        "maxRedirects": 10
    },
    "_asm": {
        "currentState": "validate",
        "states": {
            "validate": {
                "accept": "build",
                "fail": "error"
            },
            "build": {
                "accept": "afterBuild",
                "fail": "restart"
            },
            "afterBuild": {
                "accept": "sign",
                "fail": "restart"
            },
            "sign": {
                "accept": "send",
                "fail": "retry"
            },
            "retry": {
                "accept": "afterRetry",
                "fail": "afterRetry"
            },
            "afterRetry": {
                "accept": "sign",
                "fail": "error"
            },
            "send": {
                "accept": "validateResponse",
                "fail": "retry"
            },
            "validateResponse": {
                "accept": "extractData",
                "fail": "extractError"
            },
            "extractError": {
                "accept": "extractData",
                "fail": "retry"
            },
            "extractData": {
                "accept": "success",
                "fail": "retry"
            },
            "restart": {
                "accept": "build",
                "fail": "error"
            },
            "success": {
                "accept": "complete",
                "fail": "complete"
            },
            "error": {
                "accept": "complete",
                "fail": "complete"
            },
            "complete": {
                "accept": null,
                "fail": null
            }
        }
    },
    "_haltHandlersOnError": false,
    "_events": {
        "validate": [null, null, null, null],
        "afterBuild": [null, null, null],
        "restart": [null],
        "sign": [null],
        "validateResponse": [null],
        "send": [null],
        "httpHeaders": [null],
        "httpData": [null],
        "httpDone": [null],
        "retry": [null, null, null, null, null, null],
        "afterRetry": [null],
        "build": [null],
        "extractData": [null, null],
        "extractError": [null, null],
        "httpError": [null]
    }
}

Я ожидаю, что сообщение появится в потоке Kinesis и запустит лямбду, запускаемую потоком, но этого никогда не происходит. Даже в веб-консоли поток Kinesis не показывает активности на вкладке мониторинга. Что я могу делать не так?

1 Ответ

0 голосов
/ 30 апреля 2019

Согласно документации возвращаемое значение - объект Request.

putRecord (params = {}, обратный вызов) ⇒ AWS.Request

Вам необходимо вызвать promise() для этого Request объекта, чтобы получить обещание, которое вы затем можете await включить.

const res = await this.awsKinesis.putRecord(params).promise();
...