У меня есть IObservable
, предоставленный библиотекой, которая прослушивает события от внешней службы:
let startObservable () : IObservable<'a> = failwith "Given"
Для каждого полученного события я хочу выполнить действие, которое возвращает Async
:
let action (item: 'a) : Async<unit> = failwith "Given"
Я пытаюсь реализовать процессор в строках
let processor () : Async<unit> =
startObservable()
|> Observable.mapAsync action
|> Async.AwaitObservable
Я составил mapAsync
и AwaitObservable
: в идеале они должны быть предоставлены некоторой библиотекой, котораяПока я не могу найти.
Дополнительные требования:
Действия должны выполняться последовательно, поэтому последующие события буферизуются, пока обрабатывается предыдущее событие.
Если действие выдает ошибку, я хочу, чтобы мой процессор завершил работу.В противном случае это никогда не завершится.
Токен отмены, переданный через Async.Start
, должен соблюдаться.
Любые подсказки относительно библиотеки, которую я должен использовать?