AWS Firehose Transformation лямбда помещает все сообщения в одну папку s3 - PullRequest
0 голосов
/ 05 сентября 2018

У меня есть поток Kinesis, я создал поток доставки пожарного шланга и сохранил все данные в s3, он правильно сохранялся в почасовых папках. Затем я написал лямбда-преобразование firehose, развернув все сообщения в одной папке, я не уверен, что мне не хватает. У меня есть поля ниже в моем ответе от лямбда-функции:

result.put("recordId", record.getRecordId());
result.put("result", "Ok");
result.put("approximateArrivalEpoch", record.getApproximateArrivalEpoch());
result.put("approximateArrivalTimestamp",record.getApproximateArrivalTimestamp());
result.put("kinesisRecordMetadata", record.getKinesisRecordMetadata());
result.put("data", Base64.getEncoder().encodeToString(jsonData.getBytes()));

Edit:

Вот мой код в Java. Я использую KinesisFirehoseEvent, для моего случая декодирование не требовалось, и я получил ByteBuffer в KinesisFirehoseEvent

public JSONObject handler(KinesisFirehoseEvent kinesisFirehoseEvent, Context context) {
    final LambdaLogger logger = context.getLogger();
    final JSONArray resultArray = new JSONArray();
    for (final KinesisFirehoseEvent.Record record: kinesisFirehoseEvent.getRecords()) {
      final byte[] data = record.getData().array();
      final Optional<TestData> testData = deserialize(data, logger);
      if (testData.isPresent()) {
        final JSONObject jsonObj = new JSONObject();
        final String jsonData = gson.toJson(testData.get());
        jsonObj.put("recordId", record.getRecordId());
        jsonObj.put("result", "Ok");
        jsonObj.put("approximateArrivalEpoch", record.getApproximateArrivalEpoch());
        jsonObj.put("approximateArrivalTimestamp", record.getApproximateArrivalTimestamp());
        jsonObj.put("kinesisRecordMetadata", record.getKinesisRecordMetadata());
        jsonObj.put("data", Base64.getEncoder().encodeToString
                (jsonData.getBytes()));
        resultArray.add(jsonObj);
      }
      else {
        logger.log("testData not deserialized");
      }
    }
    final JSONObject jsonFinalObj = new JSONObject();
    jsonFinalObj.put("records", resultArray);
    return jsonFinalObj;
  }

Ответы [ 2 ]

0 голосов
/ 06 сентября 2018

Я получил эту работу, используя только приведенный выше код, он выглядит так, будто поток медленный, поэтому данные о новых часах еще не достигли.

0 голосов
/ 05 сентября 2018

Лямбда-функция возвращает данные не в правильном формате ,

Оформить заказ на приведенном ниже примере,

'use strict';
console.log('Loading function');

/* Stock Ticker format parser */
const parser = /^\{\"TICKER_SYMBOL\"\:\"[A-Z]+\"\,\"SECTOR\"\:"[A-Z]+\"\,\"CHANGE\"\:[-.0-9]+\,\"PRICE\"\:[-.0-9]+\}/;

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found
    let dropped = 0; // Number of dropped entries 

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {

        const entry = (new Buffer(record.data, 'base64')).toString('utf8');
        let match = parser.exec(entry);
        if (match) {
            let parsed_match = JSON.parse(match); 
            var milliseconds = new Date().getTime();
            /* Add timestamp and convert to CSV */
            const result = `${milliseconds},${parsed_match.TICKER_SYMBOL},${parsed_match.SECTOR},${parsed_match.CHANGE},${parsed_match.PRICE}`+"\n";
            const payload = (new Buffer(result, 'utf8')).toString('base64');
            if (parsed_match.SECTOR != 'RETAIL') {
                /* Dropped event, notify and leave the record intact */
                dropped++;
                return {
                    recordId: record.recordId,
                    result: 'Dropped',
                    data: record.data,
                };
            }
            else {
                /* Transformed event */
                success++;  
                return {
                    recordId: record.recordId,
                    result: 'Ok',
                    data: payload,
                };
            }
        }
        else {
            /* Failed event, notify the error and leave the record intact */
            console.log("Failed event : "+ record.data);
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            };
        }
        /* This transformation is the "identity" transformation, the data is left intact 
        return {
            recordId: record.recordId,
            result: 'Ok',
            data: record.data,
        } */
    });
    console.log(`Processing completed.  Successful records ${output.length}.`);
    callback(null, { records: output });
};

Ниже документация может помочь более подробную информацию о формате возврата данных,

https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/

Надеюсь, это поможет.

...