Состояние проблемы:
Topi c 1: "ключ = empId, значение = empname, deptName , ..."
Topi c 2 : "key = deptName , value = deptName"
Мне нужны данные из Topi c 1, где deptName (атрибут значения в topi c 1) равен ключу Topi c 2.
Шаги:
- Создайте поток из Topi c 1, сгруппируйте его по deptName и выполните агрегацию. Будет возвращено Ktable
(key =deptName, value = "empId1,empId2,empId3 ..")
- Создать поток из Topi c 2
(key ="deptName" value = "deptName")
- Выполнить операцию левого соединения на Ktable (Шаг 1) и KSteam (Шаг 2).
(KStream-Ktable)
- И объединение возвращает желаемый результат.
В одном разделе все работает как положено, однако после переключения на несколько разделов объединение не возвращает никаких данных .
Шаг 1:
KGroupedStream<String, Object> groupedStream = adStream.groupBy((key, value) -> value.getOrganizationId().toString());
groupedStream
.aggregate(() -> (new String()),
(aggKey, newValue, aggValue) -> addCurrentValue(aggValue,
String.valueOf(newValue.getOriginId())),
Materialized.as("aggregated-stream-store").with(strSerde, strSerde))
.toStream().to(Constant.AD_AGGREGATED_DATA, Produced.with(strSerde, strSerde));
Шаг 2:
KStream<String, String> swgOrgStream = builder.stream(Constant.SWG_ORG_TOPIC,Consumed.with(strSerde, strSerde));
Шаг 3:
KStream<String, String> filteredOrgStream = swgOrgStream.leftJoin(aggregatedTable,
(leftValue, rightValue) -> rightValue);