DSL Kafka Streams: Как закрыть подпотоки, созданные ветвлением / группировкой потоков? - PullRequest
0 голосов
/ 11 октября 2019

Я пишу приложение Kafka Streams в Scala, и меня беспокоит потенциальная утечка памяти / общее использование ресурсов.

Есть ли способ сообщить Кафке о "закрытии" определенной подпрограммыпоток, созданный операциями группирования / ветвления и высвобождения связанных ресурсов?

Чтобы продемонстрировать потенциальную проблему, давайте рассмотрим приложение электронной коммерции, которое переносит события изменения статуса заказа в тему Kafka под названием "my-Супер-вход-тема ". Каждый заказ уникально идентифицируется с помощью OrderId , который используется в качестве ключа сообщения Kafka.

Допустим, нам нужно вычислить количество обновлений статуса для каждого заказа и отправить результаты в . "my-super-output-topic" тема. Следующий фрагмент кода демонстрирует, как это сделать в Scala:

// ...
val builder = new StreamsBuilder
val ktable = builder.stream("my-super-input-topic")
    .groupByKey
    .count

ktable.toStream.to("my-super-output-topic")
// ...

Как я понимаю, .groupBy / .groupByKey делит исходный поток на N подпотоков (по одному на заказ в нашемкейс). Приведенный выше код не указывает никаких окон хранения, поэтому, даже если данный заказ (подпоток) получает событие после нескольких часов бездействия - оно все равно будет правильно обработано, и обновление будет отправлено в раздел-приемник, содержащий правильный агрегированный счет.

Поэтому я делаю вывод, что Кафка хранит информацию о каждом подпотоке в каком-то внутреннем хранилище.

Однако заказы имеют конечное время жизни, и через некоторое время заказ становится завершенным, что означает, что подпоток, связанный с этим заказом, никогда не получит дальнейшие события. Но Кафка все еще рассматривает его как действительный и ожидает дальнейших сообщений, и все больше и больше «мертвых» подпотоков будет накапливаться по мере того, как будет выполняться все больше и больше заказов. Если Kafka выделяет хотя бы некоторые ресурсы для отслеживания каждого подпотока, «мертвые» подпотоки могут вызвать интенсивное использование памяти, даже если это совершенно не нужно.

Поэтому было бы разумно располагать / закрывать определенные подпрограммы. -потоки, как только система поймет, что соответствующие заказы выполнены.

Примечание: это вымышленный вариант использования для демонстрации конкретной проблемы, а не реальная задача. Пожалуйста, не предлагайте реализовать это без Kafka Streams.

1 Ответ

1 голос
/ 13 октября 2019

Правильно, что ваша агрегация всегда будет вести подсчет для каждого ключа. Однако «подпоток» относится к каждому разделу, и, следовательно, каждый подпоток всегда должен содержать некоторые данные.

Закрытие частей топологии невозможно.

Если выВы обеспокоены неограниченным ростом хранилища KTable, вы можете либо рассмотреть (1) использование оконного хранилища, которое в конечном итоге будет удалять старые данные, (2) использование aggregate() вместо счетчика: по умолчанию aggregate()будет просто считать, но если заказ выполнен, UDF может вернуть null - это приведет к удалению пары ключ-значение для заказа из магазина. (3) Или вы решили не использовать DSL, но API-интерфейс процессора, который обеспечивает больший контроль / гибкость для управления хранилищем состояний (вы также можете рассмотреть использование «пунктуаций»).

Вас также может заинтересоватьв: https://issues.apache.org/jira/browse/KAFKA-4212

...