Axon 4.3 - потреблять сообщения от topi c kafka - PullRequest
0 голосов
/ 29 апреля 2020

Я использую версию Axon (4.3), которая беспрепятственно поддерживает Kafka с аннотацией в классе SpringBoot Main, используя

@SpringBootApplication(exclude = org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration.class)

В моем случае сообщение успешно помещается в topi c, но проблема с этим потребителем, я не могу потреблять сообщение от topi c.

y at-il uns config manquante?

 <dependency>
   <groupId>org.axonframework</groupId>
   <artifactId>axon-spring-boot-starter</artifactId>
   <version>4.3</version>
   <exclusions>
     <exclusion>
        <groupId>org.axonframework</groupId>
        <artifactId>axon-server-connector</artifactId>
     </exclusion>
   </exclusions>
 </dependency>
 <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
 </dependency>
 <dependency>
   <groupId>org.axonframework.extensions.kafka</groupId>
   <artifactId>axon-kafka-spring-boot-starter</artifactId>
   <version>4.0-RC3</version>
 </dependency>

application.yml

axon:
  eventhandling:
    processors:
      conventions:
        source: kafkaMessageSource
        mode: tracking
  serializer:
    general: jackson
  kafka:
    client-id: consumer_service
    default-topic: topic_x
    bootstrap-servers:
    - 127.0.0.1:9092

Слушатель. java

//@Component
@ProcessingGroup(value = "conventions")
public class Listener {

 private static final Logger LOGGER = LoggerFactory.getLogger(GenericListener.class);

 @EventHandler
 void on(ConventionCreatedEvent event) {
  LOGGER.info("got the event {}", event);
 }

}

1 Ответ

0 голосов
/ 30 апреля 2020

Я думаю, что это имя источника сообщений Кафки в вашей конфигурации, которое сейчас является виновником, Аймен.

При использовании автоматической конфигурации Axon и автоматической конфигурации Axon-Kafka без каких-либо В зависимости от того, какой тип сообщения Kafka вы хотите, будет создан StreamableKafkaMessageSource. Имя этого компонента будет streamableKafkaMessageSource.

В вашем application.yml вы, однако, ожидаете, что source для вашего conventions обработчика событий отслеживания будет называться kafkaMessageSource.

Кроме того, вы можете взглянуть на пример приложения , которое содержится в расширении Axon Kafka. Может быть, это проясняет ситуацию.

...