Внешний ключ, соединяющий два ktable при потоковых обновлениях базы данных - PullRequest
1 голос
/ 16 апреля 2020

У нас есть две таблицы базы данных, которые передают обновления тем kafka через приложение cd c. Мы будем хранить последнюю версию каждого ряда на Ktables. Затем присоединитесь к ним и напишите в любое обновление kafka topi c. Наш код выглядит следующим образом:

pTable = builder.stream(pTopic, pKeyDbOperationEventConsumed)
                          .selectKey(((key, value) -> key.getId()))
                          .mapValues(pMapFunc)
                          .groupByKey(stringPGrouped)
                          .reduce((aggValue, newValue) -> newValue, pMaterializedAs);
sPTable = builder.stream(sPTopic, keyDbOperationEventConsumed)
                               .selectKey(((key, value) -> key.getPId() + "-" + key.getSId()))
                               .mapValues(spMapFunc)
                               .groupByKey(stringSPGrouped)
                               .reduce((aggValue, newValue) -> newValue, sPMaterializedAs);
sPTable.join(pTable, (sp) -> sp.getPId().toString(), joinerFunc)
                     .toStream()
                     .to(upstreamTopic, producedWithFunc);

Он хорошо работал с небольшими данными в локальной среде. Но мы не могли заставить его работать на производстве. Наша установка: 5 пакетов, обе темы имеют 30 разделов. С настройками по умолчанию он начал обрабатываться, но завис после обработки очень маленьких данных. Мы видели Попытка сердцебиения не удалась, так как группа восстанавливает баланс log. Затем мы изменили конфиги на:

- max.poll.interval.ms = 3600000
- request.timeout.ms = 7200000
- session.timeout.ms = 900000
- num.stream.threads = 6 

Но не повезло. Он даже не мог обработать одну запись. И мы включили этот журнал посредника: Ошибка члена x в группе x, удаление его из группы (kafka.coordinator.group.GroupCoordinator)

Мой первый вопрос: если наш вариант использования действительный или нет. Если это действительно так, как мы можем отследить основную проблему?

...