KTable<Key1, GenericRecord> primaryTable = createKTable(key1, kstream, statestore-name);
KTable<Key2, GenericRecord> childTable1 = createKTable(key1, kstream, statestore-name);
KTable<Key3, GenericRecord> childTable2 = createKTable(key1, kstream, statestore-name);
primaryTable.leftJoin(childTable1, (primary, choild1) -> compositeObject)
.leftJoin(childTable2,(compositeObject, child2) -> compositeObject, Materialized.as("compositeobject-statestore"))
.toStream().to(""composite-topics)
Для моего приложения я использую соединения KTable-Ktable, чтобы при получении данных в первичном или дочернем потоке он мог устанавливать его CompositeObject с установщиками и получателями для всех трех таблиц.Эти три входящих потока имеют разные ключи, но при создании KTable я делаю ключи одинаковыми для всех трех KTable.
У меня есть все темы с одним разделом.Когда я запускаю приложение на одном экземпляре, все работает нормально.Я вижу составной объект, заполненный данными из всех трех таблиц.Все интерактивные запросы также работают нормально, передавая идентификатор записи и имя локального хранилища состояний.
Но когда я запускаю два экземпляра одного и того же приложения, я вижу составной объект с данными первичного и дочернего типов, но дочерний элемент остается пустым.Даже если я пытаюсь позвонить в Statestore с помощью интерактивного запроса, он ничего не возвращает.
Я использую библиотеки spring-cloud-stream-kafka-streams для написания кода.
Пожалуйстапредположите, по какой причине он не устанавливается, и какое должно быть правильное решение для этого.