Аксон каркас с Атомикос? Как это работает? - PullRequest
0 голосов
/ 25 февраля 2020

У меня есть вопрос, я должен использовать Atomikos с платформой Axon, в Spring-Boot (без Axon Server). Я использую Oracle DB, и я использую несколько потоков (10) для отправки большого количества команд, а до этого я настраиваю JtaTransactionManager для себя, но в некоторых потоках я получаю такого рода исключение: javax.transaction. xa.XAException, поднял -6 или -4 или -3 или ORA-02056: 2P C: k2lcom: неверный номер двухфазной команды rdonly from ord :. В процессе отладки я увидел, что CommandGateWay также использует JtaTransactionManager. Это правильно? Когда это открывает транзакцию? Возможно ли, что мой JtaTransactionManager и Axon находятся в конфликте? У кого-нибудь были подобные исключения?

Пример кода:

@Service
public class CreateEntitiesServiceImpl extends FutureCompleter implements CreateEntitiesService {

    private static Logger logger = LoggerHelper.getDeveloperLogger(CreateEntitiesServiceImpl.class);
    private final CommandGateway commandGateway;
    private final ExecutionUtil executionUtil;
    private final MyEntityRepository myEntityRepository;

    public CreateEntitiesServiceImpl(CommandGateway commandGateway, ExecutionUtil executionUtil, MyEntityRepository myEntityRepository) {
        this.commandGateway = commandGateway;
        this.executionUtil = executionUtil;
        this.myEntityRepository = myEntityRepository;
    }

    @Override
    public void process(Message message) {
        logger.info("Entity addition started!");
        generateEntities();
        logger.info("Entity addition finished!");
    }

    private void generateEntities() {
        ExecutorService executorService = executionUtil.createExecutor(10, "createEntities");

        List<Integer> list = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());

        CreateEntitiesService proxy = applicationContext.getBean(CreateEntitiesServiceImpl.class);

        List<CompletableFuture<Void>> processingFutures = list.stream().map(
                e -> CompletableFuture.runAsync(proxy::createEntity, executorService).whenComplete((x, y) -> executorService.shutdown()))
                .collect(Collectors.toList());

        processingFutures.stream().map(this::getVoidFuture).collect(Collectors.toList());
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void createEntity() {
        try {
            MyEntity myEntity = new MyEntity();
            myEntity.setEntityStringProperty("string");
            myEntity.setEntityTimestampProperty(LocalDateTime.now());

            MyEntity savedEntity = myEntityRepository.save(myEntity);
            CreateAggregateCommand command = new CreateAggregateCommand(savedEntity.getEntityId(), savedEntity.getEntityStringProperty(),
                    savedEntity.getEntityTimestampProperty());
            commandGateway.send(command);
        } catch (Exception e) {
            throw new CreateEntitiesException(e.getMessage(), e);
        }
    }
}

Спасибо

1 Ответ

0 голосов
/ 26 февраля 2020

Я не до конца понимаю, что вы просите, чтобы быть честным.

В заголовке указано «Аксон с Атомикосом», хотя я не думаю, что этот момент когда-либо возвращается в описании.

То, что вы спрашиваете в описании вопроса, является ли правильным, что CommandGateway использует менеджер транзакций.

По этому вопросу я могу быть ясен: да, это так. Отправка команды очень вероятно приведет к тому, что вы попадете в агрегатный экземпляр. Поскольку вы хотите защитить границу согласованности в этом экземпляре и , чтобы гарантировать, что публикуемые события сохраняются, целесообразно начать транзакцию, как только будет обработана команда.

Кстати, небольшое примечание: это CommandBus, который использует TransactionManager. Концептуально это пока не меняет моего описания.

В заключение я не совсем уверен, поможет ли это вам, так как ваш вопрос мне не совсем понятен. Я надеюсь, что переписывание вашего поста прояснит больше того, что вы на самом деле после.

...