Как хранить только последние значения ключей в теме кафки - PullRequest
0 голосов
/ 12 февраля 2019

У меня есть тема, в которую поступает поток данных.Мне нужно создать отдельную тему из этой темы, которая имеет только последний набор значений с учетом ключей.

Я думал, что вся цель KTable заключается в том, что он будет хранить последнее значение с заданным ключом, а не сохранятьвесь поток событий.Однако я не могу заставить это работать.Выполнение приведенного ниже кода создает хранилище ключей, но в этом хранилище ключей (maintopiclatest) есть поток событий (а не только последние значения).Поэтому, если я отправляю запрос с 1000 записями в теме дважды, а не вижу 1000 записей, я вижу 2000 записей.

var serializer = new KafkaSpecificRecordSerializer();
var deserializer = new KafkaSpecificRecordDeserializer();

var stream = kStreamBuilder.stream("maintopic",
    Consumed.with(Serdes.String(), Serdes.serdeFrom(serializer, deserializer)));

var table = stream
    .groupByKey()
    .reduce((aggV, newV) -> newV, Materialized.as("maintopiclatest"));

Другая проблема заключается в том, что если я хочу сохранить KTable в новой теме, яЯ не уверен, как это сделать.Чтобы сделать это, мне кажется, что я должен превратить его обратно в поток, чтобы я мог вызвать «.to» на нем.Но тогда в нем есть весь поток событий, а не только последние значения.

1 Ответ

0 голосов
/ 12 февраля 2019

Это не то, как работает KTable.

Сам по себе KTable имеет внутреннее хранилище состояний и хранит ровно одну запись на ключ.Однако таблица KTable постоянно обновляется и подчиняется так называемой stream-table-duality .Каждое обновление таблицы KTable отправляется в нисходящем направлении в виде записи журнала изменений: https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables. Таким образом, каждая входная запись приводит к выходной записи.

Поскольку это обработка потока, отсутствует «последний ключ на значение».

У меня есть тема, в которую поступает поток данных.Мне нужно создать отдельную тему из этой темы, которая имеет только последний набор значений с указанными ключами.

В какой момент времени вы хотите, чтобы KTable генерировал обновление?На этот вопрос нет ответа, поскольку входной поток концептуально бесконечен.

...