K SQL агрегации с неограниченным пространством клавиш - PullRequest
0 голосов
/ 29 апреля 2020

my topi c данные (упрощенно) выглядят следующим образом:

Key: 12340,
Value:
{
  "sessionid": 12340,
  "amount": 123,
  "finished": false
}

Каждое сообщение представляет изменение сеанса (не снимок). Сеанс может иметь несколько сообщений, но всегда будет завершаться сообщением, имеющим "finished":true (все остальные сообщения будут иметь "finished":false). В день могут быть сотни миллионов сеансов, и сеанс обычно заканчивается в течение нескольких минут, но некоторые из них могут занять до недели до 1035 *. Количество открытых сессий ограничено (примерно до 20000 одновременных сессий).

В результате обработки я хотел бы получить топи c с одним сообщением за сессию, содержащим сумму всех суммы.

Решение, о котором я думаю:

Исходный поток:

CREATE STREAM source (sessionid BIGINT, amount BIGINT, finished BOOLEAN) 
WITH (KAFKA_TOPIC='source', VALUE_FORMAT='JSON');

Расчет агрегатов, количество сообщений соответствует источнику:

CREATE TABLE session_total_table
WITH (KAFKA_TOPIC='session_total_table', VALUE_FORMAT='JSON', PARTITIONS=60)
AS 
SELECT sessionid, sum(amount) as total, max(case when finished=true then 1 else 0 end) as finished
FROM source
WINDOW SESSION (7 days)
GROUP BY sessionid;

Чтение изменений таблицы, созданных выше, в виде потока:

CREATE STREAM session_total_stream (sessionid BIGINT, total BIGINT, finished INT) 
WITH (KAFKA_TOPIC='session_total_table', VALUE_FORMAT='JSON');

Фильтрация только сообщений, которые представляют законченный сеанс:

CREATE STREAM session_final
WITH (KAFKA_TOPIC='session_final', VALUE_FORMAT='JSON', PARTITIONS=60)
AS 
SELECT sessionid, total
FROM session_total_stream
WHERE finished = 1
PARTITION BY sessionid;

Мои проблемы / вопросы относительно этого решения:

  1. Сможет ли оконная функция обрабатывать сотни миллионов одновременно windows?
  2. Мне не нравится, что мне нужно держать окно группировки открытым в течение 7 дней, хотя в В подавляющем большинстве случаев окно может быть закрыто в течение нескольких секунд / минут. Есть ли способ закрыть окно на основе поля finished?
  3. Будет ли таблица способна обрабатывать неограниченный (непрерывно растущий с высокой скоростью) набор ключей? Мне не нужна эта таблица поиска, мне просто нужно средство для агрегирования по идентификатору сессии.
...