Spring Boot / Kafka Json Deserialization - надежные пакеты - PullRequest
0 голосов
/ 10 мая 2018

Я только начинаю использовать Kafka с Spring Boot и хочу отправлять и использовать объекты JSON.

При попытке получить сообщение из темы Кафки я получаю следующую ошибку:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition dev.orders-0 at offset 9903. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'co.orders.feedme.feed.domain.OrderItem' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:218) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93) ~[kafka-clients-1.0.1.jar:na]

Я попытался добавить свой пакет в список доверенных пакетов, задав следующее свойство в application.properties:

spring.kafka.consumer.properties.spring.json.trusted.packages = co.orders.feedme.feed.domain

Похоже, это не имеет никакого значения. Как правильно добавить мой пакет в список доверенных пакетов для Spring Kafka JsonDeserializer?

Ответы [ 2 ]

0 голосов
/ 20 июня 2018

Поскольку у вас решена проблема с доверенным пакетом, для вашей следующей проблемы вы можете воспользоваться перегруженным

DefaultKafkaConsumerFactory(Map<String, Object> configs,
            Deserializer<K> keyDeserializer,
            Deserializer<V> valueDeserializer)

и JsonDeserializer «обертка» пружинной кафки

JsonDeserializer(Class<T> targetType, ObjectMapper objectMapper)

Объединяя вышесказанное, для Java у меня есть:

new DefaultKafkaConsumerFactory<>(properties,
                new IntegerDeserializer(),
                new JsonDeserializer<>(Foo.class,
                        new ObjectMapper()
                .registerModules(new KotlinModule(), new JavaTimeModule()).setSerializationInclusion(JsonInclude.Include.NON_NULL)
                .setDateFormat(new ISO8601DateFormat()).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false))));

По сути, вы можете указать фабрике использовать ваши собственные десериализаторы, а для Json - свой собственный ObjectMapper. Там вы можете зарегистрировать модуль Kotlin, а также настроить форматы даты и другие вещи.

0 голосов
/ 10 мая 2018

Хорошо, я немного подробнее прочитал документацию и нашел ответ на свой вопрос. Я использую Kotlin, поэтому создание моего потребителя выглядит так с

@Bean
fun consumerFactory(): ConsumerFactory<String, FeedItem> {
    val configProps = HashMap<String, Any>()
    configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
    configProps[ConsumerConfig.GROUP_ID_CONFIG] = "feedme"
    configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
    configProps[JsonDeserializer.TRUSTED_PACKAGES] = "co.orders.feedme.feed.domain"
    return DefaultKafkaConsumerFactory(configProps)
}

Теперь мне просто нужен способ переопределить создание объекта Jackson ObjectMapper в JsonDeserializer, чтобы он мог работать с моими классами данных Kotlin, которые не имеют конструктора с нулевым аргументом:)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...