У меня есть Kafka topi c «события», который записывает голоса пользователей и имеет json в следующей структуре:
{"category":"image","action":"vote","label":"amsterdam","ip":"1.1.1.1","value":2}
Мне нужно получить на другую топи c сумму всех голосов за метку (например, Амстердам), но отбросьте все голоса, которые пришли с того же IP-адреса, используя только последний голос. Эта topi c должна иметь json в следующем формате:
{label:”amsterdam”,SCORE:8,TOTAL:3}
SCORE - сумма всех голосов, а TOTAL - количество подсчитанных голосов.
Принятое мною решение создает поток из событий topi c:
CREATE STREAM st_events
(CATEGORY STRING, ACTION STRING, LABEL STRING, VALUE BIGINT, IP STRING)
WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON');
Затем я создаю таблицу tb_votes, в которой рассчитывается оценка и общее количество для каждой метки и IP-адреса:
CREATE TABLE tb_votes WITH (KAFKA_TOPIC='tb_votes', PARTITIONS=1, REPLICAS=1) AS SELECT
st_events.LABEL "label", SUM(st_events.VALUE-1) "score", CAST(COUNT(*) AS BIGINT) "total"
FROM st_events
WHERE
st_events.category='image' AND st_events.action='vote'
GROUP BY st_events.label, st_events.ip
EMIT CHANGES;
Проблема в том, что вместо отбрасывания всех предыдущих голосов, приходящих с одного и того же IP-адреса для одного и того же изображения, Кафка использует их все. Это имеет смысл, поскольку это GROUP BY.
Любая идея, как "отбросить" все предыдущие голоса и использовать только самые последние значения для изображения / IP?