Потребление строки и JSON сообщения от одной и той же kafka topi c - проблема с десериализацией - PullRequest
1 голос
/ 13 февраля 2020

JSON Сообщения потребляются строковым потребителем. Моя продукция отправляет два типа сообщений: Strings и Serialized JSON

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group-id}", containerFactory = "${kafka.string-listener-container-factory}")
public void consume(@NotNull ConsumerRecord<String, String> cr, @Payload String payload) {
    log.debug("Received asset id: {}, with key: {}, Partition: {}, Offset: {}  ", payload, cr.key(), cr.partition(),
            cr.offset());

}


@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group-id}", containerFactory = "${kafka.json-listener-container-factory}")
public void consumeAssetEvent(@NotNull ConsumerRecord<String, Event> cr, @Payload Event payload) {
    log.debug("Received asset id: {}, with key: {}, Partition: {}, Offset: {}  ", payload, cr.key(), cr.partition(),
            cr.offset());

}

на стороне потребителя. У меня есть два потребителя: 1. прослушивание строкового сообщения 2. прослушивание json и десериализация объекта

Даже сообщения json потребляются слушателем String.

1 Ответ

1 голос
/ 13 февраля 2020

У потребителей нет функции фильтрации, которая различает guish сообщений в соответствии с заданным вами Serde (например, строка, json et c). Когда производитель отправляет сообщение, оно конвертируется в byte [] в kafka topi c. Этот байт [] затем десериализуется настройкой десериализации потребителя.

Таким образом, не существует способа по умолчанию фильтровать строковые сообщения для потребителя строки и от json до json потребителя. Либо создайте прослушиватель, который получит все данные, и проверьте, является ли он json или нет, либо измените topi c для строки и json (отправьте строку в одну топику c и json в другую и соответственно используйте).

...