Как отфильтровать события из Kafka Stream на основе его содержимого JSON - PullRequest
0 голосов
/ 21 апреля 2019

Я использую Kafka Streams для чтения из темы в моем кластере и хочу фильтровать сообщения на основе содержимого JSON, то есть:

JSON Формат:

{
   "id": 1 
   "timestamp": "2019-04-21 12:53:18", 
   "priority": "Medium", 
   "name": "Sample Text",
   "metadata": [{
      "metric_1": "0", 
      "metric_2": "1", 
      "metric_3": "2"
   }]
}

Я хочу прочитать сообщения из входной темы (назовем это «input-topic»), отфильтровать их (предположим, я хочу только сообщения с приоритетом «Низкий»), затем объединить их и отправить в другую тему (»Filter-Topic ")

У меня не так много кода, кроме создания самого потока и его конфигураций.Я думаю, что в Serdes должно быть что-то, что мне нужно настроить, но я не уверен, как.Я также пытался использовать JSON deserializer, но не смог заставить его работать.

Прежде всего, это вообще возможно?Если да, то каков будет правильный курс действий?

1 Ответ

1 голос
/ 22 апреля 2019

Вы можете создать поток из темы.

    StreamsBuilder builder = new StreamsBuilder();

    // key value type here is both String for me and update based on cases
    KStream<String, String> source = builder.stream("input-topic");

    source.filter(new Predicate<String, String>() {
        @Override
        public boolean test(String s, String s2) {
            // your filter logic here and s and s2 are key/value from topic
            // In your case, s2 should be type of your json Java object
            return false;
        }
    }).groupBy(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(String key, String value) {
            // your group by logic
            return null;
        }
    }).count().toStream().to("new topic");
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...