Как сохранить N последних значений ключа в теме кафки, используя потоки кафки - PullRequest
3 голосов
/ 06 мая 2019

Допустим, я использую потоки kafka (библиотека kafka-streams-scala, версия 2.2.0).

Мне нужно сохранить несколько последних значений ключа в теме kafka с использованием потоков kafka.Я использую это для обогащения другого потока.Поэтому мне нужно что-то вроде KTable или GlobalKTable, но они сохраняют только одно значение.

Я нашел один из возможных способов сделать это: создать поток и изменяемую карту, а затем использовать stream.foreach для отслеживания N последних значений для каждого ключа.

val stream: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topicName")

val map = scala.collection.mutable.Map[String, List[MyObject]]

stream.foreach((k, v) =>  {
  //update map
})

У меня вопрос: есть ли лучший подход для достижения этого - либо с помощью API потоков, либо, по крайней мере, без изменяемой карты.

1 Ответ

3 голосов
/ 06 мая 2019

Поэтому мне нужно что-то вроде KTable или GlobalKTable, но они сохраняют только одно значение.

Продолжайте использовать KTable (или GlobalKTable), но используйте структурированное значение и / или коллекцию в качестве "значения". Ничто не заставляет Кафку заставлять вас ограничивать значение сообщения только примитивным типом данных (например, Integer или String).

Подумайте: KStream<UserId, List<ClickEvent>>. Здесь каждое сообщение принадлежит конкретному пользователю (идентифицируемый ключом UserId), и каждое сообщение имеет список из нуля, одного или многих ClickEvent, связанных с этим пользователем. Это «просто работает», вам нужно только иметь соответствующие serdes (сериализатор / десериализатор) для типов данных, которые вы хотите использовать.

Например, CustomStreamTableJoin пример в https://github.com/confluentinc/kafka-streams-examples ( прямая ссылка на пример для v5.2.1 , который для Apache Kafka v2.2) использует Pair класс для хранения кортежа в значении сообщения Кафки и сопровождающего его PairSerde. То же самое можно сделать (и делают разработчики) для хранения коллекций значений, таких как List<ClickEvent>, как вы упомянули для своего собственного варианта использования.

Мне нужно сохранить несколько последних значений ключа в теме кафки с использованием потоков кафки. [...] Я выяснил один из возможных способов сделать это: создание потока и изменяемой карты, [...]

Вам не нужно использовать Map. Ключ уже доступен в сообщении Kafka, поэтому для значения сообщения вам нужен только тип данных типа List.

или, по крайней мере, без изменяемой карты.

Вам не нужно (и не следует) использовать изменяемую структуру данных, если только для этого нет особой причины, которую я не думаю, что есть в вашем случае использования. Когда новое сообщение обрабатывается и соответствующий вывод сохраняется в KTable, то все, что было сохранено в таблице для этого ключа, будет перезаписано - поэтому использование неизменяемой структуры данных в качестве значения сообщения вполне нормально.

...