, поэтому я пытаюсь обойти распределенную командную шину в аксоне 3.4.
У меня есть сценарий использования, когда при получении определенной команды агрегат отправляет событие, которое запускает сагу, эта сага отправляет 2 команды, чтобы поддерживать отправку данных двум разным сервисам в согласованном состоянии.
Теперь перейдем к хитрой части: CommandHandlers определены во внешних сервисах, которые что-то делают, а затем отправляют команду обратно с результатом операции в ней. Однако когда команда получает send, я всегда получаю исключение тайм-аута, поэтому CommandBus знает, какой агрегат должен с ней справиться, но не может назначить команду правильного Aggregate.
В настоящее время commandService.createCurrency регистрирует только сообщение, поэтому в обработчике событий есть Thread.sleep для имитации более продолжительного процесса.
Ниже вы найдете мой код:
@Configuration
public class AxonConfig {
@Autowired
private Registration registration;
private RestTemplate restTemplate = new RestTemplate();
@Bean
public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment,
Serializer serializer) {
return new SpringHttpCommandBusConnector(localSegment, restTemplate, serializer);
}
@Bean
public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient) {
return new SpringCloudCommandRouter(discoveryClient, registration, new AnnotationRoutingStrategy());
}
@Primary // to make sure this CommandBus implementation is used for autowiring
@Bean
public DistributedCommandBus springCloudDistributedCommandBus(CommandRouter commandRouter,
CommandBusConnector commandBusConnector) {
return new DistributedCommandBus(commandRouter, commandBusConnector);
}
}
Сервис 1
Совокупный:
@Aggregate
@Data
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CreateCurrencyAggregate {
@AggregateIdentifier
private String id;
@CommandHandler
public CreateCurrencyAggregate(CreateCurrencyCommand command) {
log.info("starting create currency");
Assert.notNull(command.getId(), "CreateCurrencyCommand must have an id");
Assert.hasLength(command.getId(), "CreateCurrencyCommand id cannot be an empty String");
this.id = command.getId();
apply(CreateCurrencyEvent.builder()
.id(command.getId())
.payload(command.getPayload())
.build());
}
@CommandHandler
public void on(DalCreatedCommand command) {
log.info("Currency created on dal layer");
apply(DalCurrencyCreatedEvent.builder()
.dalId(command.getId())
.build());
}
}
Saga:
@Slf4j
@Saga
public class CreateCurrencySaga {
@Autowired
private transient CommandGateway commandGateway;
@StartSaga
@SagaEventHandler(associationProperty = "id")
public void handle(CreateCurrencyEvent event) {
log.info("starting saga...");
dalCreated = false;
as400Created = true;
SagaLifecycle.associateWith("id", event.getId());
SagaLifecycle.associateWith("dalId", event.getId());
commandGateway.send(CreateDalCurrencyCommand.builder()
.id(event.getId())
.payload(event.getPayload())
.build());
}
@SagaEventHandler(associationProperty = "dalId")
public void handle(DalCurrencyCreatedEvent event) {
log.info("receiving createdEvent");
SagaLifecycle.end();
}
}
Сервис 2
Внешний CommandHandler
@Slf4j
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Component
public class CurrencyCommandHandler {
@Autowired
private EventBus eventBus;
@CommandHandler
public void on(CreateDalCurrencyCommand command) {
eventBus.publish(asEventMessage(CreateDalCurrencyEvent.builder()
.id(command.getId())
.payload(command.getPayload())
.build()));
}
}
EventHandler
@Slf4j
@RequiredArgsConstructor
@Component
public class CurrencyEventHandlers {
private final CurrencyCommandService commandService;
private final CommandGateway commandGateway;
@EventHandler
public void handle(CreateDalCurrencyEvent event){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
commandService.createCurrency(event.getId(), event.getPayload());
var result = commandGateway.send(DalCreatedCommand.builder()
.id(event.getId())
.build());
}
}