Как отправить события не-cloudwatch JSON в Splunk из Kinesis Firehose? - PullRequest
0 голосов
/ 28 апреля 2020

Я пытаюсь отправить событие, не относящееся к облачным часам, в Splunk из Kinesis Firehose. Я обрабатываю событие с помощью лямбды и возвращаю его обратно в Firehose в следующем формате ( требуется для Firehose ):

{ 
    "records": [
        {
          "recordId": "2345678",
          "result": "Ok",
          "data": [base64-encoded custom JSON]
        }
    ]
}

Однако, оно выдает неопределенную ошибку синтаксического анализа, как только оно попадает в Splunk с помощью справочной ссылки, которая никуда не ведет:

"errorCode":"Splunk.InvalidDataFormat","errorMessage":"The data is not formatted correctly. To see how to properly format data for Raw or Event HEC endpoints, see Splunk Event Data (http://dev.splunk.com/view/event-collector/SP-CAAAE6P#data)"

Что мне здесь не хватает? Кажется странным, что конечная точка HE C не сможет анализировать сообщения, поступающие от Firehose, в их стандартном формате.

Я отправляю сообщение в конечную точку события HE C, используя splunk_configuration блок в модуле Terraform aws_kinesis_firehose_delivery_stream .

1 Ответ

0 голосов
/ 28 апреля 2020

Разобрался! Для потомков, поскольку это недостаточно хорошо задокументировано:

Ваше поле data в полезной нагрузке Kinesis Firehose должно быть объектом в кодировке base64, который следует за сборщиком событий Splunk spe c.

Пока и Firehose и Splunk могут читать полезную нагрузку, которую возвращает Lambda, она не должна выдавать ошибку.

Вот код для Трансформатор Kinesis Firehose Lambda (узел исполнения 12):

/*
* Transformer for sending Kinesis Firehose events to Splunk
*
* Properly formats incoming messages for Splunk ingestion
* Returned object gets fed back into Kinesis Firehose and sent to Splunk
*/

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found
    let dropped = 0; // Number of dropped entries

    /* Process the list of records and transform them to adhere to Splunk specs */
    const output = event.records.map((record) => {
        try {
            const entry = (Buffer.from(record.data, 'base64')).toString('utf8');

            /*
             * IMPORTANT: `data` object should follow Splunk event formatting specs prior to encoding.
             * Otherwise, it will throw a parsing error.
             * https://docs.splunk.com/Documentation/Splunk/8.0.3/Data/FormateventsforHTTPEventCollector
             */
            const obj = {
                sourcetype: "aws:firehose:json", // Required, will error
                event: JSON.parse(entry)
            }
            const payload = (Buffer.from(JSON.stringify(obj), 'utf8')).toString('base64');
            success++;
            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
        } catch (e) {
            failure++
            console.error(e.message());
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed'
            };
        }
    });
    console.log(`Processing completed.  Successful records ${success}. Failed records ${failure}.`);
    callback(null, {records: output});
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...