Я пытаюсь выполнить простой подсчет количества прогонов в Apache Beam (Dataflowrunner), но мне трудно найти подходящее решение.
Цель конвейера:
- Вход Pubsub: события
- Выход Pubsub: каждые 30 секунд, число событий, замеченных за последние 5 минут
Проблема:
, когда естьпустое окно, я хочу видеть 0 в качестве счета.Этого не происходит.
Попытка 1: Подсчет глобально
Выдает ошибку, которую мне нужно использовать:
Combine.globally(Count.<T>combineFn()).withoutDefaults()
PCollection slidingCount =
input.apply("Create Windows",
Window.into(
SlidingWindows.of(Duration.standardSeconds(300))
.every(Duration.standardSeconds(30)))
)
.apply("Count elements in window",
Count.globally()
);
Попытка 2: Combiner
Выдает ошибку, которую мне нужно указать, withoutdefaults()
.
PCollection slidingCount =
input.apply("Create Windows",
Window.into(
SlidingWindows.of(Duration.standardSeconds(300))
.every(Duration.standardSeconds(30)))
)
.apply("Count elements in window",
Combine.globally(Count.<PubsubMessage>combineFn())
);
Попытка 3: без ошибок
Это не генерирует пустые окна.По определению конечно.
PCollection slidingCount =
input.apply("Create Windows",
Window.into(
SlidingWindows.of(Duration.standardSeconds(300))
.every(Duration.standardSeconds(30)))
)
.apply("Count elements in window",
Combine.globally(Count.<PubsubMessage>combineFn())
.withoutDefaults()
);
Попытка 4: asSingletonView
Добавление этой опции превращает мою коллекцию в представление, которое я не могу повторно использовать для продолжения моего конвейера.
PCollectionView slidingCount =
input.apply("Create Windows",
Window.into(
SlidingWindows.of(Duration.standardSeconds(300))
.every(Duration.standardSeconds(30)))
)
.apply("Count elements in window",
Combine.globally(Count.<PubsubMessage>combineFn())
.asSingletonView()
);
Любые предложения о том, как я могу подойти к этому.