У меня есть тема кафки, которая получает события с: {timestamp, word, channel_id}
.
Мне нужно создать KSQL, который получит лучшие слова K, которые были произнесены в определенном канале за последние полчаса.
То, что я сделал до сих пор:
1 - создать канал для темы
CREATE STREAM WORDEVENTS WITH (KAFKA_TOPIC='words',VALUE_FORMAT='AVRO');
2 - отфильтровать нужный канал
CREATE STREAM FILTERED_WORDEVENTS WITH (KAFKA_TOPIC='words_in_mail', VALUE_FORMAT='AVRO') AS SELECT WORD FROM WORDEVENTS WHERE CHANNEL_ID LIKE 'mail';
И вот где я не понимаю, я могу сделать это:
SELECT WORD, COUNT(*) AS COUNT_TOTAL FROM FILTERED_WORDEVENTS WINDOW HOPPING (SIZE 30 MINUTES, ADVANCE BY 5 SECONDS) GROUP BY WORD;
И это работает нормально, но если я пытаюсь что-то сделать с помощью функции TOPK, это не работает:
SELECT WORD, topk(COUNT(*), 2) AS COUNT_TOTAL FROM FILTERED_WORDEVENTS WINDOW HOPPING (SIZE 30 MINUTES, ADVANCE BY 5 SECONDS) GROUP BY WORD;
Не удается с:
Caused by: Can't find any functions with the name 'COUNT'
Я попытался создать поток / таблицу для группы по событиям, а затем попытаться увеличить счет:
CREATE TABLE COUNT_WORDS_LAST_HOUR AS SELECT WORD, COUNT(*) AS COUNT_TOTAL FROM FILTERED_WORDEVENTS WINDOW HOPPING (SIZE 30 MINUTES, ADVANCE BY 5 SECONDS) GROUP BY WORD;
Но он жалуется, что topK может быть применен к столу
Как можно решить этот вариант использования?