Итак, @Pablo (из моего понимания) дал правильный ответ.Но у меня были некоторые предложения, которые не помещались бы в комментарии.
Я хотел спросить, нужны ли вам раздвижные окна?Из того, что я могу сказать, фиксированные окна сделают эту работу за вас, а также будут проще в вычислительном отношении.Так как вы используете накапливающиеся панели, вам не нужно использовать скользящее окно, так как ваша следующая функция DoFn уже будет получать среднее значение из накопленных панелей.
Что касается кода, я внес изменения в логику раннего и позднего запуска.Я также предлагаю увеличить размер окна.Поскольку вы знаете, что данные поступают каждые 15 минут, вы должны закрывать окно через 15 минут, а не через 15 минут.Но вы также не хотите выбирать окно, которое в конечном итоге столкнется с кратными 15 (например, 20), потому что через 60 минут у вас будет та же проблема.Поэтому выберите число, равное 15-ти, например 19. Также допускайте поздние записи.
PCollection<TrafficData> trafficData = input
.apply("MapIntoFixedWindows", Window.<TrafficData>into(
FixedWindows.of(Duration.standardMinutes(19))
.triggering(AfterWatermark.pastEndOfWindow()
// fire the moment you see an element
.withEarlyFirings(AfterPane.elementCountAtLeast(1))
//this line is optional since you already have a past end of window and a early firing. But just in case
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
.withAllowedLateness(Duration.standardMinutes(60))
.accumulatingFiredPanes());
Дайте мне знать, если это решит вашу проблему!
РЕДАКТИРОВАТЬ
Итак, я не мог понять, как вы вычислили приведенный выше пример,поэтому я использую общий пример.Ниже приведена общая функция усреднения:
public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
public static class Accum {
int sum = 0;
int count = 0;
}
@Override
public Accum createAccumulator() { return new Accum(); }
@Override
public Accum addInput(Accum accum, Integer input) {
accum.sum += input;
accum.count++;
return accum;
}
@Override
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for (Accum accum : accums) {
merged.sum += accum.sum;
merged.count += accum.count;
}
return merged;
}
@Override
public Double extractOutput(Accum accum) {
return ((double) accum.sum) / accum.count;
}
}
Чтобы запустить ее, вы должны добавить строку:
PCollection<Double> average = trafficData.apply(Combine.globally(new AverageFn()));
Поскольку вы в настоящее время с использованием накапливающих триггеров запуска, это будет самый простой способ кодирования решения проблемы.
ОДНАКО, если вы хотите использовать окно сбрасываемой панели огня, вам нужно будет использовать PCollectionView
для храненияпредыдущее среднее и передать его в качестве бокового ввода для следующего, чтобы отслеживать значения.Это немного сложнее в кодировании, но определенно улучшит производительность, так как постоянная работа выполняется в каждом окне, в отличие от накопительного запуска.
Достаточно ли этого для вас, чтобы сгенерировать свою собственную функцию для отбрасывания окна пожарной панели?