хранилище состояний могло быть перенесено в другой экземпляр - PullRequest
1 голос
/ 03 апреля 2019

, когда я пытаюсь получить доступ к стерео состояния из потока, я получаю ошибку ниже,

Хранилище состояний, хранилище счетчиков, возможно, мигрировало в другой экземпляр

, когда я пытался получить доступ к ReadOnlyKeyValueStoreиз хранилища, получая сообщение об ошибке как перенесено на другой сервер.но у меня работает только один брокер

/**
 * 
 */
package com.ms.kafka.com.ms.stream;

import java.util.Properties;
import java.util.stream.Stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import com.ms.kafka.com.ms.entity.TrackingEvent;
import com.ms.kafka.com.ms.entity.TrackingEventDeserializer;
import com.ms.kafka.com.ms.entity.TrackingEvnetSerializer;


/**
 * @author vettri
 *
 */
public class EventStreamer {

    /**
     * 
     */
    public EventStreamer() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "trackeventstream_stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CLIENT_ID_CONFIG,"testappdi");
        props.put("auto.offset.reset","earliest");
        /*
         * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
         * Serdes.String().getClass());
         * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
         * Serdes.String().getClass());
         */

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String , TrackingEvent> eventStream = builder.stream("rt_event_command_topic_stream",Consumed.with(Serdes.String(),
                Serdes.serdeFrom(new TrackingEvnetSerializer(), new TrackingEventDeserializer())));
        KTable<String, Long> groupedByUniqueId = eventStream.groupBy((k,v) -> v.getUniqueid()).
                count(Materialized.as("count-store"));

        /*
         * KTable<Integer, Integer> table = builder.table( "rt_event_topic_stream",
         * Materialized.as("queryable-store-name"));
         */

        //eventStream.filter((k,v) -> "9de3b676-b20f-4b7a-878b-526fd5948a34".equalsIgnoreCase(v.getUniqueid())).foreach((k,v) -> System.out.println(v));
        final KafkaStreams stream = new KafkaStreams(builder.build(), props);
        stream.cleanUp();
        stream.start();
        System.out.println("Strema state : "+stream.state().name());
        String queryableStoreName = groupedByUniqueId.queryableStoreName();
        /*
         * ReadOnlyKeyValueStore keyValStore1 =
         * waitUntilStoreIsQueryable(queryableStoreName, (QueryableStoreTypes)
         * QueryableStoreTypes.keyValueStore(),stream);
         */ ReadOnlyKeyValueStore<Long , TrackingEvent> keyValStore = stream.store(queryableStoreName, QueryableStoreTypes.<Long,TrackingEvent>keyValueStore());

       // System.out.println("results --> "+keyValStore.get((long) 158));
        //streams.close();
    }

    public static <T> T waitUntilStoreIsQueryable(final String storeName,
            final QueryableStoreTypes queryableStoreType, final KafkaStreams streams) throws InterruptedException {
        while (true) {
            try {
                return streams.store(storeName, (QueryableStoreType<T>) queryableStoreType);
            } catch (InvalidStateStoreException ignored) {
// store not yet ready for querying
                System.out.println("system is waitng to ready for state store");
                Thread.sleep(100);
                //streams.close();
            }
        }
}

}

Мне нужно получить данные, которые я хранил в государственном хранилище,

, что я пытаюсь сделать, нужно сохранить их в локальном хранилищеи получить сильный текст

1 Ответ

1 голос
/ 04 апреля 2019

В вашем случае локальный экземпляр KafkaStreams еще не готов, и, следовательно, его локальные хранилища состояний еще не могут быть запрошены.

Перед запросом следует подождать, пока KafkaStreams будет в состоянии RUNNING. Вам нужно позвонить вам waitUntilStoreIsQueryable(...).

Пример можно найти в Confluent github:

Более подробную информацию о причине можно найти здесь: https://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

...