Мой подход к этому двухступенчатый.
Сначала создайте оператор общего назначения, который превращает Action<T, Action<R>>
в Func<T, IObservable<R>>
:
public static class ObservableEx
{
public static Func<T, IObservable<R>> FromAsyncCallbackPattern<T, R>(
this Action<T, Action<R>> call)
{
if (call == null) throw new ArgumentNullException("call");
return t =>
{
var subject = new AsyncSubject<R>();
try
{
Action<R> callback = r =>
{
subject.OnNext(r);
subject.OnCompleted();
};
call(t, callback);
}
catch (Exception ex)
{
return Observable.Throw<R>(ex, Scheduler.ThreadPool);
}
return subject.AsObservable<R>();
};
}
}
Затем включите вызов void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)
в IObservable<IResponse> Process(IObservable<IRequest> requests)
:
public IObservable<IResponse> Process(IObservable<IRequest> requests)
{
Func<IRequest, IObservable<IResponse>> rq2rp =
ObservableEx.FromAsyncCallbackPattern
<IRequest, IResponse>(requestProcessor.Process);
var query = (
from rq in requests
select rq2rp(rq)).Concat();
var uow = unitOfWorkFactory.Create();
var subject = new ReplaySubject<IResponse>();
query.Subscribe(
r => subject.OnNext(r),
ex =>
{
uow.Rollback();
subject.OnError(ex);
},
() =>
{
uow.Commit();
subject.OnCompleted();
});
return subject.AsObservable();
}
Теперь не только выполняется асинхронная обработка, но и обеспечивается правильный порядок результатов.
На самом деле, поскольку вы начинаете с коллекции, вы можете даже сделать это:
var rqs = requests.ToObservable();
var rqrps = rqs.Zip(Process(rqs),
(rq, rp) => new
{
Request = rq,
Response = rp,
});
Тогда у вас будет наблюдаемое, которое объединяет в пары каждый запрос / ответ без необходимости в CorrelationId
свойстве.
Надеюсь, это поможет.