Я пытаюсь обернуть голову вокруг ручьев Кафки, и у меня возникают некоторые фундаментальные вопросы, которые я не могу решить самостоятельно. Я понимаю концепцию KTable
и государственных магазинов Кафки, но мне трудно решить, как к этому подойти. Я также использую Spring Cloud Streams, который добавляет к этому еще один уровень сложности.
Мой пример использования:
У меня есть механизм правил, который читает событие Кафки, обрабатывает событие , возвращает список правил, которые соответствуют, и записывает его в другую топи c. Вот что у меня есть:
@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
return input -> input.mapValues(this::analyze).filter((host, evaluation) -> evaluation != null);
}
public List<IndicatorEvaluation> analyze(final String host, final ProcessNode process) {
// Does stuff
}
Некоторые из правил с состоянием выглядят так:
[some condition] REPEATS 5 TIMES WITHIN 1 MINUTE
[some condition] FOLLOWEDBY [some condition] WITHIN 1 MINUTE
[rule A exists and rule B exists]
Моя текущая реализация хранит всю эту информацию в памяти, чтобы иметь возможность выполнять анализ. По понятным причинам это не легко масштабируется. Поэтому я решил сохранить это в государственном магазине Кафки.
Я не уверен в лучшем способе go об этом. Я знаю, что есть способ создавать собственные хранилища состояний, которые обеспечивают более высокий уровень гибкости. Я не уверен, что Kafka DSL поддержит это.
Все еще новичок в Kafka Streams и не прочь выслушать различные предложения.