Как упростить конвейер с агрегацией и накоплением? - PullRequest
0 голосов
/ 14 января 2019

Я пытаюсь спроектировать конвейер, который будет считывать данные из PubSubIO и объединять их в один выходной сигнал каждые 60 секунд.

Введите:

00:00:01 -> "1"
00:00:21 -> "2"
00:00:41 -> "3"
00:01:01 -> "4"
00:01:21 -> "5"
00:01:51 -> "6"

Ожидаемый результат :

00:01:00 -> "1,2,3"
00:02:00 -> "1,2,3,4,5,6"

Вот мой код:

pipeline
    .apply("Reading PubSub",
        PubsubIO
            .readMessagesWithAttributes()
            .fromSubscription("..."))
    .apply("Get message",
        ParDo.of(new DoFn<PubsubMessage, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                PubsubMessage ref = c.element();
                c.output(new String(ref.getPayload()));
            }
        }))
    .apply("Window",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(60))))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes())
    .apply("Accumulate result to iterable",
        Combine.globally(new CombineIterableAccumulatorFn<>()))
    .apply("toString()", ToString.elements())
    .apply("Write to file",
        TextIO
            .write()
            .withWindowedWrites()
            .withNumShards(1)
            .to("result"));


Это моя CombineFn реализация для агрегирования данных в Iterable

public class CombineIterableAccumulatorFn<T> extends Combine.CombineFn<T, List<T>, Iterable<T>> {

    @Override
    public List<T> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<T> addInput(List<T> accumulator, T input) {
        accumulator.add(input);
        return accumulator;
    }

    @Override
    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
        return StreamSupport.stream(accumulators.spliterator(), false)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    @Override
    public Iterable<T> extractOutput(List<T> accumulator) {
        return accumulator;
    }
}

С этой реализацией я получаю следующий вывод :

00:01:00 -> "1,2,3"
00:02:00 -> "1,2,3
             1,2,3,4,5,6"

Чтобы удалить дублированную "1,2,3" строку в 00:02:00, я должен добавить после строки

.apply("Accumulate result to iterable",
    Combine.globally(new CombineIterableAccumulatorFn<>()))

дополнительный оконный блок, например:

.apply("Window", 
    Window
        .<String>into(new GlobalWindows())
        .triggering(
            Repeatedly.forever(
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(60))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())

Все выглядит очень сложно. Есть ли лучшие варианты для реализации этой задачи?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...