Буферизация в оконном приложении Kafka Streams - PullRequest
0 голосов
/ 20 декабря 2018

В нашем приложении мы стремимся брать сообщения JSON из входной темы, объединять их в заданном окне и записывать в целевую тему.mergeJsonNodes - это функция, которая отвечает за простое объединение двух объектов JSON.

KStream<String, JsonNode> transformed = datastreamSource
  .groupByKey(Serialized.with(Serdes.String(), JSON_SERDE))
  .windowedBy(SessionWindows.with(60 * 1000))
  .reduce((a, b) -> mergeJsonNodes(a, b))
  .toStream((windowedKey, node) -> windowedKey.key());

Мы успешно развернули это в нескольких наших непроизводственных средах.Однако, когда мы перешли к производству, где объем входной темы (datastreamSource) намного больше, мы столкнулись с узким местом, которое мы стремимся понять.

Мы видим, что наше приложение потоковмедленно продвигается по исходной теме и фиксирует тему назначения ~ раз в минуту.Однако он слишком медленно поглощает входные темы, чтобы не отставать от нашего рабочего трафика, связанного с этой темой.Мы выполняем миграцию из приложения без окон, сгруппированных потоков, которое работает хорошо в течение многих месяцев.

Ресурсы на хосте для приложения потоков Kafka, по-видимому, не ограничены;дело не в том, что приложению не хватает памяти или диска.

Наш вопрос заключается в том, какие другие факторы, в частности настройки конфигурации, мы могли бы изменить, чтобы приложение потоков могло вытягивать больше сообщений изВведите тему за раз.Похоже, что наше приложение каким-то образом ограничено в своей способности продолжать чтение из исходного кода.

Два, которые изначально выскочили на нас из Документы :
* buffered.records.per.partition
* cache.max.bytes.buffering

Есть ли у кого-нибудь опыт с высокой пропускной способностьюоконные потоковые приложения, которые могут предоставить какие-либо указатели?Спасибо !!

1 Ответ

0 голосов
/ 20 декабря 2018

Я не знаю, в частности, о оконном агрегировании, но при агрегации в потоках Кафки у вас есть две конфигурации, чтобы посмотреть, как обрабатывать, как узел процессора агрегации будет кэшировать сообщение перед сбросом в хранилище состояний и отправкой записей агрегации результатов процессорам нижестоящих потоков:cache.max.bytes.buffering, commit.interval.ms.

У вас есть конфигурация потребителя, которую вы можете настроить в потоках kafka: poll.ms.

Вы также можете масштабировать свое приложение, сколько разделов имеет ваш вводтема?Это приведет к тому, что количество задач, обрабатываемых вашей входной темой, возрастет, и, следовательно, повлияет на масштабируемость вашего приложения.

Чем больше разделов, тем больше задач означает, что больше потребителей означает больше экземпляров или даже больше потоков в экземплярах (проверьте num.streams.thread).

Надеюсь, это поможет.

...