Spring Cloud Stream - Быстрое добавление xml ObjectNode в доверенный пакет kafka - PullRequest
0 голосов
/ 09 мая 2020

Я использую Spring-cloud-stream версии 3.0.4

Я пишу консолидатор JSON, который прослушивает несколько потоков, хранит JSON в хранилищах состояний, а затем объединяет их для создания вывод JSON. Поскольку мой сервис - это просто агрегатор JSON, я не хочу конвертировать JSONS в Java объекты. Итак, я пытаюсь опубликовать sh JSON из вышестоящих сервисов как JsonNode. Сделайте слияние и опубликуйте sh его на нисходящем топи c.

Чтобы добавить JsonNode в качестве надежного пакета, я объявил bean-компонент в своем классе приложения, как показано ниже.

@Bean
public KafkaHeaderMapper customKafkaHeaderMapper()
{
    DefaultKafkaHeaderMapper kafkaHeaderMapper = new DefaultKafkaHeaderMapper();
    kafkaHeaderMapper.addTrustedPackages("com.fasterxml.jackson.databind.node");
    return kafkaHeaderMapper;

}

и добавил следующую запись в свой application.yml

spring.application.name: stream-aggregator
spring.cloud.stream.bindings.formDataIn:
  destination: form-data
  contentType: application/json
spring.cloud.stream.kafka.streams.binder:
  configuration:
    default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
    default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
    spring.json.key.default.type: com.company.datamapper.domain.EmpUUID
    spring.json.value.default.type: com.fasterxml.jackson.databind.JsonNode
    commit.interval.ms: 1000
spring.cloud.stream.kafka.binder.headerMapperBeanName:
  customKafkaHeaderMapper

Однако эта конфигурация не работает, и я получаю следующую ошибку.

Caused by: java.lang.IllegalArgumentException: The class 'com.fasterxml.jackson.databind.node.ObjectNode' is not in the trusted packages: [java.util, java.lang, com.fasterxml.jackson.databind]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:125) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:99) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:425) ~[spring-kafka-2.3.7.RELEASE.jar:2.3.7.RELEASE]
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60) ~[kafka-streams-2.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) ~[kafka-streams-2.3.1.jar:?]

У меня 2 запроса

  1. Как исправить ошибку, указанную выше.
  2. Оптимален ли мой подход, или мне следует выполнять агрегирование по-другому.

1 Ответ

0 голосов
/ 10 мая 2020

Проблема в десериализаторе, а не в преобразователе заголовков ...

at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:425)

spring.json.value.default.type: com.fasterxml.jackson.databind.JsonNode

Мы автоматически "доверяем" пакет для тип по умолчанию, но подпакеты не являются доверенными.

Вам нужно добавить пакет ...node в

spring.json.trusted.packages

в конфигурациях связующего.

Вероятно, мы должны поддержка доверенных подпакетов; не стесняйтесь открывать вопрос на GitHub против spring-kafka .

...