Как указать тип класса при использовании весеннего шаблона kafka? - PullRequest
0 голосов
/ 11 октября 2018

Моя текущая настройка имеет следующую конфигурацию:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myKafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new
            ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setMessageConverter(stringJsonMessageConverter());

    return factory;
}

, где stringJsonMessageConverter имеет

@Bean
public StringJsonMessageConverter stringJsonMessageConverter() {
    return new StringJsonMessageConverter(objectMapper());
}

для использования моего сопоставителя объектов

@Bean
public ObjectMapper objectMapper() {
    return new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .registerModule(myCustomJacksonModules())
            .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
            .configure(ACCEPT_SINGLE_VALUE_AS_ARRAY, true)
            .configure(WRITE_DATES_AS_TIMESTAMPS, false);
}

С этимконфигурацию, я могу опубликовать как:

...
headers = new MessageHeaders(singletonMap(TOPIC, topic));
Foo foo = ....
Message<?> message = new GenericMessage<>(foo, headers);
kafkaTemplate.send(message);

и потреблять как:

@KafkaListener(topics = "myTopic",
        groupId = "g1",
        containerFactory = "myKafkaListenerContainerFactory")
public void onMessageReceived(Foo foo) {
    ... works with foo here
}

Это прекрасно работает в этом примере, если Foo является конкретным классом.Однако, если Foo является абстрактным и то, что отправляется как сообщение, это его подклассы

F1 extends Foo; F2 extends Foo ...

... // publisher

headers = new MessageHeaders(singletonMap(TOPIC, topic));
F1 f1 = ....
Message<?> message = new GenericMessage<>(f1, headers);
kafkaTemplate.send(message);

... // Listener

@KafkaListener(topics = "myTopic",
        groupId = "g1",
        containerFactory = "myKafkaListenerContainerFactory")
public void onMessageReceived(Foo foo) {
    ... wont work
}

Объявление Foo в моем потребителе как типе не удастся, но это будет работать:

@KafkaListener(topics = "myTopic",
        groupId = "g1",
        containerFactory = "myKafkaListenerContainerFactory")
public void onMessageReceived(Map<String,Object> fooAsMap) {
    ... this works too
}

Помимоупомянутая выше опция, которая заключается в использовании карты, я также попробовал следующее:

- class level kafka listener ([from here)][1]; it still requires method with param of type Map
- custom deserializer registered in object mapper?

Есть ли способ указать тип цели при отправке сообщения?

1 Ответ

0 голосов
/ 11 октября 2018

Сконфигурируйте StringJsonMessageConverter с пользовательским DefaultJackson2JavaTypeMapper, где он имеет setTypePrecedence(TypePrecedence.TYPE_ID).

Также установите сопоставления типов для сопоставления с желаемым классом на основе заголовка идентификатора отправляющего типа (требуется толькоесли класс производителя отличается от класса потребителя).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...