Аксон 3.4 CommandHandler в Агрегате не запускается при отправке команды из внешней службы - PullRequest
1 голос
/ 16 мая 2019

, поэтому я пытаюсь обойти распределенную командную шину в аксоне 3.4. У меня есть сценарий использования, когда при получении определенной команды агрегат отправляет событие, которое запускает сагу, эта сага отправляет 2 команды, чтобы поддерживать отправку данных двум разным сервисам в согласованном состоянии.

Теперь перейдем к хитрой части: CommandHandlers определены во внешних сервисах, которые что-то делают, а затем отправляют команду обратно с результатом операции в ней. Однако когда команда получает send, я всегда получаю исключение тайм-аута, поэтому CommandBus знает, какой агрегат должен с ней справиться, но не может назначить команду правильного Aggregate.

В настоящее время commandService.createCurrency регистрирует только сообщение, поэтому в обработчике событий есть Thread.sleep для имитации более продолжительного процесса.

Ниже вы найдете мой код:

@Configuration
public class AxonConfig {

    @Autowired
    private Registration registration;

    private RestTemplate restTemplate = new RestTemplate();

    @Bean
    public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment,
                                                             Serializer serializer) {
        return new SpringHttpCommandBusConnector(localSegment, restTemplate, serializer);
    }

    @Bean
    public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient) {
        return new SpringCloudCommandRouter(discoveryClient, registration, new AnnotationRoutingStrategy());
    }

    @Primary // to make sure this CommandBus implementation is used for autowiring
    @Bean
    public DistributedCommandBus springCloudDistributedCommandBus(CommandRouter commandRouter,
                                                                  CommandBusConnector commandBusConnector) {
        return new DistributedCommandBus(commandRouter, commandBusConnector);
    }

}

Сервис 1

Совокупный:

@Aggregate
@Data
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CreateCurrencyAggregate {

    @AggregateIdentifier
    private String id;

    @CommandHandler
    public CreateCurrencyAggregate(CreateCurrencyCommand command) {
        log.info("starting create currency");
        Assert.notNull(command.getId(), "CreateCurrencyCommand must have an id");
        Assert.hasLength(command.getId(), "CreateCurrencyCommand id cannot be an empty String");
        this.id = command.getId();
        apply(CreateCurrencyEvent.builder()
                .id(command.getId())
                .payload(command.getPayload())
                .build());
    }

    @CommandHandler
    public void on(DalCreatedCommand command) {
        log.info("Currency created on dal layer");
        apply(DalCurrencyCreatedEvent.builder()
                .dalId(command.getId())
                .build());

    }
}

Saga:

@Slf4j
@Saga
public class CreateCurrencySaga {

    @Autowired
    private transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    public void handle(CreateCurrencyEvent event) {
        log.info("starting saga...");
        dalCreated = false;
        as400Created = true;
        SagaLifecycle.associateWith("id", event.getId());
        SagaLifecycle.associateWith("dalId", event.getId());
        commandGateway.send(CreateDalCurrencyCommand.builder()
                .id(event.getId())
                .payload(event.getPayload())
                .build());
    }

    @SagaEventHandler(associationProperty = "dalId")
    public void handle(DalCurrencyCreatedEvent event) {
        log.info("receiving createdEvent");
        SagaLifecycle.end();
    }


}

Сервис 2

Внешний CommandHandler

@Slf4j
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Component
public class CurrencyCommandHandler {

    @Autowired
    private EventBus eventBus;

    @CommandHandler
    public void on(CreateDalCurrencyCommand command) {
        eventBus.publish(asEventMessage(CreateDalCurrencyEvent.builder()
                .id(command.getId())
                .payload(command.getPayload())
                .build()));
    }
}

EventHandler

@Slf4j
@RequiredArgsConstructor
@Component
public class CurrencyEventHandlers {

    private final CurrencyCommandService commandService;

    private final CommandGateway commandGateway;

    @EventHandler
    public void handle(CreateDalCurrencyEvent event){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        commandService.createCurrency(event.getId(), event.getPayload());
        var result = commandGateway.send(DalCreatedCommand.builder()
            .id(event.getId())
            .build());
    }
}

1 Ответ

1 голос
/ 17 мая 2019

Я думаю, что могу дать вам дополнительную информацию в этой области.

К сожалению, реализация Spring Cloud, используемой в качестве Discovery Service, вносит большой вклад в мир. Внутренне SpringCloudCommandRouter использует метаданные ServiceInstance для совместного использования MessageRoutingInformation. Каждое приложение, подключенное к вашей настройке, будет представлено ServiceInstance, поэтому поделиться тем, какие сообщения (и, следовательно, также команды), которые вы, как служба, можете обработать с помощью этого подхода, будет просто.

Однако при создании SpringCloudCommandRouter это было проверено с использованием Eureka в качестве реализации Spring Cloud. Eureka позволяет корректировать метаданные ServiceInstance, поэтому я могу с уверенностью заявить, что если вы используете Spring Cloud Eureka, я ожидаю, что все будет работать так же, как и все.

Однако, если вы, например, используете Консул, это другая история. Spring Cloud Consul не позволяет корректировать метаданные ServiceInstance. В прошлом я создал проблему , чтобы настроить API так, чтобы он имел возможность обновлять метаданные.

Несмотря на это, проблема с предоставлением поддержки Spring Cloud Consul и других реализаций, которые не позволяют корректировать метаданные, была решена Axon Framework путем предоставления SpringCloudHttpBackupCommandRouter.

Поэтому я бы предложил настроить вашу конфигурацию так, чтобы она использовала SpringCloudHttpBackupCommandRouter вместо `` SpringCloudCommandRouter`

...