Невозможно указать экземпляр valueSerializer для Spring KafkaTemplate - PullRequest
0 голосов
/ 12 мая 2018

Мне нужно установить сериализатор пользовательских значений в KafkaTemplate Spring . Значение сериализатора выглядит следующим образом: JsonSerializer<JourneyMailExchange> serializer = new JsonSerializer<>(customObjectMapper); (например, для применения PropertyNamingStrategy.SNAKE_CASE)

это легко сделать с KafkaProducer:

new KafkaProducer<>(properties, Serdes.String().serializer(), valueSerializer);

но я не нашел возможности сделать то же самое с KafkaTemplate. Я вижу только props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); свойство для ProducerFactory, но это не то, что я ищу (невозможно предоставить конкретный экземпляр).

Мы создаем KafkaTemplate следующим образом:

@Bean
public ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaProducerFactory<>(producerConfigs(kafkaProperties.getDefaultSettings()));
}

private static Map<String, Object> producerConfigs(Map<String, String> defaultSettings) {
    Map<String, Object> props = new HashMap<>(defaultSettings);
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
    kafkaTemplate.setDefaultTopic("topicName");
    return kafkaTemplate;
}

Также найден следующий метод:

kafkaTemplate.setMessageConverter(new StringJsonMessageConverter(customObjectMapper));

, но он применяет преобразование, только если мы работаем с экземпляром Message Spring и игнорируем предоставленное преобразование, если мы вызываем методы отправки с ключом и значением.

spring-kafka версия: 2.1.0.RELEASE

1 Ответ

0 голосов
/ 13 мая 2018

Наконец, я понял, что это можно сделать с помощью настройки нескольких DefaultKafkaProducerFactory, предоставив сериализатор значений в конструктор:

public DefaultKafkaProducerFactory(Map<String, Object> configs, Serializer<K> keySerializer,
        Serializer<V> valueSerializer)

финальная версия для сериализатора с определенным значением выглядит следующим образом (количество настроенных ProducerFactory равно количеству необходимых сериализаторов различных значений):

@Bean
public ProducerFactory<String, Object> producerFactoryWithSnakeCaseValueSerializer(KafkaProperties kafkaProperties) {
    Map<String, Object> props = producerConfigsWithSnakeCaseValueSerializer(kafkaProperties.getDefaultSettings());
    JsonSerializer<Object> valueSerializer = new JsonSerializer<>(createObjectMapper(SNAKE_CASE));
    valueSerializer.setAddTypeInfo(false);
    return new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
}

private static Map<String, Object> producerConfigsWithSnakeCaseValueSerializer(Map<String, String> defaultSettings) {
    Map<String, Object> props = new HashMap<>(defaultSettings);
    ...
    return props;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerConfigsWithSnakeCaseValueSerializer) {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerConfigsWithSnakeCaseValueSerializer);
    kafkaTemplate.setDefaultTopic("topicName");
    return kafkaTemplate;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...