Kafka Streams - агрегирование и объединение пользователей с адресами - PullRequest
0 голосов
/ 03 декабря 2018

У меня есть две уплотненные темы.Один содержит всю информацию о моем пользователе ( USERID ), а другой сохраняет их адреса ( USERID , ADRESSID ).В скобках указаны ключи.То, что я хочу, это сохранить только в одной теме пользовательские данные с их списком адресов.Мой подход таков:

KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses = adressStream
.selectKey(...) //Selecting USERID as key - this generates KStream
.groupByKey(...) //Grouping by USERID as key - this generates KGroupedStream
.aggregate(...) //Aggregating by USERID as key - this generates KTable
.to("aggregated_addresses"); //KTable with USERID as key 
В конце я делаю leftJoin для пользователя и aggregated_addresses через USERID и сохраняю результат в сжатой теме под названием user_addresses.

Я хочу добиться, чтобы сохранить вседанные с их адресами в user_addresses.Это означает, что я не хочу терять какие-либо адреса после определенного периода времени.Только если адрес был удален в БД.Мой вопрос: подходит ли мой подход для достижения этой цели?Мой прототип работает и сохраняет список адресов для каждого пользователя, но я спрашиваю себя, удалит ли KGroupedStream некоторые потоки через некоторое время или нет.

Может быть, кто-нибудь может объяснить мне подробно, как работает этот конвейер.Если поступает новый поток (адрес), он проходит через весь конвейер (selectKey, groupByKey, aggregate) и попадает в тему aggregated_addresses, где он сохраняется в виде списка адресов?Агрегат шагов использует этот оператор:

(user, address, queue) -> {...}

Используют ли потоки Kafka агрегатные адреса для заполнения очереди оператора выше?Если я получу новый поток .aggregate, будет ли Кафка искать соответствующие агрегированные списки в aggregated_addresses и заполнять очередь этими данными?Или он использует сгруппированные потоки .groupByKey, и каждый раз, когда приходит новый поток, весь сгруппированный поток отправляется для агрегирования?Если второе значение true, KGroupedStream удалит несколько потоков, например, через неделю?Если да, некоторые адреса будут отсутствовать в очереди?

В чем внутренняя разница между KGroupedStream и KGroupedTable?

Интересно, что результат после объединения (в уплотненной теме под названием user_addresses) имеет больше записей, чем пользователь таблицы.Я посмотрел глубже и увидел, что у пользователя с одним и тем же ключом есть несколько вхождений (несколько смещений).При наименьшем смещении этот пользователь не имеет адресов, затем при более высоком смещении он имеет один адрес в своем списке, а с наибольшим смещением он имеет два адреса в своем списке.Я снова спрашиваю себя, почему старые смещения не удаляются автоматически, когда я использую сжатую тему.Работает ли сжатие Кафки как сборщик мусора, который впоследствии удаляет данные?Что, если я ищу ключ, получу ли я ключ с наибольшим смещением?

Я прошу прощения за очень много вопросов, но, поскольку я все больше и больше работаю с потоками, некоторые вещи мне не ясны.

Заранее спасибо за помощь!:)

1 Ответ

0 голосов
/ 04 декабря 2018

Я спрашиваю себя, удалит ли KGroupedStream некоторые потоки через некоторое время или нет.

Ничего не удалит.

Если я понимаю остальную частьваш вопрос, вы спрашиваете, как работает оператор aggregate().Он использует локальное хранилище состояний (реализованное с использованием RocksDB) для хранения <userId, X> с X, независимо от того, что возвращает ваш UDF агрегирования ((user, address, queue) -> { }), т. Е. Это должно быть X == queue).Таким образом, каждая входная запись выполняет локальный поиск в RocksDB, чтобы извлечь текущий queue, обновить его, записать его обратно в RocksDB и отправить в нисходящий поток в ваш оператор to(), который также записывает его в тему результата.

Также ознакомьтесь с документацией для получения более подробной информации: https://kafka.apache.org/21/documentation/streams/ Существует также множество других материалов о потоках Kafka и о том, как они работают в Интернете (сообщения в блогах, записи разговоров, слайды ...)

Интересно, что результат после объединения (в компактной теме, называемой user_addresses) имеет больше записей, чем пользователь таблицы записей.Я посмотрел глубже и увидел, что у пользователя с одним и тем же ключом есть несколько вхождений (несколько смещений).При наименьшем смещении этот пользователь не имеет адресов, затем при более высоком смещении он имеет один адрес в своем списке, а с наибольшим смещением он имеет два адреса в своем списке.Я снова спрашиваю себя, почему старые смещения не удаляются автоматически, когда я использую сжатую тему.Работает ли сжатие Кафки как сборщик мусора, который впоследствии удаляет данные?Что если я ищу ключ, получу ли я ключ с наибольшим смещением?

Сжатие выполняется в фоновом режиме, но не сразу.Также обратите внимание, что тематические (или, если быть более точными) разделы разделены на «сегмент», а активный сегмент никогда не сжимается (размер сегмента по умолчанию составляет 1 ГБ).Вы можете настроить размер сегмента и способ запуска его сжатия (подробнее см. В документах: https://kafka.apache.org/documentation/#compaction).

Что, если я ищу ключ, получу ли ключ с наибольшим смещением?

Не уверен, что вы подразумеваете под этим. Kafka разрешает только последовательное чтение, но не поиск ключей. Таким образом, вам нужно прочитать тему от начала до конца, чтобы найти последнюю версию ключаЕсли вы ссылаетесь на функцию «Интерактивные запросы» Kafka Streams, она будет запрашивать локальную RocksDB и, таким образом, содержать самую последнюю запись для каждого ключа.

Мой вопрос заключается в том, подходит ли мой подход для достиженияthis.

Да, с одной важной деталью, связанной с

В чем внутренняя разница между KGroupedStream и KGroupedTable?

Поскольку введенная вами тема является сжатой темой, в которой используются клавиши (userId,addressId), ее следует читать как table() (не stream()):

KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses =
    builder.table("address-topic")
      .selectKey(...) //Selecting USERID as key - this generates KStream
      .groupBy(...) //Select USERID as and group by USERID
      .aggregate(...) //Aggregating by USERID as key - this generates KTable
      .to("aggregated_addresses"); //KTable with USERID as key 

Разница в том, что если вы читаете темуKStreams, интерпретируется как «факты», и, следовательно, семантика удаления отсутствует.Однако вводимая вами тема содержит записи «обновлений» и, следовательно, она должна быть потребительской как таковой.KGroupedStream и KGroupedTable являются просто промежуточными объектами в API и также подразумевают семантику "факт" против "обновления".Опять же, проверьте документы и другие материалы в Интернете для получения более подробной информации.

...