У меня точно такая же проблема, даже с тем же разделением
Так что вам нужно использовать AWS lambda для достижения того, что вы хотите
- , чтобы переместить файлы, сгенерированные Firehose, в корзинуэто используется Athena.
- еще одна для запуска обновления таблицы Athena, так как она не будет видеть новые папки (я не помещаю все триггеры, но это должен быть просто вызов 'MSCK REPAIR TABLE your_table_name;')
Для первого я выбираю NodeJ, так как это действительно просто и очень быстро.~ 3 секунды, чтобы переместить файл размером 120 МБ с минимально допустимым AWS, выделенным 128 МБ памяти RAM (файлы, сгенерированные Firehose, будут приблизительно 64 МБ максимум)
Узел js структура проекта package.json
{
"name": "your.project",
"version": "1.0.0",
"description": "Copy generated partitioned files by Firehose to valid partitioned files for Athena",
"main": "index.js",
"dependencies": {
"async": "^2.6.1"
}
}
И индекс.js
const aws = require('aws-sdk');
const async = require('async');
const s3 = new aws.S3();
const dstBucket = 'PUT_YOUR_BUCKET_NAME_HERE';
var util = require('util');
exports.handler = (event, context, callback) => {
const srcBucket = event.Records[0].s3.bucket.name;
const srcKey = event.Records[0].s3.object.key;
const split = srcKey.split('/');
const dstKey = `event_year=${split[0]}/event_month=${split[1]}/event_day=${split[2]}/event_hour=${split[3]}/${split[4]}`;
console.log("Reading options from event:\n", util.inspect(event, {depth: 10}));
async.waterfall([
function copy(next) {
s3.copyObject({
Bucket: dstBucket,
CopySource: `${srcBucket}/${srcKey}`,
Key: dstKey
}, next);
},
function deleteOriginal(copyResult, next) {
s3.deleteObject({
Bucket: srcBucket,
Key: srcKey
}, next);
}
], function (err) {
if (err) {
console.error(`Failed: ${srcBucket}/${srcKey} => ${dstBucket}/${dstKey} to move FireHose partitioned object to Athena partitioned object. Error: ${err}`);
} else {
console.log(`Success: ${srcBucket}/${srcKey} => ${dstBucket}/${dstKey} moved FireHose partitioned object to Athena partitioned object`);
}
callback(null, 'move success');
}
);
};
Просто обновите некоторые данные, чтобы они действовали в вашем случае.И еще одна проблема, с которой я столкнулся, это когда вы собираете проект с
npm install
и архивируете его, это было что-то не так в AWS unzip, поэтому мне нужно обновить путь к моему index.js.
И это работает.
Вы также можете найти эту строку
console.log("Reading options from event:\n", util.inspect(event, {depth: 10}));
Она может быть удалена, но очень помогает понять детали обработки объекта