Я пытаюсь реализовать функцию ListFlatten
, я реализовал ее, используя SimpleDoFn
, которая работает нормально, но для распараллеливания. Я конвертирую функцию в функцию Splittable Do. Мне удалось получить модульный тест, работающий локально с 5000 элементами, используя DirectRunner
, при запуске того же самого в DataFlow, он завершается с ошибкой ниже.
Error Details:
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output (GroupAlsoByWindowsParDoFn.java:184)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue (GroupAlsoByWindowFnRunner.java:102)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:126)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:54)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement (GroupAlsoByWindowFnRunner.java:115)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement (GroupAlsoByWindowFnRunner.java:73)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement (GroupAlsoByWindowsParDoFn.java:114)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process (ParDoOperation.java:44)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop (ReadOperation.java:201)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
at com.abc.common.batch.functions.AbcListFlattenFn.splitRestriction (AbcListFlattenFn.java:68)
Различия в данных между локальным DirectRunner и облачным бегунком DataFlow приведены ниже.
DirectRunner в локальном:
- В выборке 5000 abcsвходной элемент PCollection
DataflowRunner в облаке:
- В 600 входных элементах PCollection имеются различные размеры abcs
- У немногих входных элементов есть 50000 abcs для выравнивания
public class AbcList implements Serializable {
private List<Abc> abcs;
private List<Xyz> xyzs;
}
public class AbcListFlattenFn extends DoFn<AbcList, KV<Abc, List<Xyz>> {
@ProcessElement
public void process(@Element AbcList input,
ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {
try {
/* Below commented lines are without the Splittable DoFn
input.getAbcs().stream().forEach(abc -> {
context.output(KV.of(abc, input.getXyzs()));
}); */
for (long index = tracker.currentRestriction().getFrom(); tracker.tryClaim(index);
++index) {
context.output(KV.of(input.getAbcs().get(Math.toIntExact(index),input.getXyzs())));
}
} catch (Exception e) {
log.error("Flattening AbcList has failed ", e);
}
}
@GetInitialRestriction
public OffsetRange getInitialRestriction(AbcList input) {
return new OffsetRange(0, input.getAbcs().size());
}
@SplitRestriction
public void splitRestriction(final AbcList input,
final OffsetRange range, final OutputReceiver<OffsetRange> receiver) {
List<OffsetRange> ranges =
range.split(input.getAbcs().size() > 5000 ? 5000
: input.getAbcs().size(), 2000);
for (final OffsetRange p : ranges) {
receiver.output(p);
}
}
@NewTracker
public OffsetRangeTracker newTracker(OffsetRange range) {
return new OffsetRangeTracker(range);
}
}
Может кто-нибудь подсказать, что не так с функцией ListFlatten? является ли splitRestriction причиной следующей проблемы? Как я могу исправить эту проблему с размером ключа Shuffle?