Аксон 4: EventSourcingHandler не срабатывает при применении события из другого потока - PullRequest
0 голосов
/ 04 января 2019

Я столкнулся с небольшой проблемой с обработкой команд в Axon 4.

Допустим, у меня есть агрегат, который должен вызывать внешний сервис при обработке команды.

Внешняя служба использует асинхронный клиент (клиент Vertx TCP + Rxjava), поэтому ответ дается в другом потоке, чем тот, который создал агрегатный экземпляр.

Я хочу применить событие с учетом результата моего сервиса, но оно не работает, потому что вызов AggregateLifecycle.apply() находится в другом потоке ...

Как я могу "перенести" объем совокупности?

Вот небольшой пример того, что я хочу сделать (использует rxjava 2 и lombok):

Агрегат:

@Slf4j
@Aggregate
@NoArgsConstructor
public class MyAggregate {

    @AggregateIdentifier
    private String id;

    @CommandHandler
    public MyAggregate(CreationCommand creationCommand) {
        Single.just("some data")
                .observeOn(Schedulers.computation()) // <- comment this line and the test pass, uncomment and it fail because apply is on another thread ?
                .subscribe((s, throwable) -> apply(new AggregateCreatedEvent(creationCommand.getId())));
    }

    @EventSourcingHandler
    public void on(AggregateCreatedEvent event) {
        this.id = event.getId();
    }
}

@Value class CreationCommand { String id; }
@Value class AggregateCreatedEvent { String id;}

И тест:

public class MyAggregateTest {

    AggregateTestFixture<MyAggregate> testFixture = new AggregateTestFixture<>(MyAggregate.class);

    @Test
    public void test() {
        testFixture.givenNoPriorActivity()
                .when(new CreationCommand("123"))
                .expectEvents(new AggregateCreatedEvent("123"));
    }
}

Вот ошибка, которую я получил:

java.lang.IllegalStateException: Cannot request current Scope if none is active

1 Ответ

0 голосов
/ 19 января 2019

Событие должно быть применено в потоке, который управляет этой единицей работы, в данном случае CommandHandler.Axon предоставляет собственные механизмы для асинхронных операций.CommandBus принимает команды асинхронно, а события обрабатываются обработчиками событий асинхронно.Ничего не получится от асинхронной реализации CommandHandler, и в любом случае он не поддерживается в настоящее время.

Все данные, необходимые для применения события, обычно должны быть доступны в команде или всовокупное состояние, а не из дополнительных внешних источников.

Это, вероятно, то, что вы хотите, чтобы ваш командный обработчик был похож:

@CommandHandler
public MyAggregate(CreationCommand creationCommand) {
    apply(new AggregateCreatedEvent(creationCommand.getId());
}
...