Сбой теста облачных потоков Spring после добавления рекомендации AOP, выполняемой вокруг целевого метода - PullRequest
0 голосов
/ 04 января 2019

У меня есть приложение весенних облачных потоков с потоковым приемником, потребляющим события из входного канала Все прошло гладко, пока я не добавил совет AOP для регистрации выполнения метода обработки (между другими в приложении). После этого начался сбой теста со следующей ошибкой:

org.springframework.messaging. Вложенное исключение - java.lang.IllegalStateException: класс метода сопоставленного обработчика 'com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener $$ EnhancerBySpringCGLIB $$ 9795881e $ экземпляр экземпляра Mockito4661e $ 3375951e класс бина 'com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener $$ EnhancerBySpringCGLIB $$ 9795881e $$ EnhancerBySpringCGLIB $$ 2a2d55ce'. Если конечная точка требует прокси (например, из-за @Transactional), используйте прокси на основе классов. HandlerMethod подробности: ...

Определение раковины:

Код приложения выглядит следующим образом:

public interface ExchangeRateStoreStreamSink {
        String NEWEXCHANGERATE="new-exchange-rate";

        @Input(NEWEXCHANGERATE)
        SubscribableChannel newExchangeRate();
}

Stream Listener с аннотированным методом:

@EnableBinding(ExchangeRateStoreStreamSink.class)
public class ExchangeRateStoreStreamListener {
    private CommandBus commandBus;

    @Autowired
    public ExchangeRateStoreStreamListener(CommandBus commandBus) {
        this.commandBus = commandBus;
    }

    @Loggable(operationName="ExchangeRateConsumption")
    @StreamListener(ExchangeRateStoreStreamSink.NEWEXCHANGERATE)
    public void handle(NewExchangeRateMessage newExchangeRateMessage) {
        AddExchangeRateCommand addExchangeRateCommand = new AddExchangeRateCommand(newExchangeRateMessage.from,
                newExchangeRateMessage.to, newExchangeRateMessage.amount, newExchangeRateMessage.date);
        commandBus.dispatch(addExchangeRateCommand);
    }
}

Тест:

@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class ExchangeRateStoreStreamListenerTest {
    @Autowired
    private ExchangeRateStoreStreamSink streamSink;

    @SpyBean
    private ExchangeRateStoreStreamListener streamListener;


    @Test
    public void test() {
        SubscribableChannel input = streamSink.newExchangeRate();
        NewExchangeRateMessage exchangeRateMessage = NewExchangeRateMessageFactory.aNewExchangeRateMessage();
        input.send(new GenericMessage<>(exchangeRateMessage));

        verify(streamListener).handle(any(NewExchangeRateMessage.class));
    }
}

AOP Аспект:

@Aspect
@Component
public class LoggingAspect {
    private static final String API_DOMAIN = "fx";

    @Pointcut(value = "@annotation(loggable) && execution(* *(..))", argNames = "loggable")
    public void loggableMethod(Loggable loggable) { }

    @Around(value = "loggableMethod(loggable)", argNames = "pjp,loggable")
    public Object logAccess(ProceedingJoinPoint pjp, Loggable loggable) throws Throwable {
        final Signature signature = pjp.getSignature();

        final Logger logger = LogManager.getLogger(signature.getDeclaringType());

        logger.info( "api_domain={} _operation={} _message=\"Start operation\"",
                API_DOMAIN, loggable.operationName());
        try {
            return pjp.proceed();
        } catch (DomainError domainError) {
            // Some logic here
        }
    }
}

Любая помощь приветствуется. Заранее спасибо!

1 Ответ

0 голосов
/ 04 января 2019

Это потому, что StreamListener уже является прокси.Слишком много, чтобы объяснить, и, вероятно, будет хорошей темой для ведения блога.,,В любом случае, я рад, что вы описали реальную проблему, которую вы пытаетесь решить, и которая может быть решена гораздо более простым подходом, а именно введением ChannelInterceptor, что, по сути, является рекомендацией Around для вызова вашего обработчика сообщений.В основном вот пример:

@Bean
@GlobalChannelInterceptor 
public ChannelInterceptor channelInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> msg, MessageChannel mc) {
            System.out.println("Before send to channel: " + mc);
            return msg;
        }

        @Override
        public void afterSendCompletion(Message<?> msg, MessageChannel mc, boolean bln, Exception excptn) {
            System.out.println("After send completion to channel: " + mc);
        }

        @Override
        public void postSend(Message<?> msg, MessageChannel mc, boolean bln) {
            System.out.println("After send to channel: " + mc);
        }

    };
}

.,,и вот результат, который он выдаст:

Before send to channel: input
Before send to channel: integrationFlowCreator.channel#0
===> SOME LOG MESSAGE INSIDE YOUR CODE 
After send to channel: integrationFlowCreator.channel#0
After send completion to channel: integrationFlowCreator.channel#0
After send to channel: input
After send completion to channel: input

Вы просто объявляете это в своей конфигурации, и все должно быть в порядке.Он будет применен ко всем каналам, поэтому вы (с логикой) будете следить только один раз, когда захотите.Пожалуйста, обратитесь к Javadoc для GlobalChannelInterceptor для более подробной информации.Надеюсь, это поможет

...