Kafka Streams Join после объединения не работает с несколькими разделами - PullRequest
1 голос
/ 25 апреля 2020

Состояние проблемы:

Topi c 1: "ключ = empId, значение = empname, deptName , ..."

Topi c 2 : "key = deptName , value = deptName"

Мне нужны данные из Topi c 1, где deptName (атрибут значения в topi c 1) равен ключу Topi c 2.

Шаги:

  1. Создайте поток из Topi c 1, сгруппируйте его по deptName и выполните агрегацию. Будет возвращено Ktable (key =deptName, value = "empId1,empId2,empId3 ..")
  2. Создать поток из Topi c 2 (key ="deptName" value = "deptName")
  3. Выполнить операцию левого соединения на Ktable (Шаг 1) и KSteam (Шаг 2). (KStream-Ktable)
  4. И объединение возвращает желаемый результат.

В одном разделе все работает как положено, однако после переключения на несколько разделов объединение не возвращает никаких данных .

Шаг 1:

KGroupedStream<String, Object> groupedStream = adStream.groupBy((key, value) -> value.getOrganizationId().toString());
        groupedStream
                .aggregate(() -> (new String()),
                        (aggKey, newValue, aggValue) -> addCurrentValue(aggValue,
                                String.valueOf(newValue.getOriginId())),
                        Materialized.as("aggregated-stream-store").with(strSerde, strSerde))
                .toStream().to(Constant.AD_AGGREGATED_DATA, Produced.with(strSerde, strSerde));

Шаг 2:

KStream<String, String> swgOrgStream = builder.stream(Constant.SWG_ORG_TOPIC,Consumed.with(strSerde, strSerde));

Шаг 3:

 KStream<String, String> filteredOrgStream = swgOrgStream.leftJoin(aggregatedTable,
                (leftValue, rightValue) -> rightValue);
...