эффективно фильтровать Json сообщений от потребителя kafka - PullRequest
0 голосов
/ 10 июля 2020

Я читаю журнал Json объектов из потока Kafka. Вот формат каждого сообщения:

{"class": "abc.cdf", "object":{....}}

Меня интересует определенный «класс» сообщений, которые составляют только 10% от общего числа полученных сообщений. Как эффективно отфильтровать сообщение на основе этого поля без анализа всего json для каждого нового сообщения?

В настоящее время я использую ByteArraySerializer и ObjectMapper для синтаксического анализа до json pojo, а затем проверяю «класс» поле конкретно. Пример кода после каждого пакета сообщений, прочитанных из Kafka:

ObjectMapper mapper = new ObjectMapper();
for (record : records) {  
    MyRecord parsedRec = mapper.readValue(record, MyRecord.class);
    if (parsedRec == null || (!MYCLASSNAME.equals(parsedRec.getClass())))
       continue;
    ...
 }

Учитывая загруженность потока сообщений, я хочу потратить минимум времени на фильтрацию незаинтересованных сообщений.

1 Ответ

0 голосов
/ 10 июля 2020

Один из подходов - анализировать только поле class и игнорировать остальные. Таким образом, вы используете новый класс RecordClass, который содержит только поле class, и настраиваете преобразователь, чтобы он не выходил из строя при неизвестных свойствах (это object).

ObjectMapper mapper = new ObjectMapper()
      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
RecordClass recordClass = mapper.readValue(record, RecordClass.class);

Затем вы анализируете полный MyRecord, только если RecordClass имеет правильный класс. Логически должно быть быстрее, но на практике вам нужно это протестировать.

Другой подход - отправить сообщения с определенным классом c другому topi c, поэтому вы в основном фильтруете на стороне производителя.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...