Смешивание IObservable и Async <'a> в F # - PullRequest
0 голосов
/ 12 сентября 2018

У меня есть IObservable, предоставленный библиотекой, которая прослушивает события от внешней службы:

let startObservable () : IObservable<'a> = failwith "Given"

Для каждого полученного события я хочу выполнить действие, которое возвращает Async:

let action (item: 'a) : Async<unit> = failwith "Given"

Я пытаюсь реализовать процессор в строках

let processor () : Async<unit> =
    startObservable()
    |> Observable.mapAsync action
    |> Async.AwaitObservable

Я составил mapAsync и AwaitObservable: в идеале они должны быть предоставлены некоторой библиотекой, котораяПока я не могу найти.

Дополнительные требования:

  • Действия должны выполняться последовательно, поэтому последующие события буферизуются, пока обрабатывается предыдущее событие.

  • Если действие выдает ошибку, я хочу, чтобы мой процессор завершил работу.В противном случае это никогда не завершится.

  • Токен отмены, переданный через Async.Start, должен соблюдаться.

Любые подсказки относительно библиотеки, которую я должен использовать?

Ответы [ 2 ]

0 голосов
/ 12 сентября 2018

Hopac может легко взаимодействовать с IObservables Вы можете конвертировать Hopac Jobs в Async с Job.toAsync

open System
open Hopac

let startObservable () : IObservable<'a> = failwith "Given"

let action (item: 'a) : Job<unit> = failwith "Given"

let processor () : Job<unit> =
    startObservable()
    |> Stream.ofObservable
    |> Stream.mapJob action
    |> Stream.iter
0 голосов
/ 12 сентября 2018

Поскольку вы хотите преобразовать модель, основанную на push (IObservable<>), в модель, основанную на pull (Async<>), вам необходимо поставить в очередь для буферизации данных, поступающих из наблюдаемой. Если очередь ограничена по размеру - какой тбч. следует сделать так, чтобы весь конвейер был безопасным, чтобы не переполнять память - тогда необходима стратегия переполнения буфера.

  1. Одним из способов является реализация MailboxProcessor<> и настраиваемая наблюдаемая, которая будет публиковать данные в нее. Поскольку MP является нативной реализацией F # actor, он может выполнять упорядоченную обработку с очередью для буферизации пиков.
  2. Другой вариант - использовать FSharp.Control.AsyncSeq (и, в частности, AsyncSeq.ofObservableBuffered ), которая превратит наблюдаемую в перечисляемую асинхронную обработку на основе извлечения - под ней используется процессор почтовых ящиков с 1-й точки:

    startObservable()
    |> AsyncSeq.ofObservableBuffered
    |> AsyncSeq.iterAsync action
    
...