Когда я запускаю свой конвейер на своем кластере kafka с функцией окна ниже, он не работает должным образом. Первый .apply «BuildEventsHa sh» выполняет ParsePlaybackEvent ParDo для каждого события Kafka topi c, как и ожидалось. Однако последующее .apply PTransform "CollateSessions" выполняется до BuildEventsHa sh, а затем никогда больше.
Я установил длительность окна всего на 3 секунды и поэтому ожидал, что CollateSessions сработает через 3 секунды, даже если по умолчанию срабатывает 0 секунд, но это не так. Этот код работает так, как и ожидалось в моих ограниченных модульных тестах. Таким образом, похоже, что я могу изолировать проблему от способа, которым я настраивал свой конвейер для неограниченной стратегии управления окнами.
Согласно документации, конвейер должен быть автоматически установлен неограниченным, когда я использую разъем KafkaIO, но, возможно, это не так? Кроме того, в нем говорится, что мне нужно установить не глобальную стратегию управления окнами, но я не уверен, как это сделать. Является ли установка FixedWindow указанной продолжительности c ниже не выполнением этого? Кроме того, данные, поступающие с Kafka, уже являются отметкой времени с BLOB-объектом json. Так мне все еще нужно установить метку времени?
Может кто-нибудь помочь мне исправить код ниже? Спасибо
.apply(
"Read From Kafka",
KafkaIO.<String, String>read()
.withBootstrapServers(options.getBootstrapServers())
.withTopics(topicsList)
.withKeyDeserializerAndCoder(
StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
.withValueDeserializerAndCoder(
StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
.withoutMetadata())
.apply(
Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(3))))
.apply("BuildEventsHash", new BuildEventsHash())
.apply("CollateSessions", new CollateSessions())
...other transforms
//BuildEventsHash ParDo break point is executed for each kafka topic event
static class BuildEventsHash extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, PlaybackEventInfo>>> {
@Override
public PCollection<KV<String, PlaybackEventInfo>> expand(PCollection<KV<String, String>> events) {
return events.apply("ParsePlaybackEvent", ParDo.of(new ParsePlaybackEvent()));
}
}