Повреждение данных в Kinesis Producer - PullRequest
0 голосов
/ 28 мая 2020

Я использую производитель kinesis, чтобы вставить некоторые аналитические данные в AWS Cloudwatch. Это полный поток данных: 1. Производитель Kinesis (с использованием java SDK) -> Kinesis Stream -> AWS лямбда-функция -> AWS Cloudwatch

Версия производителя, используемая в Kinesis:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.13.1</version>
</dependency>

После отправки моих данных аналитики производителю с помощью следующего кода:

kinesisProducer.addUserRecord(stream, "101", ByteBuffer.wrap(requestString.getBytes()));

Данные отправляются производителю:

{
    "version": null,
    "namespace": "namespace",
    "metricdata": [
        {
            "name": "system",
            "unit": "Megabytes",
            "value": 879,
            "timestamp": 1590233414481,
            "dimensions": [
                {
                    "name": "systemName",
                    "value": "ramTotal"
                }
            ],
            "type": "gauge"
        }]
}

Я пришел к знайте, что данные конвертируются в формат base64 производителем kinesis. Следовательно, данные, полученные в лямбда-функции после такого декодирования -

const payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');

полезная нагрузка поступает вот так

s   B
101fGaG
{
    "version": null,
    "namespace": "`$\u0017`$>`$0o?=o?=`$!`$\u0010`$*",
    "metricdata": [
        {
            "name": "system",
            "unit": "Megabytes",
            "value": 879,
            "timestamp": 1590233414481,
            "dimensions": [
                {
                    "name": "systemName",
                    "value": "ramTotal"
                }
            ],
            "type": "gauge"
        }]
}

Может ли кто-нибудь помочь мне с этой проблемой.

1 Ответ

0 голосов
/ 26 июня 2020

Kinesis Producer объединяет небольшие записи в записи большего размера до 1 МБ. Потребитель Kinesis должен деагрегировать записи перед обработкой.

ссылка на ссылку

библиотека деагрегации

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...