Я использую Kafka Streams для чтения из темы в моем кластере и хочу фильтровать сообщения на основе содержимого JSON, то есть:
JSON Формат:
{
"id": 1
"timestamp": "2019-04-21 12:53:18",
"priority": "Medium",
"name": "Sample Text",
"metadata": [{
"metric_1": "0",
"metric_2": "1",
"metric_3": "2"
}]
}
Я хочу прочитать сообщения из входной темы (назовем это «input-topic»), отфильтровать их (предположим, я хочу только сообщения с приоритетом «Низкий»), затем объединить их и отправить в другую тему (»Filter-Topic ")
У меня не так много кода, кроме создания самого потока и его конфигураций.Я думаю, что в Serdes должно быть что-то, что мне нужно настроить, но я не уверен, как.Я также пытался использовать JSON deserializer, но не смог заставить его работать.
Прежде всего, это вообще возможно?Если да, то каков будет правильный курс действий?