Я пишу приложение Kafka Streams в Scala, и меня беспокоит потенциальная утечка памяти / общее использование ресурсов.
Есть ли способ сообщить Кафке о "закрытии" определенной подпрограммыпоток, созданный операциями группирования / ветвления и высвобождения связанных ресурсов?
Чтобы продемонстрировать потенциальную проблему, давайте рассмотрим приложение электронной коммерции, которое переносит события изменения статуса заказа в тему Kafka под названием "my-Супер-вход-тема ". Каждый заказ уникально идентифицируется с помощью OrderId , который используется в качестве ключа сообщения Kafka.
Допустим, нам нужно вычислить количество обновлений статуса для каждого заказа и отправить результаты в . "my-super-output-topic" тема. Следующий фрагмент кода демонстрирует, как это сделать в Scala:
// ...
val builder = new StreamsBuilder
val ktable = builder.stream("my-super-input-topic")
.groupByKey
.count
ktable.toStream.to("my-super-output-topic")
// ...
Как я понимаю, .groupBy / .groupByKey делит исходный поток на N подпотоков (по одному на заказ в нашемкейс). Приведенный выше код не указывает никаких окон хранения, поэтому, даже если данный заказ (подпоток) получает событие после нескольких часов бездействия - оно все равно будет правильно обработано, и обновление будет отправлено в раздел-приемник, содержащий правильный агрегированный счет.
Поэтому я делаю вывод, что Кафка хранит информацию о каждом подпотоке в каком-то внутреннем хранилище.
Однако заказы имеют конечное время жизни, и через некоторое время заказ становится завершенным, что означает, что подпоток, связанный с этим заказом, никогда не получит дальнейшие события. Но Кафка все еще рассматривает его как действительный и ожидает дальнейших сообщений, и все больше и больше «мертвых» подпотоков будет накапливаться по мере того, как будет выполняться все больше и больше заказов. Если Kafka выделяет хотя бы некоторые ресурсы для отслеживания каждого подпотока, «мертвые» подпотоки могут вызвать интенсивное использование памяти, даже если это совершенно не нужно.
Поэтому было бы разумно располагать / закрывать определенные подпрограммы. -потоки, как только система поймет, что соответствующие заказы выполнены.
Примечание: это вымышленный вариант использования для демонстрации конкретной проблемы, а не реальная задача. Пожалуйста, не предлагайте реализовать это без Kafka Streams.