Почему Kafka меняет название магазина - PullRequest
0 голосов
/ 28 ноября 2018

У меня проблема с моим приложением.

Код:

KTable<Long, byte[]> table = stream.groupByKey().aggregate(() -> null , (key, oldVal, newVal) -> {
        return newVal;
    }, Materialized.<Long,byte[],KeyValueStore<Long,byte[]>>as("networkStore").with(longSerde, byteSerde));

здесь я устанавливаю Имя магазина равным networkStore, но когда я перечисляю темы Кафки, название магазина равно network-service-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog

ЧтоЯ хочу, чтобы: - имя магазина было networkStore, чтобы я мог читать из него позже.

Когда я пытаюсь читать из магазина сейчас, это дает мне следующее исключение:

org.apache.kafka.streams.errors.InvalidStateStoreException: хранилище состояний networkStore может быть перенесено в другой экземпляр.в org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore (QueryableStoreProvider.java:60) в org.apache.kafka.streams.KafkaStreams.store (KafkaStreams.java:1039) в com.maxflow.networksservice.utils.NetworksServiceUtils.updateGraphForCompany (NetworksServiceUtils.java:41) в com.maxflow.networksservice.consumer.NodesConsumer.run (NodesConsumer.java:99) в java.lang.Thread.run (Thread.java:748)

1 Ответ

0 голосов
/ 28 ноября 2018

Используйте следующее:

KTable<Long, byte[]> table = stream.groupByKey().aggregate(() -> null , (key, oldVal, newVal) -> {
        return newVal;
    }, Materialized.with(longSerde, byteSerde).as("networkStore"));

Materialized.as().with() заменяет пользовательское имя внутренним именем.Следовательно, вы должны вызывать метод .as() после .with().Вы можете прочитать более подробную информацию здесь.

https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Materialized.html#with-org.apache.kafka.common.serialization.Serde-org.apache.kafka.common.serialization.Serde-

Другой вариант - использование метода .withKeySerde() и .withValueSerde() с пользовательским именем магазина, как указано ниже.

Materialized.<Long,byte[],KeyValueStore<Long,byte[]>>as("networkStore").withKeySerde(longSerde).withValueSerde(byteSerde)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...