Я обращаюсь к хранилищу состояний, чтобы запросить его, и мне пришлось обернуть оператор store()
блоком try / catch, чтобы повторить попытку, потому что иногда я получаю это исключение:
org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store customers-store because the stream thread is PARTITIONS_REVOKED, not RUNNING
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:57)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1053)
at com.codependent.kafkastreams.customer.service.CustomerService.getCustomer(CustomerService.kt:75)
at com.codependent.kafkastreams.customer.service.CustomerServiceKt.main(CustomerService.kt:108)
Этокод, используемый для извлечения магазина (полный код находится на github repo ):
fun getCustomer(id: String): Customer? {
var keyValueStore: ReadOnlyKeyValueStore<String, Customer>? = null
while(keyValueStore == null) {
try {
keyValueStore = streams.store(CUSTOMERS_STORE, QueryableStoreTypes.keyValueStore<String, Customer>())
} catch (ex: InvalidStateStoreException) {
ex.printStackTrace()
}
}
val customer = keyValueStore.get(id)
return customer
}
И это основная программа:
fun main(args: Array<String>) {
val customerService = CustomerService("main", "localhost:9092")
customerService.initializeStreams()
customerService.createCustomer(Customer("53", "Joey"))
val customer = customerService.getCustomer("53")
println(customer)
customerService.stopStreams()
}
исключение происходит случайным образом при запуске программы несколько раз, после завершения предыдущих выполнений.Примечание: я ничего не делаю с исполняющим кластером Kafka и использую его конфигурацию по умолчанию.