Поскольку вы не указали конкретного клиента, я покажу вам, как это можно сделать с помощью ksqlDB и недавно добавленной функции LATEST_BY_OFFSET
.
Сначала я заполняю topi c исходными данными:
kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
1:{ "userid": 1, "email": "user123@xyz.com" }
1:{ "userid": 1, "email": "user456@xyz.com" }
1:{ "userid": 1, "email": "user789@xyz.com" }
EOF
Затем в модели ksqlDB это сначала поток событий:
ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> SET 'auto.offset.reset' = 'earliest'; [35/60]
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
+---------+---------+-----------------+
|ROWKEY |USERID |EMAIL |
+---------+---------+-----------------+
|1 |1 |user123@xyz.com |
|1 |1 |user456@xyz.com |
|1 |1 |user789@xyz.com |
Сейчас мы можем сказать ksqlDB взять этот поток событий и дать нам только самое последнее значение (на основе смещения), либо напрямую:
ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
+--------------------+--------------------+
|USERID |KSQL_COL_1 |
+--------------------+--------------------+
|1 |user789@xyz.com |
Press CTRL-C to interrupt
или, что более полезно, в качестве материализованного состояния в ksqlDB:
CREATE TABLE USER_LATEST_STATE AS
SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL
FROM USER_UPDATES
GROUP BY USERID
EMIT CHANGES;
Эта таблица по-прежнему управляется изменениями в топике Кафки c, но может быть запрошена напрямую для текущего состояния , либо на данный момент («запрос запроса»):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
+--------------------+
|EMAIL |
+--------------------+
|user789@xyz.com |
Query terminated
ksql>
или в виде потока изменений по мере развития состояния («pu sh query»):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
+--------------------+
|EMAIL |
+--------------------+
|user789@xyz.com |
[ query continues indefinitely ]