Разбиваемый DoFn, вызывающий слишком большую проблему в Shuffle key - PullRequest
0 голосов
/ 21 октября 2019

Я пытаюсь реализовать функцию ListFlatten, я реализовал ее, используя SimpleDoFn, которая работает нормально, но для распараллеливания. Я конвертирую функцию в функцию Splittable Do. Мне удалось получить модульный тест, работающий локально с 5000 элементами, используя DirectRunner, при запуске того же самого в DataFlow, он завершается с ошибкой ниже.

Error Details: 
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output (GroupAlsoByWindowsParDoFn.java:184)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue (GroupAlsoByWindowFnRunner.java:102)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:126)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:54)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement (GroupAlsoByWindowFnRunner.java:115)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement (GroupAlsoByWindowFnRunner.java:73)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement (GroupAlsoByWindowsParDoFn.java:114)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process (ParDoOperation.java:44)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop (ReadOperation.java:201)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
at com.abc.common.batch.functions.AbcListFlattenFn.splitRestriction (AbcListFlattenFn.java:68)

Различия в данных между локальным DirectRunner и облачным бегунком DataFlow приведены ниже.

DirectRunner в локальном:

  1. В выборке 5000 abcsвходной элемент PCollection

DataflowRunner в облаке:

  1. В 600 входных элементах PCollection имеются различные размеры abcs
  2. У немногих входных элементов есть 50000 abcs для выравнивания
   public class AbcList implements Serializable {
        private List<Abc> abcs;
        private List<Xyz> xyzs;
   }

        public class AbcListFlattenFn extends DoFn<AbcList, KV<Abc, List<Xyz>> {

            @ProcessElement
            public void process(@Element AbcList input,
                ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {

                try {
            /* Below commented lines are without the Splittable DoFn
                       input.getAbcs().stream().forEach(abc -> {
                                context.output(KV.of(abc, input.getXyzs()));
                         }); */

                    for (long index = tracker.currentRestriction().getFrom(); tracker.tryClaim(index);
                        ++index) {
                        context.output(KV.of(input.getAbcs().get(Math.toIntExact(index),input.getXyzs())));
                    }
                } catch (Exception e) {
                    log.error("Flattening AbcList has failed ", e);
                }

            }

            @GetInitialRestriction
            public OffsetRange getInitialRestriction(AbcList input) {
                return new OffsetRange(0, input.getAbcs().size());
            }

            @SplitRestriction
            public void splitRestriction(final AbcList input,
                final OffsetRange range, final OutputReceiver<OffsetRange> receiver) {
              List<OffsetRange> ranges =
                  range.split(input.getAbcs().size() > 5000 ? 5000
                        : input.getAbcs().size(), 2000);
                for (final OffsetRange p : ranges) {
                    receiver.output(p);
                }
            }

            @NewTracker
            public OffsetRangeTracker newTracker(OffsetRange range) {
                return new OffsetRangeTracker(range);
            }
        }

Может кто-нибудь подсказать, что не так с функцией ListFlatten? является ли splitRestriction причиной следующей проблемы? Как я могу исправить эту проблему с размером ключа Shuffle?

1 Ответ

1 голос
/ 23 октября 2019

Предел размера ключа в случайном порядке зависит от размера прототипа. Чтобы избавиться от этой проблемы, вы, вероятно, захотите добавить Reshuffle перед вашим SDF. Перестановка поможет вам сделать первый раунд распространения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...