Новая сборка KTable ничего не возвращает - PullRequest
0 голосов
/ 25 апреля 2018

Я пытаюсь использовать KTable для использования событий из темы Кафки.Но это ничего не возвращает.Когда я использую KStream, он возвращает и печатает объекты.Это действительно странно. Производитель и потребитель можно найти здесь

//Not working    
KTable<String, Customer> customerKTable = streamsBuilder.table("customer", Consumed.with(Serdes.String(), customerSerde),Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name()));
customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)));

//KStream working
KStream<String, Customer> customerKStream= streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde));
customerKStream.foreach(((key, value) -> System.out.println("Customer from Topic: " + value)))

1 Ответ

0 голосов
/ 27 апреля 2018

После долгих исследований я обнаружил проблему в своем синтаксисе. Синтаксис, который я использую, действителен, основан на документации Confluent / Kafka, но он не работает. Поднимет ошибку с командой Кафки. Теперь работает новый синтаксис:

KTable<String, Customer> customerKTable = streamsBuilder.table("customer",Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name())
                                                            .withKeySerde(Serdes.String())
                                                            .withValueSerde(customerSerde));

Я должен включить withKeySerde() и withValueSerde(), чтобы заставить KTable работать. Но это не упоминается в документации Confluent / Kafka

...