как обработать сообщение, полученное в кафке, с помощью пружинного интеграционного преобразователя - PullRequest
0 голосов
/ 27 августа 2018

Я отправляю данные ниже в kafka и получаю через каналы интеграции пружины и преобразую в объект Log , как мне преобразовать данные ниже в объект Log с помощью преобразователя интеграции пружины?ценим любую помощь здесь

'Log (clientKey = строка, полезная нагрузка = строка)'

Вот код адаптера канала

@Bean
public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(
            kafkaListenerContainer());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Log.class);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(inputChannel());
    return kafkaMessageDrivenChannelAdapter;
}

, когдая пытаюсь преобразовать в активаторе службы, используя ниже

 ObjectMapper objectMapper = new ObjectMapper();
 Log msg = objectMapper.readValue(arg0.getPayload().toString() , Log.class);

его сбой с

com.fasterxml.jackson.core.JsonParseException: нераспознанный токен 'Log': ожидал ('true', 'ложь' или 'ноль')

1 Ответ

0 голосов
/ 27 августа 2018

Прежде всего 'Log(clientKey=string, payload=string)' не выглядит как правильно сформированный JSON, как вы хотели бы преобразовать позже в Spring Integration.

Еще одна проблема, которую вы делаете kafkaMessageDrivenChannelAdapter.setPayloadType(Log.class);.Таким образом, вам не нужно преобразование в нисходящем направлении, и MessagingMessageConverter в IntegrationRecordMessageListener сделает работу по преобразованию за нас.

Однако нам все еще нужно иметь надлежащие данные, чтобы иметь возможность конвертировать из.

Вам также нужно помнить, что Apache Kafka имеет свои собственные механизмы для десериализации byte[] изпровод.См. Spring для Apache Kafka для получения дополнительной информации о преобразовании и (де) сериализации: https://docs.spring.io/spring-kafka/docs/current/reference/html/_reference.html#serdes

Для правильного преобразования JSON Spring Integration предоставляет компонент JsonToObjectTransformer, который можно использовать с аннотацией @Transformer:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-transformation-chapter.html#_common_transformers

ОБНОВЛЕНИЕ

@Transformer(inputChannel = "inputChannel", outputChannel = "processChannel")
@Bean
public JsonToObjectTransformer jsonToObjectTransformer() {
   return new JsonToObjectTransformer(Log.class);
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...