Как создать реактивного «промежуточного абонента»?Как объединить все детали Flux <T>и обработать их результат в цепочке (преобразовать в Mono <R>)? - PullRequest
0 голосов
/ 17 сентября 2018

Я играю с весенним webflux + mongodb-реактивным для сохранения бинарных файлов (изображений) в БД Mongo. К сожалению, spring-boot-starter-data-mongodb-реактивный: 2.0.5.RELEASE не поддерживает реактивное программирование для функциональности GridFsTemplate. Поэтому я решил создать подписчика, который будет принимать все части DataBuffer, объединять их и преобразовывать в InputStream, и тогда будет возможно GridFsTemplate::store:

public class GridFsTemplateSubscriber extends BaseSubscriber<DataBuffer> {
private GridFsTemplate gridFsTemplate;
private List<DataBuffer> dataBuffers;
private String fileName;

public GridFsTemplateSubscriber(GridFsTemplate gridFsTemplate, String fileName) {
    this.gridFsTemplate = gridFsTemplate;
    this.fileName = fileName;
    dataBuffers = new ArrayList<>();
}

public void hookOnNext(DataBuffer dataBuffer) {
    dataBuffers.add(dataBuffer);
    request(1);
}

public void hookOnComplete() {
    DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
    InputStream inputStream = defaultDataBufferFactory.join(dataBuffers).asInputStream();
    ObjectId objectId = gridFsTemplate.store(inputStream, fileName);
}
}

Проблема в том, что я хотел бы вернуть objectId для дальнейшей обработки, но hookOnComplete - это тип void. Еще больше .. Я хотел бы получить отсюда Mono ObjectId, чтобы я мог обработать его дальше в реактивной манере. В этом случае, как я понимаю философию реагирования, я не должен использовать «настоящего» подписчика, а что-то, что объединит результаты из Flux<T>, а при onComplete вернет Mono<R>. Есть ли у проекта-реактора такая возможность? Я новичок в реактивном программировании, поэтому, возможно, я мог пропустить всю идею, поэтому, пожалуйста, объясните мне, как этого добиться.

В моем предыдущем решении я использовал block(), чтобы завершить реактивную цепочку, поэтому я получил ObjectId и затем я выпустил id объекта с новой цепочкой. Но это точно не хорошее решение.

1 Ответ

0 голосов
/ 17 сентября 2018

К сожалению, весь этот подход вызывает проблемы, а это не рекомендуется.

Реализация Subscriber (даже с использованием BaseSubscriber) почти никогда не является решением. Если используемая вами библиотека драйверов не поддерживает реактивные потоки, то вам , вероятно, следует заключить ее в код блокировки .

Это приведет к потере многих функций (поведение во время выполнения, противодавление и т. Д.), Но, по крайней мере, вы не рискуете взорвать все ваше приложение, если реализация вашего подписчика не идеальна.

Пока ваши основные библиотеки / драйверы не поддерживают реактивные потоки, вам, вероятно, следует придерживаться Spring MVC, который поддерживает асинхронные концепции и Flux Mono возвращаемые типы в некоторых случаях.

...