Разделы преобразования формата записи Firehose - PullRequest
0 голосов
/ 15 мая 2018

Я пытался использовать новую функцию пожарного шланга «преобразование формата записи», чтобы сохранить мои события в виде файлов паркета для агенств athena или hive. Вы должны выбрать таблицу из своего каталога клея, но firehose игнорирует определенные разделы и вместо этого сохраняет файлы в структуре YYYY / MM / DD / HH /. В данных также отсутствуют определенные столбцы разделов. Это было бы хорошо, если бы он использовал его для разделения.

Существует ли конфигурация API или что-то еще, чтобы принудительно использовать разбиение таблицы?

1 Ответ

0 голосов
/ 02 ноября 2018

У меня точно такая же проблема, даже с тем же разделением

Так что вам нужно использовать AWS lambda для достижения того, что вы хотите

  1. , чтобы переместить файлы, сгенерированные Firehose, в корзинуэто используется Athena.
  2. еще одна для запуска обновления таблицы Athena, так как она не будет видеть новые папки (я не помещаю все триггеры, но это должен быть просто вызов 'MSCK REPAIR TABLE your_table_name;')

Для первого я выбираю NodeJ, так как это действительно просто и очень быстро.~ 3 секунды, чтобы переместить файл размером 120 МБ с минимально допустимым AWS, выделенным 128 МБ памяти RAM (файлы, сгенерированные Firehose, будут приблизительно 64 МБ максимум)

Узел js структура проекта package.json

{ 
  "name": "your.project", 
  "version": "1.0.0", 
  "description": "Copy generated partitioned files by Firehose to valid partitioned files for Athena", 
  "main": "index.js", 
  "dependencies": { 
    "async": "^2.6.1" 
   } 
}

И индекс.js

const aws = require('aws-sdk');
const async = require('async');
const s3 = new aws.S3();
const dstBucket = 'PUT_YOUR_BUCKET_NAME_HERE';
var util = require('util');

exports.handler = (event, context, callback) => {
    const srcBucket = event.Records[0].s3.bucket.name;
    const srcKey = event.Records[0].s3.object.key;
    const split = srcKey.split('/');
    const dstKey = `event_year=${split[0]}/event_month=${split[1]}/event_day=${split[2]}/event_hour=${split[3]}/${split[4]}`;
    console.log("Reading options from event:\n", util.inspect(event, {depth: 10}));
    async.waterfall([
            function copy(next) {
                s3.copyObject({
                    Bucket: dstBucket,
                    CopySource: `${srcBucket}/${srcKey}`,
                    Key: dstKey
                }, next);
            },
            function deleteOriginal(copyResult, next) {
                s3.deleteObject({
                    Bucket: srcBucket,
                    Key: srcKey
                }, next);
            }
        ], function (err) {
            if (err) {
                console.error(`Failed: ${srcBucket}/${srcKey} => ${dstBucket}/${dstKey} to move FireHose partitioned object to Athena partitioned object. Error: ${err}`);
            } else {
                console.log(`Success: ${srcBucket}/${srcKey} => ${dstBucket}/${dstKey} moved FireHose partitioned object to Athena partitioned object`);
            }
            callback(null, 'move success');
        }
    );
};

Просто обновите некоторые данные, чтобы они действовали в вашем случае.И еще одна проблема, с которой я столкнулся, это когда вы собираете проект с

npm install

и архивируете его, это было что-то не так в AWS unzip, поэтому мне нужно обновить путь к моему index.js.

И это работает.

Вы также можете найти эту строку

console.log("Reading options from event:\n", util.inspect(event, {depth: 10}));

Она может быть удалена, но очень помогает понять детали обработки объекта

...