У меня есть особенно простой конвейер Dataflow, где я хочу прочитать файл и вывести его проанализированные записи в Avro.Это работает в большинстве случаев, за исключением случаев, когда исходный файл является особенно большим (20+ ГБ), что приводит к OOM даже с особенно большими машинами памяти.Я почти уверен, что это происходит потому, что неразделимый источник полностью читается Beam, поэтому я реализовал разделяемый DoFn
Это функционально работает в том, что конвейер теперь успешен, что, кажется, подтверждает мое предположение, что причиной является один большой пакет из неразделимого файла.Однако, похоже, это не распространяет работу на нескольких работников.Я попробовал следующее:
- Отключил автоматическое масштабирование (autoscalingAlgorithm = NONE) и установил numWorkers на 10. Это имело ту же пропускную способность, что и numWorkers 1
- Левое автоматическое масштабирование при высоком maxWorkers.Это ненадолго до 2, а затем вернулся к 1
- Добавлен тасование (Reshuffle.viaRandomKey) после DoFn, но до того, как Avro написать
Есть идеи?Точный код трудно передать из-за политики компании, но в целом все довольно просто.Я реализовал следующее:
public class SplittableReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
// ...
@ProcessElement
public void process(final ProcessContext c, final OffsetRangeTracker tracker) {
final FileIO.ReadableFile file = c.element();
// Followed by something like
ReadableByteStream in = file.open()
in.seek(tracker.from())
Parser parser = new Parser(in)
while (parser.next()) {
if (parser.getOffset() > tracker.to()) {
break
}
tracker.tryClaim(parser.getOffset())
c.output(parser.item())
}
tracker.markDone()
}
@GetInitialRestriction
public OffsetRange getInitialRestriction(final FileIO.ReadableFile file) {
return new Offset(0, getSize(file) - 1);
}
@SplitRestriction
public void splitRestriction(final FileIO.ReadableFile file, final OffsetRange restriction, final DoFn.OutputReceiver<OffsetRange> receiver) {
// chunkRange for test purposes just breaks into at most 500MB chunks
for (final OffsetRange chunk: chunkRange(restriction)) {
receiver.output(chunk);
}
}