Apache Kafka (KStreams): как подписаться на несколько тем? - PullRequest
2 голосов
/ 11 июня 2019

У меня есть следующий код

//Kafka Config setup
Properties props = ...; //setup

List<String> topicList = Arrays.asList({"A", "B", "C"});

StreamBuilder builder = new StreamBuilder();
KStream<String, String> source = builder.stream(topicList);

source
.map((k,v) -> { //busy code for mapping data})
.transformValues(new MyGenericTransformer());
.to((k,v,r) -> {//busy code for topic routing});

new KafkaStream(builder.build(), properties).start();

Моя проблема: Когда я добавляю более одной темы для подписки (например, A, B, C выше), код Kstream прекращает получать записи.

Ссылки: https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsBuilder.html

Соответствующая документация

public <K,V> KStream<K,V> stream(java.util.Collection<java.lang.String> topics)

"If multiple topics are specified there is no ordering guarantee for records from different topics."

Чего я пытаюсь достичь: Иметь один Kstream (т.е. «источник» сверху) потреблять / обрабатывать из нескольких тем.

1 Ответ

1 голос
/ 11 июня 2019

Разделены ли темы по одному и тому же ключу?

Обратите внимание, что указанные темы ввода должны быть разделены по ключу.Если это не так, то пользователь несет ответственность за перераспределение данных до того, как какая-либо операция на основе ключа (например, агрегация или объединение) будет применена к возвращенному KStream.

это может быть ваш блокировщик.

Другая возможная проблема, возможно, использованная группа потребителей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...