После долгих исследований я обнаружил проблему в своем синтаксисе. Синтаксис, который я использую, действителен, основан на документации Confluent / Kafka, но он не работает. Поднимет ошибку с командой Кафки. Теперь работает новый синтаксис:
KTable<String, Customer> customerKTable = streamsBuilder.table("customer",Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name())
.withKeySerde(Serdes.String())
.withValueSerde(customerSerde));
Я должен включить withKeySerde()
и withValueSerde()
, чтобы заставить KTable работать. Но это не упоминается в документации Confluent / Kafka