KTable не может получить данные из материализованного представления - PullRequest
0 голосов
/ 26 апреля 2018

Я использую Kafka Streams с Spring Boot.В моем случае использования, когда я получаю событие клиента, мне нужно сохранить его в customer-store материализованном представлении, а когда я получаю событие заказа, мне нужно присоединиться к клиенту и заказать, а затем сохранить результат в customer-order материализованном представлении.

StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customer-store"),Serdes.String(), customerSerde)
                .withLoggingEnabled(new HashMap<>());
streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde)).to("customer-to-ktable-topic",Produced.with(Serdes.String(), customerSerde));
KTable<String, Customer> customerKTable = streamsBuilder.table("customer-to-ktable-topic", Consumed.with(Serdes.String(), customerSerde),Materialized.as(customerStateStore.name()));

Вот проблема, когда я получаю событие Order, и мой customerKTable возвращает ноль, и операция соединения становится бесполезной.Это не так, как это должно работать.Мой код похож на пример Kafka Music , я создал класс TestConsumer для проверки этого.Код загружен на Github для справки.

1 Ответ

0 голосов
/ 27 апреля 2018

Эта проблема была создана KTable.Синтаксис KTable, который я использовал, был синтаксически правильным, но не работал.См. этот вопрос для получения дополнительной информации.Изменение синтаксиса KTable работало для меня.Теперь customerKTable возвращает события или объекты из материализованного представления, когда поступило событие Order.

...