В нашем приложении мы стремимся брать сообщения JSON из входной темы, объединять их в заданном окне и записывать в целевую тему.mergeJsonNodes
- это функция, которая отвечает за простое объединение двух объектов JSON.
KStream<String, JsonNode> transformed = datastreamSource
.groupByKey(Serialized.with(Serdes.String(), JSON_SERDE))
.windowedBy(SessionWindows.with(60 * 1000))
.reduce((a, b) -> mergeJsonNodes(a, b))
.toStream((windowedKey, node) -> windowedKey.key());
Мы успешно развернули это в нескольких наших непроизводственных средах.Однако, когда мы перешли к производству, где объем входной темы (datastreamSource
) намного больше, мы столкнулись с узким местом, которое мы стремимся понять.
Мы видим, что наше приложение потоковмедленно продвигается по исходной теме и фиксирует тему назначения ~ раз в минуту.Однако он слишком медленно поглощает входные темы, чтобы не отставать от нашего рабочего трафика, связанного с этой темой.Мы выполняем миграцию из приложения без окон, сгруппированных потоков, которое работает хорошо в течение многих месяцев.
Ресурсы на хосте для приложения потоков Kafka, по-видимому, не ограничены;дело не в том, что приложению не хватает памяти или диска.
Наш вопрос заключается в том, какие другие факторы, в частности настройки конфигурации, мы могли бы изменить, чтобы приложение потоков могло вытягивать больше сообщений изВведите тему за раз.Похоже, что наше приложение каким-то образом ограничено в своей способности продолжать чтение из исходного кода.
Два, которые изначально выскочили на нас из Документы :
* buffered.records.per.partition
* cache.max.bytes.buffering
Есть ли у кого-нибудь опыт с высокой пропускной способностьюоконные потоковые приложения, которые могут предоставить какие-либо указатели?Спасибо !!