Flink streaming - изменить имена файлов деталей при использовании StreamingFileSink? - PullRequest
0 голосов
/ 20 мая 2019

Я пытаюсь использовать потоковую передачу Flink для использования тематических сообщений Kafka и создания (периодически) файлов паркета, которые будут сохраняться на s3.
Есть ли способ использования приемника потокового файла с массовым форматом для изменения имен файлов деталейсоздано (или добавлено суффикс / префикс), чтобы быть более уникальным, чем частичное 0-0 или частичное 1-3?

StreamingFileSink<> sink = StreamingFileSink.forBulkFormat(new Path("s3://test-bucket/"),               ParquetAvroFactory.getParquetWriter(schema,  CompressionCodec.UNCOMPRESSED.name()))
.withBucketAssigner(new PartitionBucketAssigner(partitionColumns))
.build();

1 Ответ

0 голосов
/ 21 мая 2019

Вы можете переопределить метод getBucketId (см. https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html) в BucketAssigner, который повлияет на путь, но, очевидно, не на имена файлов деталей (см. Комментарий ниже).

Имена файлов деталей устанавливаются в этом бите кода в org.apache.flink.streaming.api.functions.sink.filesystem.Bucket:

private Path assembleNewPartPath() {
    return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
}

, который не предназначен для настройки.

...