Ошибка сборки материализованного магазина Kafka Streams - PullRequest
0 голосов
/ 06 января 2019

Я пытаюсь создать DSL-код Materialized.as здесь: https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/Stores.html

Но я получаю ошибку

incompatible types: org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted to org.apache.kafka.common.serialization.Serde<java.lang.Object>

На линии

.withKeySerde(Serdes.Long())

Кто-нибудь знает, что здесь может быть не так?

final StreamsBuilder builder = new StreamsBuilder();

   KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
   KTable<Long,String> dataStore = builder.table(
     "example_stream",
     Materialized.as(storeSupplier)
             .withKeySerde(Serdes.Long())
             .withValueSerde(Serdes.String()));

Ответы [ 2 ]

0 голосов
/ 09 января 2019

Проблема в том, что builder.table не знает универсальный тип по умолчанию <Object,Object>. Позднее типы Serde не совпадают. Вам необходимо указать такие типы, как

KTable<Long,String> dataStore = builder.<Long,String>table(
    "example_stream",
    Materialized.as(storeSupplier)
        .withKeySerde(Serdes.Long())
        .withValueSerde(Serdes.String()));
0 голосов
/ 06 января 2019

Я не могу сказать наверняка без примера кода, однако сообщение об ошибке очень ясно. Вы указываете Кафке, что ключ имеет тип Long. Однако ваш ключ на самом деле является другим Java-объектом. Например, если у вас есть сообщение со строковым ключом, этот код изменится на: .withKeySerde(Serdes.String()). Проверьте тип ключа и укажите правильный Serde для этого типа.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...