Apache Функция Beam Window не работает с неограниченным потоком Кафки - PullRequest
0 голосов
/ 06 марта 2020

Когда я запускаю свой конвейер на своем кластере kafka с функцией окна ниже, он не работает должным образом. Первый .apply «BuildEventsHa sh» выполняет ParsePlaybackEvent ParDo для каждого события Kafka topi c, как и ожидалось. Однако последующее .apply PTransform "CollateSessions" выполняется до BuildEventsHa sh, а затем никогда больше.

Я установил длительность окна всего на 3 секунды и поэтому ожидал, что CollateSessions сработает через 3 секунды, даже если по умолчанию срабатывает 0 секунд, но это не так. Этот код работает так, как и ожидалось в моих ограниченных модульных тестах. Таким образом, похоже, что я могу изолировать проблему от способа, которым я настраивал свой конвейер для неограниченной стратегии управления окнами.

Согласно документации, конвейер должен быть автоматически установлен неограниченным, когда я использую разъем KafkaIO, но, возможно, это не так? Кроме того, в нем говорится, что мне нужно установить не глобальную стратегию управления окнами, но я не уверен, как это сделать. Является ли установка FixedWindow указанной продолжительности c ниже не выполнением этого? Кроме того, данные, поступающие с Kafka, уже являются отметкой времени с BLOB-объектом json. Так мне все еще нужно установить метку времени?

Может кто-нибудь помочь мне исправить код ниже? Спасибо

.apply(
	                "Read From Kafka",
	                KafkaIO.<String, String>read()
	                    .withBootstrapServers(options.getBootstrapServers())
	                    .withTopics(topicsList)
	                    .withKeyDeserializerAndCoder(
	                        StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
	                    .withValueDeserializerAndCoder(
	                        StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
	                    .withoutMetadata()) 
	            .apply(
	                Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(3))))
.apply("BuildEventsHash", new BuildEventsHash())
.apply("CollateSessions", new CollateSessions())
...other transforms

//BuildEventsHash ParDo break point is executed for each kafka topic event
static class BuildEventsHash extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, PlaybackEventInfo>>> {

	@Override
	public PCollection<KV<String, PlaybackEventInfo>> expand(PCollection<KV<String, String>> events) {
		return events.apply("ParsePlaybackEvent", ParDo.of(new ParsePlaybackEvent()));
	}
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...