Мониторинг размеров K-таблицы Kafka Streams - PullRequest
2 голосов
/ 01 апреля 2020

У меня есть потоковая топология, которая использует топи c, запускает агрегацию и создает KTable, который материализуется в породе.

У меня есть другое приложение, которое потребляет все события из той же топики c ежедневно, и отправляет надгробные сообщения о событиях, которые соответствуют определенным c критериям (т.е. они больше не нужны). Агрегация справляется с этим и удаляет из хранилищ состояний, но я смотрю на мониторинг либо размера хранилища состояний, либо журнала изменений topi c - всего, что действительно говорит мне о размере таблицы ktable.

Я раскрыл метрики JMX, но там нет ничего, что могло бы дать мне то, что мне нужно. Я могу видеть общее количество «положений» в rockDB, но не общее количество ключей. Мои приложения загружаются весной, и я хотел бы показать метрики через prometheus.

Кто-нибудь решил эту проблему или какие-нибудь идеи, которые могут помочь?

1 Ответ

2 голосов
/ 01 апреля 2020

Вы можете получить приблизительный счет в каждом разделе, используя доступ к базовому хранилищу состояний таблицы KTable, используя эту KeyValueStore#approximateNumEntries(), а затем экспортировать этот счет в prometheus (у каждого раздела есть один счет).

Для доступа к нижележащему хранилищу состояний вы можете использовать низкоуровневый API процессора, чтобы получить доступ к KeyValueStore через каждый ProcessorContext в каждой StreamTask (соответствует разделу). Просто добавьте KStream#transformValues() к вашей топологии:

kStream
        ...
        .transformValues(ExtractCountTransformer::new, "your_ktable_name")
        ...

И в ExtractCountTransformer извлеките счет в Прометей:

@Log4j2
public class ExtractCountTransformer implements ValueTransformerWithKey<String, String, String> {

    private KeyValueStore<String, String> yourKTableKvStore;
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        yourKTableKvStore = (KeyValueStore<String, String>) context.getStateStore("your_ktable_name");
    }

    @Override
    public String transform(String readOnlyKey, String value) {
        //extract count to prometheus
        log.debug("partition {} - approx count {}", context.partition(), yourKTableKvStore.approximateNumEntries());
        yourKTableKvStore.approximateNumEntries();
        return value;
    }

    @Override
    public void close() {

    }
}
...