Я работаю с API kafka streams и сталкиваюсь с проблемами, когда я подписываюсь на несколько тем, а потребитель просто зависает. У него есть уникальный application.id
, и я вижу в kafka, что группа потребителей была создана, но когда я опишу группу, я получу: consumer group X has no active members
. Я оставил его работать около часа.
Интересно то, что это работает, когда список topics
содержит только 1 тему. Меня не интересуют другие ответы, в которых мы создаем несколько источников, например: source1 = builder.stream("topic1")
и source2 = builder.stream("topic2")
, поскольку интерфейс для StreamsBuilder.stream
поддерживает множество тем.
Я уже мог подписаться на несколько тем раньше, я просто не могу повторить, как мы это сделали. (Этот код работает в другой среде и работает как положено, поэтому не уверен, что это проблема синхронизации или что-то еще)
List<String> topics = Arrays.asList("topic1", "topic2");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(topics);
source
.transformValues(...)
.map(key, value) -> ...)
.to((key, value, record) -> ...);
new KafkaStreams(builder.build(), props).start();
ОБНОВЛЕНИЕ 06/18/19
После включения логов стало ясно, что мы пытаемся подписаться и опубликовать темы, которых не существует. Пока у нас был auto.topic.create.enable=true
, это не имело значения. Таким образом, в конечном итоге была проведена проверка при запуске, которая привела бы к завершению работы сценария, если бы все темы не были созданы заранее, поскольку мы не создавали темы динамически.