Я столкнулся с небольшой проблемой с обработкой команд в Axon 4.
Допустим, у меня есть агрегат, который должен вызывать внешний сервис при обработке команды.
Внешняя служба использует асинхронный клиент (клиент Vertx TCP + Rxjava), поэтому ответ дается в другом потоке, чем тот, который создал агрегатный экземпляр.
Я хочу применить событие с учетом результата моего сервиса, но оно не работает, потому что вызов AggregateLifecycle.apply()
находится в другом потоке ...
Как я могу "перенести" объем совокупности?
Вот небольшой пример того, что я хочу сделать (использует rxjava 2 и lombok):
Агрегат:
@Slf4j
@Aggregate
@NoArgsConstructor
public class MyAggregate {
@AggregateIdentifier
private String id;
@CommandHandler
public MyAggregate(CreationCommand creationCommand) {
Single.just("some data")
.observeOn(Schedulers.computation()) // <- comment this line and the test pass, uncomment and it fail because apply is on another thread ?
.subscribe((s, throwable) -> apply(new AggregateCreatedEvent(creationCommand.getId())));
}
@EventSourcingHandler
public void on(AggregateCreatedEvent event) {
this.id = event.getId();
}
}
@Value class CreationCommand { String id; }
@Value class AggregateCreatedEvent { String id;}
И тест:
public class MyAggregateTest {
AggregateTestFixture<MyAggregate> testFixture = new AggregateTestFixture<>(MyAggregate.class);
@Test
public void test() {
testFixture.givenNoPriorActivity()
.when(new CreationCommand("123"))
.expectEvents(new AggregateCreatedEvent("123"));
}
}
Вот ошибка, которую я получил:
java.lang.IllegalStateException: Cannot request current Scope if none is active