DataFlit Splittable ReadFn не использует несколько рабочих - PullRequest
0 голосов
/ 23 сентября 2018

У меня есть особенно простой конвейер Dataflow, где я хочу прочитать файл и вывести его проанализированные записи в Avro.Это работает в большинстве случаев, за исключением случаев, когда исходный файл является особенно большим (20+ ГБ), что приводит к OOM даже с особенно большими машинами памяти.Я почти уверен, что это происходит потому, что неразделимый источник полностью читается Beam, поэтому я реализовал разделяемый DoFn

Это функционально работает в том, что конвейер теперь успешен, что, кажется, подтверждает мое предположение, что причиной является один большой пакет из неразделимого файла.Однако, похоже, это не распространяет работу на нескольких работников.Я попробовал следующее:

  1. Отключил автоматическое масштабирование (autoscalingAlgorithm = NONE) и установил numWorkers на 10. Это имело ту же пропускную способность, что и numWorkers 1
  2. Левое автоматическое масштабирование при высоком maxWorkers.Это ненадолго до 2, а затем вернулся к 1
  3. Добавлен тасование (Reshuffle.viaRandomKey) после DoFn, но до того, как Avro написать

Есть идеи?Точный код трудно передать из-за политики компании, но в целом все довольно просто.Я реализовал следующее:

public class SplittableReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
// ...
@ProcessElement
public void process(final ProcessContext c, final OffsetRangeTracker tracker) {
   final FileIO.ReadableFile file = c.element();
   // Followed by something like
   ReadableByteStream in = file.open()
   in.seek(tracker.from())
   Parser parser = new Parser(in)
   while (parser.next()) {
     if (parser.getOffset() > tracker.to()) {
        break
     }
     tracker.tryClaim(parser.getOffset())
     c.output(parser.item())
   }
   tracker.markDone()
}

@GetInitialRestriction
public OffsetRange getInitialRestriction(final FileIO.ReadableFile file) {
   return new Offset(0, getSize(file) - 1);
}

@SplitRestriction
public void splitRestriction(final FileIO.ReadableFile file, final OffsetRange restriction, final DoFn.OutputReceiver<OffsetRange> receiver) {
    // chunkRange for test purposes just breaks into at most 500MB chunks
    for (final OffsetRange chunk: chunkRange(restriction)) {
       receiver.output(chunk);
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...