EDIT1 : см. Ниже гибридное решение, основанное на ответе Пола Беттса и моем.
EDIT2 : См. Ниже решение "третьего поколения"основанный на обновлении ОП.
Обратный вызов немного сложен, и я должен сказать, что превращение этого в наблюдаемое - хороший способ.
у меня есть подход, который работал для меня.
Основной подход - превратить операцию SomeFunc
в Func<T>
, а затем вызвать Observable.Start
для этого.Я обернул это в Observable.Create
, чтобы сохранить его в чистоте, и я добавил обработку ошибок.Я провел базовое тестирование, но ничего слишком надежного.
Использование кода выглядит следующим образом:
var obs = service.SomeObservableFunc(new SomeData(), Scheduler.ThreadPool);
obs.Subscribe(t => { /* success */ }, ex => { /* error */ });
Я предположил, что ваш класс обслуживания RIA равен RiaService<T>
, и построилSomeObservableFunc
метод расширения, подобный следующему:
public static IObservable<T> SomeObservableFunc<T>(
this RiaService<T> service,
SomeData data,
IScheduler scheduler)
{
return Observable.Create<T>(o =>
{
var error = (Exception)null;
Func<T> call = () =>
{
var result = default(T);
var mre = new ManualResetEvent(false);
Action<InvokeOperation<T>> callback = iot =>
{
try
{
if (iot.HasError)
{
error = iot.Error;
}
else
{
result = iot.Value;
}
}
catch (Exception ex)
{
error = ex;
}
finally
{
mre.Set();
}
};
try
{
service.SomeFunc(data, callback, null);
mre.WaitOne();
}
catch (Exception ex)
{
error = ex;
}
return result;
};
return Observable
.Start(call, scheduler)
.Subscribe(t =>
{
try
{
if (error == null)
{
o.OnNext(t);
}
else
{
o.OnError(error);
}
}
catch (Exception ex)
{
o.OnError(ex);
}
}, ex => o.OnError(ex), () =>
{
if (error == null)
{
o.OnCompleted();
}
});
});
}
Выкрикни, если это работает для тебя.
EDIT1
Мне понравилось решение Пола Беттса, потому что оно не помоглоне использовал ManualResetEvent
, но он не компилировался и не обрабатывал внутренние ошибки, которые могут возникнуть во время вызова службы RIA, поэтому я создал следующее гибридное решение.
Мой метод расширения сейчасвыглядит следующим образом:
public static IObservable<T> SomeObservableFunc<T>(
this RiaService<T> service,
SomeData data,
IScheduler scheduler)
{
return
Observable.Defer<T>(() =>
FromCallbackPattern<SomeData, T>(service.SomeFunc, scheduler)
.Invoke(data));
}
И это использует переработанный FromCallbackPattern
, изначально созданный Полом Беттсом:
private static Func<P, IObservable<T>> FromCallbackPattern<P, T>(
Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return p =>
{
var subject = new AsyncSubject<T>();
try
{
call(p, iot =>
{
if (iot.HasError)
{
subject.OnError(iot.Error);
}
else
{
subject.OnNext(iot.Value);
subject.OnCompleted();
}
}, null);
}
catch (Exception ex)
{
subject.OnError(ex);
}
return subject.ObserveOn(scheduler);
};
}
Он работает против моего тестового кода, и я думаю, что это в целом лучшерешение.
EDIT2
Эта версия решения предназначена для передачи различного количества параметров и пользовательского состояния в методы расширения FromCallbackPattern
.
я начал с этого общего назначения FromCallbackPattern
расширение меняthod:
public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
this Action<Action<InvokeOperation<T>>> call,
IScheduler scheduler)
{
return Observable.Defer(() =>
{
var subject = new AsyncSubject<InvokeOperation<T>>();
try
{
call(iot =>
{
subject.OnNext(iot);
subject.OnCompleted();
});
}
catch (Exception ex)
{
subject.OnError(ex);
}
return subject.ObserveOn(scheduler);
});
}
Затем мне потребовался ряд частных методов расширения Reduce
, чтобы сократить количество различных вызовов служб до Action<Action<InvokeOperation<T>>>
делегатов:
private static Action<Action<InvokeOperation<T>>> Reduce<T>(
this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
object state)
{
return a => call(a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P, T>(
this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P p, object state)
{
return a => call(p, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, T>(
this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, object state)
{
return a => call(p1, p2, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, T>(
this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, P3 p3, object state)
{
return a => call(p1, p2, p3, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, P4, T>(
this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, P3 p3, P4 p4, object state)
{
return a => call(p1, p2, p3, p4, a, state);
}
Теперь я могу написатьпростые FromCallbackPattern
методы расширения:
public static Func<object, IObservable<InvokeOperation<T>>> FromCallbackPattern<T>(
this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return o => call.Reduce(o).FromCallbackPattern(scheduler);
}
public static Func<P, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T>(
this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p, o) => call.Reduce(p, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, T>(
this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, o) => call.Reduce(p1, p2, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, P3, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, T>(
this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, p3, o) => call.Reduce(p1, p2, p3, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, P3, P4, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, P4, T>(
this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, p3, p4, o) => call.Reduce(p1, p2, p3, p4, o).FromCallbackPattern(scheduler);
}
И, наконец, оригинальные методы расширения SomeObservableFunc
/ ObservableInvokeOperation
(теперь также переименованные в FromCallbackPattern
):
public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
this RiaService<T> service,
Func<RiaService<T>, Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P p, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, P3 p3, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, p3, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, P4, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, P3 p3, P4 p4, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, p3, p4, state));
}
Очевидно, что вам нужно заменить ссылки на RiaService<T>
на ваш тип класса обслуживания RIA.
Эти методы можно вызывать так:
IObservable<InvokeOperation<int>> obs1 =
service
.FromCallbackPattern(
s => s.SomeFunc,
new SomeData(),
null, // user state
Scheduler.ThreadPool);
IObservable<InvokeOperation<int>> obs2 =
service
.FromCallbackPattern(
s => s.SomeOtherFunc,
42, "Hello", 3.14159265,
null, // user state
Scheduler.ThreadPool);
Фу!Как это сейчас?