Как запросить последнее значение записи в теме apache kafka - PullRequest
0 голосов
/ 05 июля 2019

Я новичок в Кафке и, может быть, это очень легко.Но я не мог найти решение проблемы, с которой я сейчас сталкиваюсь.У меня есть тема Кафки metric_32, и я хочу найти последнее значение для ключа user_abc.Как это возможно в Кафке.

Я пробовал с KStream, но он подписывается, когда в теме появляется новое событие.Но я хочу запросить уже полученное последнее значение ключа.Любой пример будет полезен.

Ответы [ 2 ]

1 голос
/ 05 июля 2019

Вы можете использовать хранилище состояний (если вы используете потоки Kafka), а затем добавить к нему процессор, который обновляет хранилище состояний каждый раз, когда в тему добавляется новое значение.

builder.addGlobalStore(storeBuilder, topic, Consumed.with(keySerde, valueSerde), return new Processor<K,V>() {
    private KeyValueStore<K,V> store;

    public void init(ProcessorContext context) {
        store=(KeyValueStore<K,V>) context.getStateStore("statestorename");
    }

    public void process(K key, V value) {
        store.put(key,value);
    }

    public void close() {}
});

и тогда вы можете использовать

readOnlyStore=streams.store("statestorename", QueryableStoreTypes.keyValueStore());
readOnlyStore.get("key");
0 голосов
/ 06 июля 2019

Если для какой-либо темы вас всегда интересует последнее значение для конкретного ключа, вы можете установить log.cleanup.policy=compact.Таким образом, вы всегда будете иметь только одну запись на ключ.Если вы создадите 5 сообщений с одинаковым идентификатором, в кафке останется только последнее.Таким образом, вы можете улучшить использование диска, если у вас много сообщений с одним и тем же ключом.Вы можете прочитать больше здесь: https://dzone.com/articles/kafka-architecture-log-compaction

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...