Получение хранилища для GlobalKTable вылетает после обновления до kafka-streams: 5.5.0- css (Apache Kafka 2.5.0) [РЕШЕНИЕ] - PullRequest
2 голосов
/ 16 июня 2020

У меня есть приложение Spring Boot, использующее GlobalKTable. Он работал нормально до обновления до kafka-streams-5.5.0- css (версия Confluent Platform, совместимая с Apache Kafka 2.5.0) с 5.3.2- css (Apache Kafka 2.3.1).

Итак, это моя конфигурация:

@Configuration
@EnableKafkaStreams
public class GlobalTableConfiguration {

    public GlobalTableConfiguration() {
    }

    @Bean
    public GlobalKTable<String, String> table(StreamsBuilder kStreamsBuilder) {
        return kStreamsBuilder.globalTable("topic1", Consumed.with(null, null), 
                                            Materialized.as("topic1-store"));
    }
}

Я получаю магазин следующим образом:

streamsBuilderFactoryBean.getKafkaStreams().
                store("topic1-store", QueryableStoreTypes.keyValueStore());

это не работает с:

Request processing failed; nested exception is java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)


Caused by: java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
    at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:316)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1182)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1169)

Я вижу, что поток потока завершается до этого:

2020-06-16 13:22:46.943  INFO 72423 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
        2020-06-16 13:22:46.944  INFO 72423 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
        2020-06-16 13:22:46.944  INFO 72423 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1592299366943
        2020-06-16 13:22:46.946  INFO 72423 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: aKrIp_7wQcqF9OlSUoBgSQ
        2020-06-16 13:22:47.496  INFO 72423 --- [    Test worker] org.apache.kafka.streams.KafkaStreams    : stream-client [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4] State transition from ERROR to PENDING_SHUTDOWN
        2020-06-16 13:22:47.497  INFO 72423 --- [ms-close-thread] o.a.k.s.p.internals.StreamThread         : stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-StreamThread-1] Informed to shut down
        2020-06-16 13:22:47.497  INFO 72423 --- [ms-close-thread] o.a.k.s.p.internals.GlobalStreamThread   : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] State transition from RUNNING to PENDING_SHUTDOWN
        2020-06-16 13:22:47.557  INFO 72423 --- [balStreamThread] o.a.k.s.p.internals.GlobalStreamThread   : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] Shutting down
        2020-06-16 13:22:47.571  INFO 72423 --- [balStreamThread] o.a.k.s.p.internals.GlobalStreamThread   : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] State transition from PENDING_SHUTDOWN to DEAD
        2020-06-16 13:22:47.571  INFO 72423 --- [balStreamThread] o.a.k.s.p.internals.GlobalStreamThread   : global-stream-thread [app-d09c3f52-8d77-4814-944b-ba08b79ed8a4-GlobalStreamThread] Shutdown complete

После некоторых экспериментов я заставил его работать, добавив в свою конфигурацию:

    @Bean
    public KStream kStream(StreamsBuilder kStreamsBuilder) {
        return kStreamsBuilder.stream("some-topic", Consumed.with(null, null));
    }

Итак, в основном, когда У меня есть определенный KStream (потребляющий от любого topi c), поток потока остается живым, и все работает, как до обновления. Мой вопрос: как правильно это сделать без этого бесполезного bean-компонента (и topi c).

EDIT

Здесь обсуждалась похожая проблема: Kafka Streams 2.5.0 требует ввода topi c Похоже, это будет исправлено в kafka-streams 2.5.1 и util, а затем установка num.stream.threads: 0 более удобное решение, чем объявление фиктивного потока.

1 Ответ

0 голосов
/ 17 июня 2020

Похоже, что это не имеет ничего общего со Spring и вызвано некоторыми внутренними изменениями в классах kafka-streams.

Это отлично работает с Boot 2.2.x (Kafka-streams 2.3.x).

@SpringBootApplication
@EnableKafkaStreams
public class So62406117Application {

    public static void main(String[] args) {
        SpringApplication.run(So62406117Application.class, args);
    }

    @Bean
    public GlobalKTable<String, String> table(StreamsBuilder kStreamsBuilder) {
        return kStreamsBuilder.globalTable("topic1", Consumed.with(null, null),
                Materialized.as("topic1-store"));
    }

    @Bean
    public ApplicationRunner runner(StreamsBuilderFactoryBean fb) {
        return args -> {
            ReadOnlyKeyValueStore<Object, Object> store =
                    fb.getKafkaStreams().store("topic1-store", QueryableStoreTypes.keyValueStore());
            System.out.println(store);
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1").partitions(1).replicas(1).build();
    }

}

Но не получается при загрузке 2.3 (Kafka-Streams 2.5.0).

Мы определенно запускаем метод KafkaStreams (в заводском компоненте start(), но во время этого start() получаем


java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1228) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853) ~[kafka-streams-2.5.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) ~[kafka-streams-2.5.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[kafka-streams-2.5.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-2.5.0.jar:na]

2020-06-16 17:44:02.700  INFO 10635 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
2020-06-16 17:44:02.700  INFO 10635 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1] Shutting down
2020-06-16 17:44:02.700  INFO 10635 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-06-16 17:44:02.700  INFO 10635 --- [-StreamThread-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-06-16 17:44:02.704  INFO 10635 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2020-06-16 17:44:02.704  INFO 10635 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [foo-235af8e6-6618-4e73-86ad-75307130004b] State transition from REBALANCING to ERROR
2020-06-16 17:44:02.704 ERROR 10635 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [foo-235af8e6-6618-4e73-86ad-75307130004b] All stream threads have died. The instance will be in error state and should be closed.
2020-06-16 17:44:02.704  INFO 10635 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [foo-235af8e6-6618-4e73-86ad-75307130004b-StreamThread-1] Shutdown complete
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...