my topi c данные (упрощенно) выглядят следующим образом:
Key: 12340,
Value:
{
"sessionid": 12340,
"amount": 123,
"finished": false
}
Каждое сообщение представляет изменение сеанса (не снимок). Сеанс может иметь несколько сообщений, но всегда будет завершаться сообщением, имеющим "finished":true
(все остальные сообщения будут иметь "finished":false
). В день могут быть сотни миллионов сеансов, и сеанс обычно заканчивается в течение нескольких минут, но некоторые из них могут занять до недели до 1035 *. Количество открытых сессий ограничено (примерно до 20000 одновременных сессий).
В результате обработки я хотел бы получить топи c с одним сообщением за сессию, содержащим сумму всех суммы.
Решение, о котором я думаю:
Исходный поток:
CREATE STREAM source (sessionid BIGINT, amount BIGINT, finished BOOLEAN)
WITH (KAFKA_TOPIC='source', VALUE_FORMAT='JSON');
Расчет агрегатов, количество сообщений соответствует источнику:
CREATE TABLE session_total_table
WITH (KAFKA_TOPIC='session_total_table', VALUE_FORMAT='JSON', PARTITIONS=60)
AS
SELECT sessionid, sum(amount) as total, max(case when finished=true then 1 else 0 end) as finished
FROM source
WINDOW SESSION (7 days)
GROUP BY sessionid;
Чтение изменений таблицы, созданных выше, в виде потока:
CREATE STREAM session_total_stream (sessionid BIGINT, total BIGINT, finished INT)
WITH (KAFKA_TOPIC='session_total_table', VALUE_FORMAT='JSON');
Фильтрация только сообщений, которые представляют законченный сеанс:
CREATE STREAM session_final
WITH (KAFKA_TOPIC='session_final', VALUE_FORMAT='JSON', PARTITIONS=60)
AS
SELECT sessionid, total
FROM session_total_stream
WHERE finished = 1
PARTITION BY sessionid;
Мои проблемы / вопросы относительно этого решения:
- Сможет ли оконная функция обрабатывать сотни миллионов одновременно windows?
- Мне не нравится, что мне нужно держать окно группировки открытым в течение 7 дней, хотя в В подавляющем большинстве случаев окно может быть закрыто в течение нескольких секунд / минут. Есть ли способ закрыть окно на основе поля
finished
? - Будет ли таблица способна обрабатывать неограниченный (непрерывно растущий с высокой скоростью) набор ключей? Мне не нужна эта таблица поиска, мне просто нужно средство для агрегирования по идентификатору сессии.