Сохранение состояния в Kafka с использованием потоков Kafka - PullRequest
1 голос
/ 18 февраля 2020

Я пытаюсь обернуть голову вокруг ручьев Кафки, и у меня возникают некоторые фундаментальные вопросы, которые я не могу решить самостоятельно. Я понимаю концепцию KTable и государственных магазинов Кафки, но мне трудно решить, как к этому подойти. Я также использую Spring Cloud Streams, который добавляет к этому еще один уровень сложности.

Мой пример использования:

У меня есть механизм правил, который читает событие Кафки, обрабатывает событие , возвращает список правил, которые соответствуют, и записывает его в другую топи c. Вот что у меня есть:

@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
    return input -> input.mapValues(this::analyze).filter((host, evaluation) -> evaluation != null);
}

public List<IndicatorEvaluation> analyze(final String host, final ProcessNode process) {
    // Does stuff
}

Некоторые из правил с состоянием выглядят так:

[some condition] REPEATS 5 TIMES WITHIN 1 MINUTE
[some condition] FOLLOWEDBY [some condition] WITHIN 1 MINUTE
[rule A exists and rule B exists]

Моя текущая реализация хранит всю эту информацию в памяти, чтобы иметь возможность выполнять анализ. По понятным причинам это не легко масштабируется. Поэтому я решил сохранить это в государственном магазине Кафки.

Я не уверен в лучшем способе go об этом. Я знаю, что есть способ создавать собственные хранилища состояний, которые обеспечивают более высокий уровень гибкости. Я не уверен, что Kafka DSL поддержит это.

Все еще новичок в Kafka Streams и не прочь выслушать различные предложения.

1 Ответ

2 голосов
/ 19 февраля 2020

Из описания, которое вы дали, я полагаю, что этот вариант использования все еще может быть реализован с использованием DSL в Kafka Streams. Код, который вы показали выше, не отслеживает состояние. В вашей топологии вам необходимо добавить состояние, отслеживая количество правил и сохраняя их в хранилище состояний. Тогда вам нужно только отправить правила вывода, когда это количество достигает порогового значения. Вот общая идея этого псевдокода. Очевидно, что вы должны настроить это, чтобы удовлетворить конкретные спецификации вашего варианта использования.

@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
    return input -> input
                     .mapValues(this::analyze)
                     .filter((host, evaluation) -> evaluation != null)
                     ...
                     .groupByKey(...)
                     .windowedBy(TimeWindows.of(Duration.ofHours(1)))
                     .count(Materialized.as("rules"))
                     .filter((key, value) -> value > 4)
                     .toStream()
                    ....
}
...