Что я хочу сделать:
- Чтение и распаковка файлов GZ непрерывно по шаблону (~ 3000 файлов), каждый файл имеет 1,2 МБ и 9 МБ после распаковки
- Заменить некоторые последовательность символов из каждого из файлов CSV
- Сжатие файла CSV в GZ и сохранение измененных файлов по собственному пути.
Фактический код:
static void run(final BeeswaxDataflowOptions options) {
final Pipeline pipeline = Pipeline.create(options);
final PCollection<MatchResult.Metadata> matches =
pipeline.apply(
"Read",
FileIO.match()
.filepattern(options.getSourcePath() + options.getSourceFilesPattern())
.continuously(
Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));
matches
.apply(FileIO.readMatches().withCompression(GZIP))
.apply(
Window.<FileIO.ReadableFile>into(
FixedWindows.of(Duration.standardSeconds(options.getWindowInterval())))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO)
.triggering(
Repeatedly.forever(AfterPane.elementCountAtLeast(1).getContinuationTrigger())))
.apply(
"Uncompress",
MapElements.into(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(
file -> {
final String filePath = file.getMetadata().resourceId().toString();
try {
return KV.of(filePath, file.readFullyAsUTF8String());
} catch (final IOException e) {
return KV.of(filePath, "");
}
}))
.apply("Prepare for BigQuery import", ParDo.of(new BigQueryDataPreparatorFn()))
.apply(
"Save results",
FileIO.<String, KV<String, String>>writeDynamic()
.withCompression(GZIP)
.by(KV::getKey)
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(KV::getValue), TextIO.sink())
.withNumShards(options.getShards())
.to(options.getOutputPath())
.withTempDirectory(options.getTempLocation())
.withNaming(AbsoluteNaming::new));
pipeline.run().waitUntilFinish();
проблема в исключении OutOfMemory (да, я знаю, что readFullyAsUTF8String может быть подозрительным для этого). Как справиться с такой ситуацией?
Мое наблюдение состоит в том, что все ~ 3000 файлов читаются и собираются на этапе "Распаковка". Поэтому, прежде чем перейти к разделу «Подготовка к импорту BigQuery» и «Сохранить результаты», его каким-то образом накапливают и читают в ОЗУ.
Было бы неплохо как-то поставить этот конвейер в очередь - как максимум 50 элементов go через шаги и ждать результатов, а затем начать дальше. Это возможно? Как бороться с этим иначе, если не