Запись файлов паркета в S3 с использованием AWS Java Lamda - PullRequest
0 голосов
/ 13 февраля 2019

Я пишу AWS-лямбду, которая читает готовые объекты от Kinesis и хотела бы записать их в s3 в виде файла партера.

Я видел, что есть реализация ParquetWriter для protobuf, которая называется ProtoParquetWriter, и это хорошо.Моя проблема в том, что ProtoParquetWriter ожидает путь в своем конструкторе.

Какой правильный способ сделать это без сохранения содержимого в виде файла паркета, при условии, что я вообще не использую файловую систему?

Ответы [ 2 ]

0 голосов
/ 19 марта 2019

Если у вас есть список (может быть любой сложный объект), пример кода для чтения / записи protobuf S3 parquet

    public class SimpleS3ParquetUtilities implements S3Utilities {

    final Logger logger;
    String PATH_SCHEMA = "s3a";
    CompressionCodecName compressionCodecName;

    public SimpleS3ParquetUtilities(Logger logger) {
        this.logger = logger;
        this.compressionCodecName = CompressionCodecName.UNCOMPRESSED;
    }

    public SimpleS3ParquetUtilities(Logger logger, CompressionCodecName compressionCodecName) {
        this.logger = logger;
        this.compressionCodecName = compressionCodecName;
    }

    @Override
    public String writeTransactions(String bucket, String objectKey, List<Transaction> transactions)
            throws Exception {
        if (objectKey.charAt(0) != '/')
            objectKey = "/" + objectKey;
        Path file = new Path(PATH_SCHEMA, bucket, objectKey);
        Stopwatch sw = Stopwatch.createStarted();
        // convert the list into protobuf 
        List<TransactionProtos.Transaction> protoTransactions = Convertor.toProtoBuf(transactions);
        try (ProtoParquetWriter<TransactionProtos.Transaction> writer = new ProtoParquetWriter<TransactionProtos.Transaction>(
                file, TransactionProtos.Transaction.class, this.compressionCodecName,
                ProtoParquetWriter.DEFAULT_BLOCK_SIZE, ProtoParquetWriter.DEFAULT_PAGE_SIZE)) {

            for (TransactionProtos.Transaction transaction : protoTransactions) {
                writer.write(transaction);
            }
        }
        logger.info("Parquet write elapse:[{}{}] Time:{}ms items:{}", bucket, objectKey,
                sw.elapsed(TimeUnit.MILLISECONDS), transactions.size());
        return "";
    }

    @Override
    public List<Transaction> readTransactions(String bucket, String pathWithFileName)
            throws Exception {
        if (pathWithFileName.charAt(0) != '/')
            pathWithFileName = "/" + pathWithFileName;
        Path file = new Path(PATH_SCHEMA, bucket, pathWithFileName);
        Stopwatch sw = Stopwatch.createStarted();
        try (ParquetReader<TransactionProtos.Transaction.Builder> reader = ProtoParquetReader.<TransactionProtos.Transaction.Builder>builder(
                file).build()) {
            List<TransactionProtos.Transaction> transactions = new ArrayList<TransactionProtos.Transaction>();
            TransactionProtos.Transaction.Builder builder = reader.read();
            while (builder != null) {
                TransactionProtos.Transaction transaction = builder.build();
                transactions.add(transaction);
                builder = reader.read();
            }
            logger.info("Parquet read elapsed:[{}{}] Time:{}ms items:{}", bucket, pathWithFileName,
                    sw.elapsed(TimeUnit.MILLISECONDS), transactions.size());
            return Convertor.fromProtoBuf(transactions);
        }
    }
}
0 голосов
/ 09 марта 2019

Если вы хотите записать в S3, вы можете установить Путь как Path("s3a://<bucketName>/<s3Key>").И не забудьте установить учетные данные S3 в конфигурациях:

    conf.set("fs.s3a.access.key", "<s3AccessKey");
    conf.set("fs.s3a.secret.key", "<s3SecretKey");
...