Предположим, что я хочу преобразовать какой-то устаревший асинхронный API в потоки FS2.
API предоставляет интерфейс с 3 обратными вызовами: следующий элемент, успех, ошибка.
Я бы хотел, чтобы Stream испустил все элементы и затем завершил работу при получении обратного вызова об успешном завершении или об ошибке.
Руководство по FS2 (https://functional -streams-for-scala.github.io / fs2 / guide.html ) предлагает использовать fs2.Queue
для таких ситуаций,
и это прекрасно работает для постановки в очередь, но все примеры, которые я видел до сих пор, ожидают, что поток, который возвращает queue.dequeue
, никогда не завершится -
в моей ситуации нет очевидного способа обработки обратного вызова «успех / ошибка».
Я пытался использовать queue.dequeue.interruptWhen(...here goes the signal...)
, но если обратный вызов об успехе / ошибке приходит до того, как клиент прочитает данные из потока,
поток прерывается преждевременно - еще есть непрочитанные элементы. Я бы хотел, чтобы потребитель закончил читать их до завершения потока.
Возможно ли это сделать с помощью FS2? С Akka Streams это тривиально - у SourceQueueWithComplete
есть методы complete
и fail
.
UPDATE:
Мне удалось получить достаточно хороший результат, обернув элементы в Option и рассматривая None как сигнал для прекращения чтения потока, а также используя Promise для распространения ошибок:
queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)
Однако, я упустил более естественный способ делать такие вещи?