Я пытаюсь спроектировать конвейер, который будет считывать данные из PubSubIO
и объединять их в один выходной сигнал каждые 60 секунд.
Введите:
00:00:01 -> "1"
00:00:21 -> "2"
00:00:41 -> "3"
00:01:01 -> "4"
00:01:21 -> "5"
00:01:51 -> "6"
Ожидаемый результат :
00:01:00 -> "1,2,3"
00:02:00 -> "1,2,3,4,5,6"
Вот мой код:
pipeline
.apply("Reading PubSub",
PubsubIO
.readMessagesWithAttributes()
.fromSubscription("..."))
.apply("Get message",
ParDo.of(new DoFn<PubsubMessage, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
PubsubMessage ref = c.element();
c.output(new String(ref.getPayload()));
}
}))
.apply("Window",
Window.<String>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(60))))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
.apply("Accumulate result to iterable",
Combine.globally(new CombineIterableAccumulatorFn<>()))
.apply("toString()", ToString.elements())
.apply("Write to file",
TextIO
.write()
.withWindowedWrites()
.withNumShards(1)
.to("result"));
Это моя CombineFn
реализация для агрегирования данных в Iterable
public class CombineIterableAccumulatorFn<T> extends Combine.CombineFn<T, List<T>, Iterable<T>> {
@Override
public List<T> createAccumulator() {
return new ArrayList<>();
}
@Override
public List<T> addInput(List<T> accumulator, T input) {
accumulator.add(input);
return accumulator;
}
@Override
public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
return StreamSupport.stream(accumulators.spliterator(), false)
.flatMap(List::stream)
.collect(Collectors.toList());
}
@Override
public Iterable<T> extractOutput(List<T> accumulator) {
return accumulator;
}
}
С этой реализацией я получаю следующий вывод :
00:01:00 -> "1,2,3"
00:02:00 -> "1,2,3
1,2,3,4,5,6"
Чтобы удалить дублированную "1,2,3"
строку в 00:02:00
, я должен добавить после строки
.apply("Accumulate result to iterable",
Combine.globally(new CombineIterableAccumulatorFn<>()))
дополнительный оконный блок, например:
.apply("Window",
Window
.<String>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(60))))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
Все выглядит очень сложно. Есть ли лучшие варианты для реализации этой задачи?