Я использую Kafka Streams с Spring Boot.В моем случае использования, когда я получаю событие клиента, мне нужно сохранить его в customer-store
материализованном представлении, а когда я получаю событие заказа, мне нужно присоединиться к клиенту и заказать, а затем сохранить результат в customer-order
материализованном представлении.
StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customer-store"),Serdes.String(), customerSerde)
.withLoggingEnabled(new HashMap<>());
streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde)).to("customer-to-ktable-topic",Produced.with(Serdes.String(), customerSerde));
KTable<String, Customer> customerKTable = streamsBuilder.table("customer-to-ktable-topic", Consumed.with(Serdes.String(), customerSerde),Materialized.as(customerStateStore.name()));
Вот проблема, когда я получаю событие Order, и мой customerKTable
возвращает ноль, и операция соединения становится бесполезной.Это не так, как это должно работать.Мой код похож на пример Kafka Music , я создал класс TestConsumer
для проверки этого.Код загружен на Github для справки.