Есть несколько проблем в реализации вашего веб-сервиса.
Когда subscribe
Прежде всего, в реактивном программировании вы, как правило, должны пытаться построить одиночный конвейер обработки (вызывая операторы Mono
и Flux
и возвращая конечный результат как Mono
и Flux
). В любом случае вы должны либо позволить платформе выполнить subscribe
, либо, по крайней мере, подписаться только один раз в конце этого конвейера.
Здесь вместо этого вы смешиваете два подхода: ваш метод createDocument
правильно возвращает Mono
, но он также делает subscribe
. Хуже того, подписка выполняется на промежуточном этапе, и ничто не подписывается на весь конвейер в методе веб-сервиса.
Таким образом, фактически никто не видит вторую половину конвейера (начиная с groupBy
), и поэтому он никогда не выполняется (это ленивый Flux
, также называемый «холодным» потоком).
Смешивание синхронного и асинхронного
Другая проблема - это проблема смешения двух подходов: ваш Flux
ленив и асинхронен, но ваш веб-сервис написан в императивном и синхронном стиле.
Таким образом, код запускает асинхронный Flux
из БД, немедленно возвращается к контроллеру и пытается загрузить данные файла с диска.
Вариант 1: сделать контроллер более Flux
-ориентированным
Если вы используете Spring MVC, вы все равно можете написать эти обязательные контроллеры стилей, добавив при этом некоторый WebFlux. В этом случае вы можете вернуть Mono
или Flux
, и Spring MVC переведет его в правильную асинхронную конструкцию сервлета. Но это будет означать, что вы должны превратить обработку OutputStream
и bytes
в Mono
, чтобы связать ее с документом Mono
, используя что-то вроде then
/ flatMap
/ etc ... It немного сложнее.
Вариант 2: Превращение Flux
в код обязательной блокировки
Другой вариант - вернуться к императиву и стилю блокировки, вызвав block()
на createDocument()
Mono
. Это подпишется на него и будет ждать его завершения. После этого весь ваш императивный код должен работать нормально.
Примечание на стороне
groupBy
имеет ограничение, при котором, если оно приводит к более чем 256
открытым группам, оно может зависнуть. Здесь группы не могут закрываться до тех пор, пока не будет достигнут конец файла, но, к счастью, поскольку вы обрабатываете данные только в течение одного месяца, поток не превысит 31
групп.