У меня есть приложение Spring Boot, использующее GlobalKTable. Он работал нормально до обновления до kafka-streams-5.5.0- css (версия Confluent Platform, совместимая с Apache Kafka 2.5.0) с 5.3.2- css (Apache Kafka 2.3.1).
Итак, это моя конфигурация:
@Configuration
@EnableKafkaStreams
public class GlobalTableConfiguration {
public GlobalTableConfiguration() {
}
@Bean
public GlobalKTable<String, String> table(StreamsBuilder kStreamsBuilder) {
return kStreamsBuilder.globalTable("topic1", Consumed.with(null, null),
Materialized.as("topic1-store"));
}
}
Я получаю магазин следующим образом:
streamsBuilderFactoryBean.getKafkaStreams().
store("topic1-store", QueryableStoreTypes.keyValueStore());
это не работает с:
Request processing failed; nested exception is java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
Caused by: java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:316)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1182)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1169)
Я вижу, что поток потока завершается до этого:
2020-06-16 13:22:46.943 INFO 72423 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-06-16 13:22:46.944 INFO 72423 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-06-16 13:22:46.944 INFO 72423 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1592299366943
2020-06-16 13:22:46.946 INFO 72423 --- [ad | producer-2] org.apache.kafka.clients.Metadata : [Producer clientId=producer-2] Cluster ID: aKrIp_7wQcqF9OlSUoBgSQ
2020-06-16 13:22:47.496 INFO 72423 --- [ Test worker] org.apache.kafka.streams.KafkaStreams : stream-client [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4] State transition from ERROR to PENDING_SHUTDOWN
2020-06-16 13:22:47.497 INFO 72423 --- [ms-close-thread] o.a.k.s.p.internals.StreamThread : stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-StreamThread-1] Informed to shut down
2020-06-16 13:22:47.497 INFO 72423 --- [ms-close-thread] o.a.k.s.p.internals.GlobalStreamThread : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] State transition from RUNNING to PENDING_SHUTDOWN
2020-06-16 13:22:47.557 INFO 72423 --- [balStreamThread] o.a.k.s.p.internals.GlobalStreamThread : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] Shutting down
2020-06-16 13:22:47.571 INFO 72423 --- [balStreamThread] o.a.k.s.p.internals.GlobalStreamThread : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] State transition from PENDING_SHUTDOWN to DEAD
2020-06-16 13:22:47.571 INFO 72423 --- [balStreamThread] o.a.k.s.p.internals.GlobalStreamThread : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] Shutdown complete
После некоторых экспериментов я заставил его работать, добавив в свою конфигурацию:
@Bean
public KStream kStream(StreamsBuilder kStreamsBuilder) {
return kStreamsBuilder.stream("some-topic", Consumed.with(null, null));
}
Итак, в основном, когда У меня есть определенный KStream (потребляющий от любого topi c), поток потока остается живым, и все работает, как до обновления. Мой вопрос: как правильно это сделать без этого бесполезного bean-компонента (и topi c).
EDIT
Здесь обсуждалась похожая проблема: Kafka Streams 2.5.0 требует ввода topi c Похоже, это будет исправлено в kafka-streams 2.5.1 и util, а затем установка num.stream.threads: 0
более удобное решение, чем объявление фиктивного потока.