Написание аспектов для реактивных конвейеров - PullRequest
0 голосов
/ 11 марта 2020

Я пишу Аспекты для методов, которые возвращают обещания. Рассмотрим следующий метод:

public Mono<Stream> publishToKafka(Stream s) {
    //publishToKafka is asynchronous
    return Mono.just(s).flatMap(worker::publishToKafka);
}

Я хочу кэшировать, если публикация sh прошла успешно или нет. Поскольку это междисциплинарная проблема, Aspect выглядит как лучший дизайн. Вот мой Аспект для этого.

@Around("@annotation....")
public Object cache() {
    //get the data to cache from the annotation
    Object result = pjp.proceed();
    cache.cache("key","data");
    return result;
}

Теперь, поскольку publishToKafka является асинхронным, целевой метод возвращается, как только происходит переключение потока и вызывается cache.cache(). Это не то, что я хочу. Я хочу, чтобы результат кэшировался, если событие было успешно опубликовано в Kafka. Следующий совет работает.

@Around("@annotation....")
public <T extends Stream<T>> Mono<T> cache() {
    //get the data to cache from the annotation
    return ((Mono<T>)pjp.proceed()).doOnNext(a -> cache.cache(key, data));
}

Я хочу понять, что здесь происходит. Это происходит во время сборки трубопровода? Или во время выполнения (pjp.proceed() возвращает обещание), к которому мой совет добавляет оператор doOnNext?

Мне нужно понять время сборки и выполнения в контексте этого примера.

1 Ответ

2 голосов
/ 11 марта 2020

И аспекты Spring AOP и AspectJ всегда выполняются синхронно в том же потоке, что и перехваченная точка соединения. Таким образом, если ваш перехваченный метод возвращается немедленно, а возвращаемое значение является чем-то вроде обещания, будущего или ничего (пустота) в сочетании с обратным вызовом, вы не можете ожидать магического получения асинхронного результата в рекомендации аспекта. Вам нужно проинформировать аспект об асинхронной ситуации.

Сказав это, я также хочу упомянуть, что я никогда раньше не использовал реактивное программирование, я знаю только концепцию. Из того, что я вижу в вашем совете, решение должно работать, но одна вещь не так хороша: вы заставляете совет возвращать новый Mono экземпляр , возвращаемый вашим doOnNext(..) вызовом . Возможно, было бы чище вернуть оригинал Mono, полученный от proceed() после регистрации на нем обратного вызова кэширования, просто чтобы избежать каких-либо побочных эффектов.

I Не знаю, что еще объяснить, ситуация довольно понятна. Не стесняйтесь задавать непосредственно связанные вопросы, если моего объяснения недостаточно.

...