Кафка потоков слушает несколько тем зависает - PullRequest
1 голос
/ 11 июня 2019

Я работаю с API kafka streams и сталкиваюсь с проблемами, когда я подписываюсь на несколько тем, а потребитель просто зависает. У него есть уникальный application.id, и я вижу в kafka, что группа потребителей была создана, но когда я опишу группу, я получу: consumer group X has no active members. Я оставил его работать около часа.

Интересно то, что это работает, когда список topics содержит только 1 тему. Меня не интересуют другие ответы, в которых мы создаем несколько источников, например: source1 = builder.stream("topic1") и source2 = builder.stream("topic2"), поскольку интерфейс для StreamsBuilder.stream поддерживает множество тем.

Я уже мог подписаться на несколько тем раньше, я просто не могу повторить, как мы это сделали. (Этот код работает в другой среде и работает как положено, поэтому не уверен, что это проблема синхронизации или что-то еще)

List<String> topics = Arrays.asList("topic1", "topic2");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(topics);

source
.transformValues(...)
.map(key, value) -> ...)
.to((key, value, record) -> ...);

new KafkaStreams(builder.build(), props).start();

ОБНОВЛЕНИЕ 06/18/19 После включения логов стало ясно, что мы пытаемся подписаться и опубликовать темы, которых не существует. Пока у нас был auto.topic.create.enable=true, это не имело значения. Таким образом, в конечном итоге была проведена проверка при запуске, которая привела бы к завершению работы сценария, если бы все темы не были созданы заранее, поскольку мы не создавали темы динамически.

...