Поэтому мне нужно что-то вроде KTable или GlobalKTable, но они сохраняют только одно значение.
Продолжайте использовать KTable
(или GlobalKTable
), но используйте структурированное значение и / или коллекцию в качестве "значения". Ничто не заставляет Кафку заставлять вас ограничивать значение сообщения только примитивным типом данных (например, Integer
или String
).
Подумайте: KStream<UserId, List<ClickEvent>>
. Здесь каждое сообщение принадлежит конкретному пользователю (идентифицируемый ключом UserId
), и каждое сообщение имеет список из нуля, одного или многих ClickEvent
, связанных с этим пользователем. Это «просто работает», вам нужно только иметь соответствующие serdes (сериализатор / десериализатор) для типов данных, которые вы хотите использовать.
Например, CustomStreamTableJoin
пример в https://github.com/confluentinc/kafka-streams-examples ( прямая ссылка на пример для v5.2.1 , который для Apache Kafka v2.2) использует Pair
класс для хранения кортежа в значении сообщения Кафки и сопровождающего его PairSerde
. То же самое можно сделать (и делают разработчики) для хранения коллекций значений, таких как List<ClickEvent>
, как вы упомянули для своего собственного варианта использования.
Мне нужно сохранить несколько последних значений ключа в теме кафки с использованием потоков кафки. [...]
Я выяснил один из возможных способов сделать это: создание потока и изменяемой карты, [...]
Вам не нужно использовать Map
. Ключ уже доступен в сообщении Kafka, поэтому для значения сообщения вам нужен только тип данных типа List.
или, по крайней мере, без изменяемой карты.
Вам не нужно (и не следует) использовать изменяемую структуру данных, если только для этого нет особой причины, которую я не думаю, что есть в вашем случае использования. Когда новое сообщение обрабатывается и соответствующий вывод сохраняется в KTable
, то все, что было сохранено в таблице для этого ключа, будет перезаписано - поэтому использование неизменяемой структуры данных в качестве значения сообщения вполне нормально.