Невозможно открыть хранилище для потоков Kafka из-за недопустимого состояния - PullRequest
0 голосов
/ 29 ноября 2018

Я пытаюсь работать с потоками Кафки и создал следующую топологию:

    KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
            historyEventSerde));

    eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
            .groupByKey()
            .reduce((e1, e2) -> e2, Materialized.as(streamByKeyStoreName));

Позже я запускаю потоки следующим образом:

private void startKafkaStreams(KafkaStreams streams) {
    CompletableFuture<KafkaStreams.State> stateFuture = new CompletableFuture<>();
    streams.setStateListener((newState, oldState) -> {
        if(stateFuture.isDone()) {
            return;
        }

        if(newState == KafkaStreams.State.RUNNING || newState == KafkaStreams.State.ERROR) {
            stateFuture.complete(newState);
        }
    });

    streams.start();
    try {
        KafkaStreams.State finalState = stateFuture.get();
        if(finalState != KafkaStreams.State.RUNNING) {
            // ...
        }
    } catch (InterruptedException ex) {
        // ...
    } catch(ExecutionException ex) {
        // ...
    }
}

Мои потоки запускаются безошибка, и они в конечном итоге попадают в состояние RUNNING, где будущее завершено.Позже я пытаюсь получить доступ к тому хранилищу, которое я создал в своей топологии для таблицы KTable:

public KafkaFlowHistory createFlowHistory(String flowId) {
    ReadOnlyKeyValueStore<HistoryEventKey, HistoryEvent> store = streams.store(streamByKeyStoreName,
            QueryableStoreTypes.keyValueStore());
    return new KafkaFlowHistory(flowId, store, event -> topicProducer.send(new ProducerRecord<>(applicationTopicName, flowId, event)));
}

Я убедился, что createFlowHistory вызывается после завершения инициализирующего будущего в состоянии RUNNING, однакоЯ постоянно не могу это сделать, и KafkaStreams сообщает о следующей ошибке:

Исключение в потоке "main" org.apache.kafka.streams.errors.InvalidStateStoreException: Невозможно получить поток хранилища состояний.stream-file-service-test-instance-by-key, поскольку поток потока имеет значение PARTITIONS_ASSIGNED, а не RUNNING

Видимо, состояние потока изменилось.Нужно ли позаботиться об этом вручную при попытке сделать запрос в магазин и подождать, пока внутренний поток Kafka не войдет в нужное состояние?

1 Ответ

0 голосов
/ 29 ноября 2018

При запуске Kafka Streams выполняет следующие переходы состояний:

CREATED -> RUNNING -> REBALANCING -> RUNNING

Вам нужно подождать второе состояние RUNNING, прежде чем вы сможете выполнить запрос.Это известная проблема, и мы надеемся исправить ее в следующих выпусках.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...