FS2: возможно ли завершить очередь изящно? - PullRequest
0 голосов
/ 07 мая 2018

Предположим, что я хочу преобразовать какой-то устаревший асинхронный API в потоки FS2. API предоставляет интерфейс с 3 обратными вызовами: следующий элемент, успех, ошибка. Я бы хотел, чтобы Stream испустил все элементы и затем завершил работу при получении обратного вызова об успешном завершении или об ошибке.

Руководство по FS2 (https://functional -streams-for-scala.github.io / fs2 / guide.html ) предлагает использовать fs2.Queue для таких ситуаций, и это прекрасно работает для постановки в очередь, но все примеры, которые я видел до сих пор, ожидают, что поток, который возвращает queue.dequeue, никогда не завершится - в моей ситуации нет очевидного способа обработки обратного вызова «успех / ошибка». Я пытался использовать queue.dequeue.interruptWhen(...here goes the signal...), но если обратный вызов об успехе / ошибке приходит до того, как клиент прочитает данные из потока, поток прерывается преждевременно - еще есть непрочитанные элементы. Я бы хотел, чтобы потребитель закончил читать их до завершения потока.

Возможно ли это сделать с помощью FS2? С Akka Streams это тривиально - у SourceQueueWithComplete есть методы complete и fail.

UPDATE: Мне удалось получить достаточно хороший результат, обернув элементы в Option и рассматривая None как сигнал для прекращения чтения потока, а также используя Promise для распространения ошибок:

queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)

Однако, я упустил более естественный способ делать такие вещи?

Ответы [ 2 ]

0 голосов
/ 08 августа 2018

Ответ на ваше обновление: Объедините unNoneTerminate с rethrow, который принимает Stream[F, Either[Throwable, A]] и возвращает Stream[F, A], который выдает ошибку с Stream.raiseError, когда он совершает бросок.

Ваша полнаятогда стек будет Stream[F, Either[Throwable, Option[A]]], и вы развернете в Stream[F,A], вызвав .rethrow.unNoneTerminate

0 голосов
/ 19 июня 2018

Один идиоматический способ сделать это - создать Queue[Option[A]] вместо Queue[A]. При постановке в очередь, введите Some, и вы можете явно поставить в очередь None, чтобы сообщить о завершении. На стороне снятия с очереди выполните q.dequeue.unNoneTerminate, что даст вам Stream[F, A], который завершается, когда очередь выдает None

...