Сортировка элементов в фиксированном окне - Облачный поток данных - PullRequest
1 голос
/ 03 мая 2019

У меня есть конвейер потока данных, который читает из раздела pubsub, выполняет преобразования и пишет в BigTable. Я хочу, чтобы элементы, считанные из pubsub, обрабатывались в порядке их порядкового номера.

Я использую фиксированное окно продолжительностью 2 минуты, а затем применяю к нему GroupByKey. После GBK я использую преобразование SortValues, которое сортирует Iterable по SequenceNumber. Я наблюдаю, что время стены шага GroupByKey будет высоким, так как все элементы в окне обрабатываются одним и тем же рабочим. Есть ли эффективный способ сортировки элементов в фиксированном окне?

Ниже приведен код моего конвейера:

PCollection<PubsubMessage> pubsubRecords = p.apply(PubsubIO.readMessagesWithAttributes()
                    .fromTopic(StaticValueProvider.of(topic)));
            PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords.apply("Raw to String", ParDo.of(new LogsFn()))
                    .apply("Window", Window
                            .<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardMinutes(2)))
                            .triggering(Repeatedly
                                .forever(AfterProcessingTime
                                    .pastFirstElementInPane()
                                    .plusDelayOf(Duration.StandardMinutes(2))
                                )
                            )
                            .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
                        );
            PCollection<KV<String, KV<BigInteger, JSONObject>>> keyedWindow = window
                    .apply(WithKeys.of(new SerializableFunction<KV<BigInteger, JSONObject>,String>() {
                          @Override
                          public String apply(KV<BigInteger, JSONObject> row) {
                            return "key";
                          }
                    }));

            PCollection<KV<String, Iterable<KV<BigInteger, JSONObject>>>> groupedWindow = keyedWindow
                    .apply(GroupByKey.<String, KV<BigInteger, JSONObject>>create()).apply(
                            SortValues.<String, BigInteger, JSONObject>create(BufferedExternalSorter.options()));

1 Ответ

2 голосов
/ 03 мая 2019

Я думаю, что ваш подход правильный. Это неизбежно, что вы должны иметь все элементы для сортировки в одном и том же работнике. Упорядоченная обработка создает зависимости между данными и часто плохо работает с распределенными вычислениями.

...