KSQL: получите самое распространенное слово за последний час - PullRequest
0 голосов
/ 20 марта 2019

У меня есть тема кафки, которая получает события с: {timestamp, word, channel_id}.

Мне нужно создать KSQL, который получит лучшие слова K, которые были произнесены в определенном канале за последние полчаса.

То, что я сделал до сих пор:

1 - создать канал для темы

CREATE STREAM WORDEVENTS WITH (KAFKA_TOPIC='words',VALUE_FORMAT='AVRO');

2 - отфильтровать нужный канал

CREATE STREAM FILTERED_WORDEVENTS WITH (KAFKA_TOPIC='words_in_mail', VALUE_FORMAT='AVRO') AS SELECT WORD FROM WORDEVENTS WHERE CHANNEL_ID LIKE 'mail';

И вот где я не понимаю, я могу сделать это:

SELECT WORD, COUNT(*) AS COUNT_TOTAL FROM FILTERED_WORDEVENTS WINDOW HOPPING (SIZE 30 MINUTES, ADVANCE BY 5 SECONDS) GROUP BY WORD;

И это работает нормально, но если я пытаюсь что-то сделать с помощью функции TOPK, это не работает:

SELECT WORD, topk(COUNT(*), 2) AS COUNT_TOTAL FROM FILTERED_WORDEVENTS WINDOW HOPPING (SIZE 30 MINUTES, ADVANCE BY 5 SECONDS) GROUP BY WORD;

Не удается с:

Caused by: Can't find any functions with the name 'COUNT'

Я попытался создать поток / таблицу для группы по событиям, а затем попытаться увеличить счет:

 CREATE TABLE COUNT_WORDS_LAST_HOUR AS SELECT WORD, COUNT(*) AS COUNT_TOTAL FROM FILTERED_WORDEVENTS WINDOW HOPPING (SIZE 30 MINUTES, ADVANCE BY 5 SECONDS) GROUP BY WORD;

Но он жалуется, что topK может быть применен к столу

Как можно решить этот вариант использования?

...