Мы недавно обновили нашу версию kafka с 0.10 до 1.0, и я обновляю устаревший код
KTable<Long, myClass> myKTable = this.streamBuilder
.stream(Serdes.Long(), mySerde, sub_topic)
.groupByKey(Serdes.Long(), mySerde)
.reduce(myReducer, my_store);
на этот
KTable<Long, myClass> myKTable = this.streamBuilder
.stream(sub_topic, Consumed.with(Serdes.Long(), mySerde))
.groupByKey(Serialized.with(Serdes.Long(), mySerde))
.reduce(myReducer, Materialized.as(my_store));
Мой поток выдает ошибку при сериализации в groupByKey
.Serialized.with()
не использует предоставленный keySerde
и по умолчанию возвращается к byteArray
.И этот byteArray serde
затем встречает мой ключ, который является Long
, и выдает ошибку приведения.
Кто-нибудь еще сталкивался с этой ошибкой в версии 1.0.0 kafka.Первый код с устаревшей версией kafka работает нормально.Но обновление кода для использования Serialized.with()
, похоже, не работает.Любая помощь с благодарностью.