Я спрашиваю себя, удалит ли KGroupedStream некоторые потоки через некоторое время или нет.
Ничего не удалит.
Если я понимаю остальную частьваш вопрос, вы спрашиваете, как работает оператор aggregate()
.Он использует локальное хранилище состояний (реализованное с использованием RocksDB) для хранения <userId, X>
с X
, независимо от того, что возвращает ваш UDF агрегирования ((user, address, queue) -> { }
), т. Е. Это должно быть X == queue
).Таким образом, каждая входная запись выполняет локальный поиск в RocksDB, чтобы извлечь текущий queue
, обновить его, записать его обратно в RocksDB и отправить в нисходящий поток в ваш оператор to()
, который также записывает его в тему результата.
Также ознакомьтесь с документацией для получения более подробной информации: https://kafka.apache.org/21/documentation/streams/ Существует также множество других материалов о потоках Kafka и о том, как они работают в Интернете (сообщения в блогах, записи разговоров, слайды ...)
Интересно, что результат после объединения (в компактной теме, называемой user_addresses) имеет больше записей, чем пользователь таблицы записей.Я посмотрел глубже и увидел, что у пользователя с одним и тем же ключом есть несколько вхождений (несколько смещений).При наименьшем смещении этот пользователь не имеет адресов, затем при более высоком смещении он имеет один адрес в своем списке, а с наибольшим смещением он имеет два адреса в своем списке.Я снова спрашиваю себя, почему старые смещения не удаляются автоматически, когда я использую сжатую тему.Работает ли сжатие Кафки как сборщик мусора, который впоследствии удаляет данные?Что если я ищу ключ, получу ли я ключ с наибольшим смещением?
Сжатие выполняется в фоновом режиме, но не сразу.Также обратите внимание, что тематические (или, если быть более точными) разделы разделены на «сегмент», а активный сегмент никогда не сжимается (размер сегмента по умолчанию составляет 1 ГБ).Вы можете настроить размер сегмента и способ запуска его сжатия (подробнее см. В документах: https://kafka.apache.org/documentation/#compaction).
Что, если я ищу ключ, получу ли ключ с наибольшим смещением?
Не уверен, что вы подразумеваете под этим. Kafka разрешает только последовательное чтение, но не поиск ключей. Таким образом, вам нужно прочитать тему от начала до конца, чтобы найти последнюю версию ключаЕсли вы ссылаетесь на функцию «Интерактивные запросы» Kafka Streams, она будет запрашивать локальную RocksDB и, таким образом, содержать самую последнюю запись для каждого ключа.
Мой вопрос заключается в том, подходит ли мой подход для достиженияthis.
Да, с одной важной деталью, связанной с
В чем внутренняя разница между KGroupedStream и KGroupedTable?
Поскольку введенная вами тема является сжатой темой, в которой используются клавиши (userId,addressId)
, ее следует читать как table()
(не stream()
):
KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses =
builder.table("address-topic")
.selectKey(...) //Selecting USERID as key - this generates KStream
.groupBy(...) //Select USERID as and group by USERID
.aggregate(...) //Aggregating by USERID as key - this generates KTable
.to("aggregated_addresses"); //KTable with USERID as key
Разница в том, что если вы читаете темуKStreams
, интерпретируется как «факты», и, следовательно, семантика удаления отсутствует.Однако вводимая вами тема содержит записи «обновлений» и, следовательно, она должна быть потребительской как таковой.KGroupedStream
и KGroupedTable
являются просто промежуточными объектами в API и также подразумевают семантику "факт" против "обновления".Опять же, проверьте документы и другие материалы в Интернете для получения более подробной информации.