Вы можете получить приблизительный счет в каждом разделе, используя доступ к базовому хранилищу состояний таблицы KTable, используя эту KeyValueStore#approximateNumEntries()
, а затем экспортировать этот счет в prometheus (у каждого раздела есть один счет).
Для доступа к нижележащему хранилищу состояний вы можете использовать низкоуровневый API процессора, чтобы получить доступ к KeyValueStore
через каждый ProcessorContext в каждой StreamTask (соответствует разделу). Просто добавьте KStream#transformValues()
к вашей топологии:
kStream
...
.transformValues(ExtractCountTransformer::new, "your_ktable_name")
...
И в ExtractCountTransformer извлеките счет в Прометей:
@Log4j2
public class ExtractCountTransformer implements ValueTransformerWithKey<String, String, String> {
private KeyValueStore<String, String> yourKTableKvStore;
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
yourKTableKvStore = (KeyValueStore<String, String>) context.getStateStore("your_ktable_name");
}
@Override
public String transform(String readOnlyKey, String value) {
//extract count to prometheus
log.debug("partition {} - approx count {}", context.partition(), yourKTableKvStore.approximateNumEntries());
yourKTableKvStore.approximateNumEntries();
return value;
}
@Override
public void close() {
}
}