Разобрался! Для потомков, поскольку это недостаточно хорошо задокументировано:
Ваше поле data
в полезной нагрузке Kinesis Firehose должно быть объектом в кодировке base64, который следует за сборщиком событий Splunk spe c.
Пока и Firehose и Splunk могут читать полезную нагрузку, которую возвращает Lambda, она не должна выдавать ошибку.
Вот код для Трансформатор Kinesis Firehose Lambda (узел исполнения 12):
/*
* Transformer for sending Kinesis Firehose events to Splunk
*
* Properly formats incoming messages for Splunk ingestion
* Returned object gets fed back into Kinesis Firehose and sent to Splunk
*/
'use strict';
console.log('Loading function');
exports.handler = (event, context, callback) => {
let success = 0; // Number of valid entries found
let failure = 0; // Number of invalid entries found
let dropped = 0; // Number of dropped entries
/* Process the list of records and transform them to adhere to Splunk specs */
const output = event.records.map((record) => {
try {
const entry = (Buffer.from(record.data, 'base64')).toString('utf8');
/*
* IMPORTANT: `data` object should follow Splunk event formatting specs prior to encoding.
* Otherwise, it will throw a parsing error.
* https://docs.splunk.com/Documentation/Splunk/8.0.3/Data/FormateventsforHTTPEventCollector
*/
const obj = {
sourcetype: "aws:firehose:json", // Required, will error
event: JSON.parse(entry)
}
const payload = (Buffer.from(JSON.stringify(obj), 'utf8')).toString('base64');
success++;
return {
recordId: record.recordId,
result: 'Ok',
data: payload,
};
} catch (e) {
failure++
console.error(e.message());
return {
recordId: record.recordId,
result: 'ProcessingFailed'
};
}
});
console.log(`Processing completed. Successful records ${success}. Failed records ${failure}.`);
callback(null, {records: output});
}