Dataflow GCP (Apache Beam) - постоянное чтение большого количества файлов (OutOfMemory) - PullRequest
0 голосов
/ 20 апреля 2020

Что я хочу сделать:

  1. Чтение и распаковка файлов GZ непрерывно по шаблону (~ 3000 файлов), каждый файл имеет 1,2 МБ и 9 МБ после распаковки
  2. Заменить некоторые последовательность символов из каждого из файлов CSV
  3. Сжатие файла CSV в GZ и сохранение измененных файлов по собственному пути.

Фактический код:

static void run(final BeeswaxDataflowOptions options) {
final Pipeline pipeline = Pipeline.create(options);
final PCollection<MatchResult.Metadata> matches =
    pipeline.apply(
        "Read",
        FileIO.match()
            .filepattern(options.getSourcePath() + options.getSourceFilesPattern())
            .continuously(
                Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));

matches
    .apply(FileIO.readMatches().withCompression(GZIP))
    .apply(
        Window.<FileIO.ReadableFile>into(
                FixedWindows.of(Duration.standardSeconds(options.getWindowInterval())))
            .accumulatingFiredPanes()
            .withAllowedLateness(Duration.ZERO)
            .triggering(
                Repeatedly.forever(AfterPane.elementCountAtLeast(1).getContinuationTrigger())))
    .apply(
        "Uncompress",
        MapElements.into(
                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via(
                file -> {
                  final String filePath = file.getMetadata().resourceId().toString();
                  try {
                    return KV.of(filePath, file.readFullyAsUTF8String());
                  } catch (final IOException e) {
                    return KV.of(filePath, "");
                  }
                }))
    .apply("Prepare for BigQuery import", ParDo.of(new BigQueryDataPreparatorFn()))
    .apply(
        "Save results",
        FileIO.<String, KV<String, String>>writeDynamic()
            .withCompression(GZIP)
            .by(KV::getKey)
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(KV::getValue), TextIO.sink())
            .withNumShards(options.getShards())
            .to(options.getOutputPath())
            .withTempDirectory(options.getTempLocation())
            .withNaming(AbsoluteNaming::new));

pipeline.run().waitUntilFinish();

проблема в исключении OutOfMemory (да, я знаю, что readFullyAsUTF8String может быть подозрительным для этого). Как справиться с такой ситуацией?

Мое наблюдение состоит в том, что все ~ 3000 файлов читаются и собираются на этапе "Распаковка". Поэтому, прежде чем перейти к разделу «Подготовка к импорту BigQuery» и «Сохранить результаты», его каким-то образом накапливают и читают в ОЗУ.

Было бы неплохо как-то поставить этот конвейер в очередь - как максимум 50 элементов go через шаги и ждать результатов, а затем начать дальше. Это возможно? Как бороться с этим иначе, если не

1 Ответ

1 голос
/ 21 апреля 2020

Здесь вы можете сделать несколько вещей.

1: использовать shuffle для более равномерного распределения файлов.

final PCollection<MatchResult.Metadata> matches =
pipeline.apply(
    "Read",
    FileIO.match()
        .filepattern(options.getSourcePath() + options.getSourceFilesPattern())
        .continuously(
            Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));
matches
.apply(Reshuffle())
.apply(FileIO.readMatches().withCompression(GZIP))

Далее вы можете ограничить число одновременных элементов, обрабатываемых на виртуальную машину, установив --numberOfWorkerHarnessThreads но я думаю, что проблема должна быть решена путем перетасовки.

...