Почему я иногда получаю InvalidStateStoreException PARTITIONS_REVOKED, а не RUNNING при получении магазина для его запроса? - PullRequest
0 голосов
/ 25 августа 2018

Я обращаюсь к хранилищу состояний, чтобы запросить его, и мне пришлось обернуть оператор store() блоком try / catch, чтобы повторить попытку, потому что иногда я получаю это исключение:

org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store customers-store because the stream thread is PARTITIONS_REVOKED, not RUNNING
    at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:57)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1053)
    at com.codependent.kafkastreams.customer.service.CustomerService.getCustomer(CustomerService.kt:75)
    at com.codependent.kafkastreams.customer.service.CustomerServiceKt.main(CustomerService.kt:108)

Этокод, используемый для извлечения магазина (полный код находится на github repo ):

fun getCustomer(id: String): Customer? {
    var keyValueStore: ReadOnlyKeyValueStore<String, Customer>? = null
    while(keyValueStore == null) {
        try {
            keyValueStore = streams.store(CUSTOMERS_STORE, QueryableStoreTypes.keyValueStore<String, Customer>())
        } catch (ex: InvalidStateStoreException) {
            ex.printStackTrace()
        }
    }
    val customer = keyValueStore.get(id)
    return customer
}

И это основная программа:

fun main(args: Array<String>) {
    val customerService = CustomerService("main", "localhost:9092")
    customerService.initializeStreams()
    customerService.createCustomer(Customer("53", "Joey"))
    val customer = customerService.getCustomer("53")
    println(customer)
    customerService.stopStreams()
}

исключение происходит случайным образом при запуске программы несколько раз, после завершения предыдущих выполнений.Примечание: я ничего не делаю с исполняющим кластером Kafka и использую его конфигурацию по умолчанию.

1 Ответ

0 голосов
/ 30 августа 2018

В то время, когда вы обращаетесь к магазину, приложение Kafka Streams претерпевает перебалансировку, и хранилища состояний в это время недоступны. Вы хотите убедиться, что запрашиваете хранилища только тогда, когда состояние приложения РАБОТАЕТ, а не РЕБАЛАНСИРУЕТСЯ.

Что вы можете сделать, это проверить состояние приложения, прежде чем пытаться читать из магазина, как это:

if(streams.state() == State.RUNNING) {
    keyValueStore = streams.store(...);
    val customer = keyValueStore.get(id);
    return customer;
}

Существует также метод KafkaStreams.setStateListener, который можно использовать для регистрации реализации KafkStreams.StateListener. Метод StateListener.onChange вызывается каждый раз, когда приложение меняет свое состояние.

...