Я пытаюсь дублировать записи данных в будущих окнах. Проблема, которую это решает, заключается в том, что вычисление статистики для каждого окна будет более точным, поскольку эти данные являются непрерывными (например, температура) и требуют базового значения.
На этих диаграммах каждый блок представляет собой фиксированное окно. Числа в каждом окне представляют данные, поступающие в коллекцию PC, поступающую из источника.
Это пример ввода PCollection:
+---------+---------+---------+--------->
| 1 2 | 3 | | |
+---------+---------+---------+--------->
И полученный вывод PCollection:
+---------+---------+---------+--------->
| 1 2 | 2 3 | 3 | 3 |
+---------+---------+---------+--------->
Обратите внимание, как самая последняя точка данных (на основе метки времени события) пересылается в следующее окно. Если имеется несколько пустых окон, значение необходимо переадресовать.
Я решил проблему переадресации один раз, запустив оконную PCollection через DoFn с состоянием, который испускает дополнительный дублированный и модифицированный элемент:
public class DupeFn extends DoFn<Datum, Datum> {
@StateId("latest")
private final StateSpec<ValueState<Datum>> latestStateSpec = StateSpecs.value();
@TimerId("emit")
private final TimerSpec emitSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void processElement(
@Element Datum element,
OutputReceiver<Datum> receiver,
IntervalWindow window,
@StateId("latest") ValueState<Datum> latest,
@TimerId("emit") Timer emit
) {
emit.set(window.maxTimestamp());
Datum prev = latest.read();
if (prev == null || element.timestamp > prev.timestamp) {
latest.write(element);
}
receiver.output(element);
}
@OnTimer("emit")
public void emitLatest(
OutputReceiver<Datum> receiver,
IntervalWindow window,
@StateId("latest") ValueState<Datum> latest
) {
Datum last = latest.read();
// modify the timestamp such that it lands in the next window
last.timestamp = window.end().getMillis() + 10;
last.id += " DUPED";
receiver.outputWithTimestamp(last, new Instant(last.timestamp));
}
}
Проблема теперь в том, что ничто не будет дублироваться в исходное окно, если есть пустое окно. В идеале поведение должно быть таким, как описано на диаграмме выше.
Есть ли способ сделать это?
Редактировать
Я нашел это соответствующее неопубликованное сообщение в блоге.