Можно ли получить последнее значение для ключа сообщения из сообщений Кафки? - PullRequest
2 голосов
/ 01 апреля 2020

Предположим, у меня разные значения для одного и того же ключа сообщения.

Например:

{
userid: 1,
email: user123@xyz.com }

{
userid: 1,
email: user456@xyz.com }

{
userid: 1,
email: user789@xyz.com }

В этом случае я хочу, чтобы пользователь обновил только последнее значение, то есть 'user789@xyz.com'.

Мой поток kafka должен дать мне только третье значение, а не предыдущие 2 значения.

Ответы [ 3 ]

3 голосов
/ 01 апреля 2020

Поскольку вы не указали конкретного клиента, я покажу вам, как это можно сделать с помощью ksqlDB и недавно добавленной функции LATEST_BY_OFFSET.

Сначала я заполняю topi c исходными данными:

kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
1:{ "userid": 1, "email": "user123@xyz.com" }
1:{ "userid": 1, "email": "user456@xyz.com" }
1:{ "userid": 1, "email": "user789@xyz.com" }
EOF

Затем в модели ksqlDB это сначала поток событий:

ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> SET 'auto.offset.reset' = 'earliest';                                                                                                                                                                                                                                         [35/60]
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
+---------+---------+-----------------+
|ROWKEY   |USERID   |EMAIL            |
+---------+---------+-----------------+
|1        |1        |user123@xyz.com  |
|1        |1        |user456@xyz.com  |
|1        |1        |user789@xyz.com  |

Сейчас мы можем сказать ksqlDB взять этот поток событий и дать нам только самое последнее значение (на основе смещения), либо напрямую:

ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
+--------------------+--------------------+
|USERID              |KSQL_COL_1          |
+--------------------+--------------------+
|1                   |user789@xyz.com     |

Press CTRL-C to interrupt

или, что более полезно, в качестве материализованного состояния в ksqlDB:

CREATE TABLE USER_LATEST_STATE AS 
    SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL 
      FROM USER_UPDATES 
     GROUP BY USERID 
     EMIT CHANGES;

Эта таблица по-прежнему управляется изменениями в топике Кафки c, но может быть запрошена напрямую для текущего состояния , либо на данный момент («запрос запроса»):

ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
+--------------------+
|EMAIL               |
+--------------------+
|user789@xyz.com     |
Query terminated
ksql>

или в виде потока изменений по мере развития состояния («pu sh query»):

ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
+--------------------+
|EMAIL               |
+--------------------+
|user789@xyz.com     |

[ query continues indefinitely ]

asciicast

1 голос
/ 01 апреля 2020

Кажется, что вы хотите буферизовать записи перед дальнейшей обработкой. Начиная с потоковой передачи, у вас есть постоянно растущие, бесконечные наборы данных, так что вы никогда не знаете, будете ли вы ждать больше записей или flu sh буфер для дальнейшей обработки. Можете ли вы добавить более подробную информацию о том, как вы будете обрабатывать эти записи?

Вы можете ввести дополнительный параметр, который является максимальным временем ожидания перед очисткой буфера. Чтобы заархивировать это, вы можете либо использовать окно сеанса или акробатическое окно , либо использовать кэш записей в сочетании с интервалом фиксации , либо вы также можете реализовать его с помощью API процессора низкого уровня Kafka .

Вот пример кода, показывающий, как вы можете заархивировать его с помощью окна Tumbling, чтобы агрегировать и подавлять всю информацию userId в 1-часовом временном окне, принимать события, которые опаздывают на 10 минут, а затем отправлять подавленные события в нисходящий поток. процессор (если вы используете это, вы не сможете получить окончательные результаты до наступления нового события):

userInfoKStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
    .aggregate(() -> "", (userId, newValue, currentValue) -> newValue)
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .foreach((userId, value) -> {});
0 голосов
/ 01 апреля 2020

Вам нужно Kafka log compaction. Короче говоря, если вы хотите, чтобы ваш topi c сохранял только последнее значение для указанного ключа c, вам следует установить свойство log.cleanup.policy=compact. Вы можете найти больше об этом здесь .

...