K SQL Group By, чтобы сбросить предыдущие значения и использовать только последние - PullRequest
0 голосов
/ 05 февраля 2020

У меня есть Kafka topi c «события», который записывает голоса пользователей и имеет json в следующей структуре:

{"category":"image","action":"vote","label":"amsterdam","ip":"1.1.1.1","value":2}

Мне нужно получить на другую топи c сумму всех голосов за метку (например, Амстердам), но отбросьте все голоса, которые пришли с того же IP-адреса, используя только последний голос. Эта topi c должна иметь json в следующем формате:

{label:”amsterdam”,SCORE:8,TOTAL:3}

SCORE - сумма всех голосов, а TOTAL - количество подсчитанных голосов.

Принятое мною решение создает поток из событий topi c:

CREATE STREAM st_events
  (CATEGORY STRING, ACTION STRING, LABEL STRING, VALUE BIGINT, IP STRING)
  WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON');

Затем я создаю таблицу tb_votes, в которой рассчитывается оценка и общее количество для каждой метки и IP-адреса:

CREATE TABLE tb_votes WITH (KAFKA_TOPIC='tb_votes', PARTITIONS=1, REPLICAS=1) AS SELECT
  st_events.LABEL "label", SUM(st_events.VALUE-1) "score", CAST(COUNT(*) AS BIGINT) "total"
FROM st_events
WHERE
    st_events.category='image' AND st_events.action='vote'
GROUP BY st_events.label, st_events.ip
EMIT CHANGES;

Проблема в том, что вместо отбрасывания всех предыдущих голосов, приходящих с одного и того же IP-адреса для одного и того же изображения, Кафка использует их все. Это имеет смысл, поскольку это GROUP BY.

Любая идея, как "отбросить" все предыдущие голоса и использовать только самые последние значения для изображения / IP?

...