Kafka Streams 2.5.0 требует ввода темы - PullRequest
2 голосов
/ 21 апреля 2020

Начиная с Kafka Streams 2.5.0, похоже, что топология должна включать ввод topi c. В Kafka 2.4.1 (и ранее) это не так.

У меня есть приложение, в котором топология просто создает несколько глобальных хранилищ состояний, которые считывают данные из тем, записанных другими приложениями.

С Kafka 2.5.0 я получаю эту ошибку:

13:24:27.161 [<redacted>-7cf1b5c9-4a6e-4bf2-9f77-f7f85f2df3bb-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [<redacted>-7cf1b5c9-4a6e-4bf2-9f77-f7f85f2df3bb-StreamThread-1] Encountered the following error during processing:
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1228)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

Если я добавляю фиктивный ввод topi c (например, через streamsBuilder.stream(Pattern.compile("hack"));), приложение запускается нормально.

Можно ли ожидать такого поведения или это непреднамеренное изменение в Kafka Streams 2.5.0?

Подробнее: приведенный выше вариант использования может показаться немного странным, и я должен согласиться. Во-первых, это было связано с недостатком интерактивных запросов, когда приложение в течение определенного периода времени не могло отвечать на запросы. Я вижу, что проблема была исправлена ​​в Kafka Streans 2.5.0 через KIP-535 , и это здорово. Я надеюсь посмотреть на IQ позже.

Ответы [ 3 ]

2 голосов
/ 27 апреля 2020

Если у вас нет неглобальных частей вашей топологии, нет никаких причин вообще иметь StreamThreads. Это означает, что вы можете легко обойти это, просто установив num.threads на ноль - возможно, вы должны делать это в любом случае, чтобы избежать ненужных накладных расходов и групповой координации. Установка этого значения на ноль по умолчанию при обнаружении глобальной топологии является «исправлением» в любом случае, поэтому вам не нужно ждать этого

0 голосов
/ 22 апреля 2020

заметили то же самое. Смотрите изменения в StreamThread. java. Это было введено: https://issues.apache.org/jira/browse/KAFKA-7317

https://github.com/apache/kafka/pull/7969/

Конечно, есть настройка конфигурации для поддержки поведения до 2.5.0 (т.е. подписаться с потребителем.subscribe (builder.sourceTopicPattern (), rebalanceListener)) ??

См. фрагмент ...

0 голосов
/ 22 апреля 2020
private void subscribeConsumer() {
    if (builder.usesPatternSubscription()) {
        // this is old behaviour - is there a config that will revert to this??
        consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
    } else {
        consumer.subscribe(builder.sourceTopicCollection(), rebalanceListener);
    }
}
...