Как уже обсуждалось, Beam не генерирует данные для пустых окон.В дополнение к причинам, приведенным Руи Вангом, мы можем добавить проблему того, как последние этапы будут обрабатывать эти пустые панели.
В любом случае, описанный вами конкретный вариант использования - мониторинг скользящего числа сообщений - должен быть возможен при некоторой работе, даже если показатель в конечном итоге упадет до нуля.Одной из возможностей было бы опубликовать устойчивое количество фиктивных сообщений, которые бы продвигали водяной знак и запускали панели, но затем отфильтровывались внутри конвейера.Проблема этого подхода заключается в том, что источник публикации необходимо адаптировать, и это не всегда может быть удобным / возможным.Другой будет включать в себя создание этих поддельных данных в качестве другого ввода и совместную их группировку с основным потоком.Преимущество состоит в том, что все может быть сделано в потоке данных без необходимости настройки источника или приемника.Для иллюстрации приведу пример.
Входы разделены на два потока.Для фиктивного я использовал GenerateSequence
для создания нового элемента каждые 5 секунд.Затем я создаю окно PCollection (стратегия управления окнами должна быть совместима с таковой для основного потока, поэтому я буду использовать ее).Затем я сопоставляю элемент с парой ключ-значение, где значение равно 0 (мы могли бы использовать другие значения, поскольку мы знаем, из какого потока поступает элемент, но я хочу доказать, что фиктивные записи не учитываются).
PCollection<KV<String,Integer>> dummyStream = p
.apply("Generate Sequence", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
.apply("Window Messages - Dummy", Window.<Long>into(
...
.apply("Count Messages - Dummy", ParDo.of(new DoFn<Long, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(KV.of("num_messages", 0));
}
}));
Для основного потока, который читает из Pub / Sub, я сопоставляю каждую запись со значением 1. Позже я добавлю все те же, что и в типичных примерах подсчета слов, используя этапы сокращения карты.
PCollection<KV<String,Integer>> mainStream = p
.apply("Get Messages - Data", PubsubIO.readStrings().fromTopic(topic))
.apply("Window Messages - Data", Window.<String>into(
...
.apply("Count Messages - Data", ParDo.of(new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(KV.of("num_messages", 1));
}
}));
Затем нам нужно присоединиться к ним, используя CoGroupByKey
(я использовал тот же ключ num_messages
для группового подсчета).Этот этап будет выводить результаты, когда один из двух входов имеет элементы, поэтому здесь разблокируется основная проблема (пустые окна без сообщений Pub / Sub).
final TupleTag<Integer> dummyTag = new TupleTag<>();
final TupleTag<Integer> dataTag = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple.of(dummyTag, dummyStream)
.and(dataTag, mainStream).apply(CoGroupByKey.<String>create());
Наконец, мы добавляем все, чтобы получитьобщее количество сообщений для окна.Если из dataTag
нет элементов, то по умолчанию сумма будет равна 0.
public void processElement(ProcessContext c, BoundedWindow window) {
Integer total_sum = new Integer(0);
Iterable<Integer> dataTagVal = c.element().getValue().getAll(dataTag);
for (Integer val : dataTagVal) {
total_sum += val;
}
LOG.info("Window: " + window.toString() + ", Number of messages: " + total_sum.toString());
}
Это должно привести к чему-то вроде:
Обратите внимание, что результаты из разных окон могут быть неупорядоченными (это может произойти в любом случае при записи в BigQuery), и я не играл с настройками окна для оптимизации примера.
Полный код:
public class EmptyWindows {
private static final Logger LOG = LoggerFactory.getLogger(EmptyWindows.class);
public static interface MyOptions extends PipelineOptions {
@Description("Input topic")
String getInput();
void setInput(String s);
}
@SuppressWarnings("serial")
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
String topic = options.getInput();
PCollection<KV<String,Integer>> mainStream = p
.apply("Get Messages - Data", PubsubIO.readStrings().fromTopic(topic))
.apply("Window Messages - Data", Window.<String>into(
SlidingWindows.of(Duration.standardMinutes(1))
.every(Duration.standardSeconds(5)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
.apply("Count Messages - Data", ParDo.of(new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
//LOG.info("New data element in main output");
c.output(KV.of("num_messages", 1));
}
}));
PCollection<KV<String,Integer>> dummyStream = p
.apply("Generate Sequence", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
.apply("Window Messages - Dummy", Window.<Long>into(
SlidingWindows.of(Duration.standardMinutes(1))
.every(Duration.standardSeconds(5)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
.apply("Count Messages - Dummy", ParDo.of(new DoFn<Long, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
//LOG.info("New dummy element in main output");
c.output(KV.of("num_messages", 0));
}
}));
final TupleTag<Integer> dummyTag = new TupleTag<>();
final TupleTag<Integer> dataTag = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple.of(dummyTag, dummyStream)
.and(dataTag, mainStream).apply(CoGroupByKey.<String>create());
coGbkResultCollection
.apply("Log results", ParDo.of(new DoFn<KV<String, CoGbkResult>, Void>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
Integer total_sum = new Integer(0);
Iterable<Integer> dataTagVal = c.element().getValue().getAll(dataTag);
for (Integer val : dataTagVal) {
total_sum += val;
}
LOG.info("Window: " + window.toString() + ", Number of messages: " + total_sum.toString());
}
}));
p.run();
}
}