У нас есть две таблицы базы данных, которые передают обновления тем kafka через приложение cd c. Мы будем хранить последнюю версию каждого ряда на Ktables. Затем присоединитесь к ним и напишите в любое обновление kafka topi c. Наш код выглядит следующим образом:
pTable = builder.stream(pTopic, pKeyDbOperationEventConsumed)
.selectKey(((key, value) -> key.getId()))
.mapValues(pMapFunc)
.groupByKey(stringPGrouped)
.reduce((aggValue, newValue) -> newValue, pMaterializedAs);
sPTable = builder.stream(sPTopic, keyDbOperationEventConsumed)
.selectKey(((key, value) -> key.getPId() + "-" + key.getSId()))
.mapValues(spMapFunc)
.groupByKey(stringSPGrouped)
.reduce((aggValue, newValue) -> newValue, sPMaterializedAs);
sPTable.join(pTable, (sp) -> sp.getPId().toString(), joinerFunc)
.toStream()
.to(upstreamTopic, producedWithFunc);
Он хорошо работал с небольшими данными в локальной среде. Но мы не могли заставить его работать на производстве. Наша установка: 5 пакетов, обе темы имеют 30 разделов. С настройками по умолчанию он начал обрабатываться, но завис после обработки очень маленьких данных. Мы видели Попытка сердцебиения не удалась, так как группа восстанавливает баланс log. Затем мы изменили конфиги на:
- max.poll.interval.ms = 3600000
- request.timeout.ms = 7200000
- session.timeout.ms = 900000
- num.stream.threads = 6
Но не повезло. Он даже не мог обработать одну запись. И мы включили этот журнал посредника: Ошибка члена x в группе x, удаление его из группы (kafka.coordinator.group.GroupCoordinator)
Мой первый вопрос: если наш вариант использования действительный или нет. Если это действительно так, как мы можем отследить основную проблему?