Встроенное хранилище миграции Kafka State State между двумя экземплярами с разными @StreamListener - PullRequest
0 голосов
/ 08 сентября 2018

У меня есть приложение SpringBoot с двумя потоковыми процессорами, сопоставленными через Spring Cloud. Каждый процессор имеет собственный @StreamListener для разных тем. Один процессор записывает агрегированные данные в надежное хранилище состояний. Я столкнулся с проблемой в моем модульном тесте при получении данных через мой @Service (Служба получает агрегированные данные из хранилища состояний). По какой-то причине время от времени ловлю исключение:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, recently-played-store, may have migrated to another instance. at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043) at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)

Когда я удаляю StreamListener из другого процессора, все работает нормально и стабильно.

Как связать хранилище состояний для точного экземпляра с соответствующим процессором?

1 Ответ

0 голосов
/ 13 сентября 2018

Я нашел решение своей проблемы, возможно, это поможет кому-то еще.Я использовал не последнюю версию spring-cloud-stream-binder-kafka-streams.Моя версия была 2.0.0, и в ней есть ошибка https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/366.Это было исправлено только в версии 2.0.1

...