динамически зарегистрировать слушатель транзакции с пружиной? - PullRequest
12 голосов
/ 18 марта 2011

У меня есть приложение springframework, в котором я хотел бы добавить прослушиватель транзакций в транзакцию, которая в данный момент выполняется.Мотивация состоит в том, чтобы инициировать действие после фиксации, которое уведомляет последующие системы.Я использую @Transactional, чтобы обернуть транзакцию вокруг какого-либо сервисного метода - вот где я хочу создать / зарегистрировать прослушиватель после транзакции.Я хочу сделать что-то «подобное» следующим образом:

public class MyService {
 @Transaction
 public void doIt() {
  modifyObjects();

  // something like this
  getTransactionManager().registerPostCommitAction(new 
   TransactionSynchronizationAdapter() {
     public void afterCommit() { 
      notifyDownstream(); 
     }
  });
 }
}

Spring имеет интерфейс TransactionSynchronization и класс адаптера, который кажется именно тем, что я хочу;однако не сразу понятно, как зарегистрировать его динамически в текущей транзакции или в менеджере транзакций.Я бы предпочел не создавать подкласс JtaTransactionManager, если смогу избежать этого.

Q: Кто-нибудь делал это раньше.

Q: что самое простоеспособ регистрации моего адаптера?

Ответы [ 4 ]

29 голосов
/ 19 марта 2011

На самом деле это было не так сложно, как я думал; У Spring есть статический вспомогательный класс, который помещает «правильный» материал в контекст потока.

TransactionSynchronizationManager.registerSynchronization(
    new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            s_logger.info("TRANSACTION COMPLETE!!!");
        }
    }
);
3 голосов
/ 27 февраля 2013

Вот более полное решение, которое я сделал для аналогичной проблемы, состоящей в том, что я хотел, чтобы мои сообщения отправлялись после совершения транзакций (я мог бы использовать RabbitMQ TX, но они довольно медленные).

public class MessageBusUtils {
    public static Optional<MessageBusResourceHolder> getTransactionalResourceHolder(TxMessageBus messageBus) {

        if ( ! TransactionSynchronizationManager.isActualTransactionActive()) {
            return Optional.absent();
        }

        MessageBusResourceHolder o = (MessageBusResourceHolder) TransactionSynchronizationManager.getResource(messageBus);
        if (o != null) return Optional.of(o);

        o = new MessageBusResourceHolder();
        TransactionSynchronizationManager.bindResource(messageBus, o);
        o.setSynchronizedWithTransaction(true);
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization(new MessageBusResourceSynchronization(o, messageBus));
        }
        return Optional.of(o);

    }

    private static class MessageBusResourceSynchronization extends ResourceHolderSynchronization<MessageBusResourceHolder, TxMessageBus> {
        private final TxMessageBus messageBus;
        private final MessageBusResourceHolder holder;

        public MessageBusResourceSynchronization(MessageBusResourceHolder resourceHolder, TxMessageBus resourceKey) {
            super(resourceHolder, resourceKey);
            this.messageBus = resourceKey;
            this.holder = resourceHolder;
        }


        @Override
        protected void cleanupResource(MessageBusResourceHolder resourceHolder, TxMessageBus resourceKey,
                boolean committed) {
            resourceHolder.getPendingMessages().clear();
        }

        @Override
        public void afterCompletion(int status) {
            if (status == TransactionSynchronization.STATUS_COMMITTED) {
                for (Object o : holder.getPendingMessages()) {
                    messageBus.post(o, false);
                }
            }
            else {
                holder.getPendingMessages().clear();
            }
            super.afterCompletion(status);
        }


    }
}

public class MessageBusResourceHolder extends ResourceHolderSupport {

    private List<Object> pendingMessages = Lists.newArrayList();

    public void addMessage(Object message) {
        pendingMessages.add(message);
    }


    protected List<Object> getPendingMessages() {
        return pendingMessages;
    }

}

Теперь в вашем классе, куда вы на самом деле отправляете сообщение, вы будете делать

@Override
public void postAfterCommit(Object o) {
    Optional<MessageBusResourceHolder> holder = MessageBusTxUtils.getTransactionalResourceHolder(this);
    if (holder.isPresent()) {
        holder.get().addMessage(o);
    }
    else {
        post(o, false);
    }
}

Приносим извинения за длинные примеры кода, но, надеюсь, это покажет кому-то, как что-то сделать после коммита.

3 голосов
/ 19 марта 2011

вы можете использовать аспект для соответствия аспекту транзакционных методов в вашем сервисе, чтобы выполнить это:

@Aspect
public class AfterReturningExample {

  @AfterReturning("execution(* com.mypackage.MyService.*(..))")
  public void afterReturning() {
    // ...
  }

}
0 голосов
/ 26 марта 2015

Имеет ли смысл переопределять диспетчер транзакций в методах фиксации и отката, вызывая super.commit() в самом начале.

...