Я играю с весенним webflux + mongodb-реактивным для сохранения бинарных файлов (изображений) в БД Mongo. К сожалению, spring-boot-starter-data-mongodb-реактивный: 2.0.5.RELEASE не поддерживает реактивное программирование для функциональности GridFsTemplate
. Поэтому я решил создать подписчика, который будет принимать все части DataBuffer, объединять их и преобразовывать в InputStream
, и тогда будет возможно GridFsTemplate::store
:
public class GridFsTemplateSubscriber extends BaseSubscriber<DataBuffer> {
private GridFsTemplate gridFsTemplate;
private List<DataBuffer> dataBuffers;
private String fileName;
public GridFsTemplateSubscriber(GridFsTemplate gridFsTemplate, String fileName) {
this.gridFsTemplate = gridFsTemplate;
this.fileName = fileName;
dataBuffers = new ArrayList<>();
}
public void hookOnNext(DataBuffer dataBuffer) {
dataBuffers.add(dataBuffer);
request(1);
}
public void hookOnComplete() {
DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
InputStream inputStream = defaultDataBufferFactory.join(dataBuffers).asInputStream();
ObjectId objectId = gridFsTemplate.store(inputStream, fileName);
}
}
Проблема в том, что я хотел бы вернуть objectId
для дальнейшей обработки, но hookOnComplete
- это тип void. Еще больше .. Я хотел бы получить отсюда Mono ObjectId, чтобы я мог обработать его дальше в реактивной манере. В этом случае, как я понимаю философию реагирования, я не должен использовать «настоящего» подписчика, а что-то, что объединит результаты из Flux<T>
, а при onComplete вернет Mono<R>
. Есть ли у проекта-реактора такая возможность? Я новичок в реактивном программировании, поэтому, возможно, я мог пропустить всю идею, поэтому, пожалуйста, объясните мне, как этого добиться.
В моем предыдущем решении я использовал block()
, чтобы завершить реактивную цепочку, поэтому я получил ObjectId
и затем я выпустил id объекта с новой цепочкой. Но это точно не хорошее решение.