Kafka 1.0.0 - Serialized.with () использует serde по умолчанию вместо предоставленных - PullRequest
0 голосов
/ 16 мая 2018

Мы недавно обновили нашу версию kafka с 0.10 до 1.0, и я обновляю устаревший код

KTable<Long, myClass> myKTable = this.streamBuilder
            .stream(Serdes.Long(), mySerde, sub_topic)
            .groupByKey(Serdes.Long(), mySerde)
            .reduce(myReducer, my_store);

на этот

KTable<Long, myClass> myKTable = this.streamBuilder
            .stream(sub_topic, Consumed.with(Serdes.Long(), mySerde))
            .groupByKey(Serialized.with(Serdes.Long(), mySerde))
            .reduce(myReducer, Materialized.as(my_store));

Мой поток выдает ошибку при сериализации в groupByKey.Serialized.with() не использует предоставленный keySerde и по умолчанию возвращается к byteArray.И этот byteArray serde затем встречает мой ключ, который является Long, и выдает ошибку приведения.

Кто-нибудь еще сталкивался с этой ошибкой в ​​версии 1.0.0 kafka.Первый код с устаревшей версией kafka работает нормально.Но обновление кода для использования Serialized.with(), похоже, не работает.Любая помощь с благодарностью.

1 Ответ

0 голосов
/ 17 мая 2018

Можете ли вы поделиться трассировкой стека? Я действительно думаю, что проблема связана с reduce() - вам нужно снова указать Serdes через Materialized.

Это своего рода регресс на новом API, который был недавно исправлен в trunk: https://github.com/apache/kafka/pull/4919 Таким образом, в следующем выпуске 2.0 будет исправление.

...