Как преобразовать эту функцию с обратным вызовом InvokeOperation в Reactive Extensions? - PullRequest
3 голосов
/ 18 октября 2011

У меня есть служба данных RIA Services, которая имеет несколько вызовов функций, которые выглядят следующим образом:

public InvokeOperation<T> SomeFunc(
    SomeData data,
    Action<InvokeOperation<T>> callback,
    object userState)

Как бы я использовал это с Reactive Extensions, чтобы я мог подписаться на обратный вызов и получить результат InvokeOperation?


Обновление : вот моя текущая реализация гибридного решения Enigmativity.Мне нужно было фактическое InvokeOperation, а не просто значение, поскольку InvokeOperation UserState может быть полезным.Следует отметить, что я вообще не тестировал обработку ошибок.

public static class ObservableEx
{
      public static IObservable<InvokeOperation<T>> ObservableInvokeOperation<T, Tdat> (
         Func<Tdat, Action<InvokeOperation<T>>, object, InvokeOperation<T>> func,
         Tdat data,
         System.Reactive.Concurrency.IScheduler scheduler )
      {
         return
             Observable.Defer<InvokeOperation<T>>( () =>
                 FromCallbackPattern<Tdat, T>( func, scheduler )
                     .Invoke( data ) );
      }

      private static Func<P, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T> (
          Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
          IScheduler scheduler )
      {
         return p =>
         {
            var subject = new AsyncSubject<InvokeOperation<T>>();
            try
            {
               call( p, iot =>
               {
                  if ( iot.HasError )
                  {
                     subject.OnError( iot.Error );
                  }
                  else
                  {
                     subject.OnNext( iot );
                     subject.OnCompleted();
                  }
               }, p );
            }
            catch ( Exception ex )
            {
               subject.OnError( ex );
            }
            return subject.ObserveOn( scheduler );
         };
      }   
}

использование данной функции

public InvokeOperation<int> SomeFunc(SomeData data, Action<InvokeOperation<int>> callback, object userState)

var myobs = ObservableEx.ObservableInvokeOperation<int, SomeData>( myRIAContext.SomeFunc, data, Scheduler.ThreadPool );

Это прекрасно работает для любой функции, которая соответствует данной сигнатуре функции.К сожалению, теперь я столкнулся с некоторыми вариантами, такими как

Func<T1, Action<InvokeOperation<T>>, object>
Func<T1, T2, Action<InvokeOperation<T>>, object>

У кого-нибудь есть какие-либо предложения по преобразованию этого, чтобы можно было обрабатывать любой метод InvokeOperation, который я хочу использовать?

Ответы [ 2 ]

3 голосов
/ 18 октября 2011

EDIT1 : см. Ниже гибридное решение, основанное на ответе Пола Беттса и моем.

EDIT2 : См. Ниже решение "третьего поколения"основанный на обновлении ОП.


Обратный вызов немного сложен, и я должен сказать, что превращение этого в наблюдаемое - хороший способ.

у меня есть подход, который работал для меня.

Основной подход - превратить операцию SomeFunc в Func<T>, а затем вызвать Observable.Start для этого.Я обернул это в Observable.Create, чтобы сохранить его в чистоте, и я добавил обработку ошибок.Я провел базовое тестирование, но ничего слишком надежного.

Использование кода выглядит следующим образом:

var obs = service.SomeObservableFunc(new SomeData(), Scheduler.ThreadPool);
obs.Subscribe(t => { /* success */ }, ex => { /* error */ });

Я предположил, что ваш класс обслуживания RIA равен RiaService<T>, и построилSomeObservableFunc метод расширения, подобный следующему:

    public static IObservable<T> SomeObservableFunc<T>(
        this RiaService<T> service,
        SomeData data,
        IScheduler scheduler)
    {
        return Observable.Create<T>(o =>
        {
            var error = (Exception)null;
            Func<T> call = () =>
            {
                var result = default(T);
                var mre = new ManualResetEvent(false);
                Action<InvokeOperation<T>> callback = iot =>
                {
                    try
                    {
                        if (iot.HasError)
                        {
                            error = iot.Error;
                        }
                        else
                        {
                            result = iot.Value;
                        }
                    }
                    catch (Exception ex)
                    {
                        error = ex;
                    }
                    finally
                    {
                        mre.Set();
                    }
                };
                try
                {
                    service.SomeFunc(data, callback, null);
                    mre.WaitOne();
                }
                catch (Exception ex)
                {
                    error = ex;
                }
                return result;
            };

            return Observable
                .Start(call, scheduler)
                .Subscribe(t =>
                {
                    try
                    {
                        if (error == null)
                        {
                            o.OnNext(t);
                        }
                        else
                        {
                            o.OnError(error);
                        }
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }
                }, ex => o.OnError(ex), () =>
                {
                    if (error == null)
                    {
                        o.OnCompleted();
                    }
                });
        });
    }

Выкрикни, если это работает для тебя.


EDIT1

Мне понравилось решение Пола Беттса, потому что оно не помоглоне использовал ManualResetEvent, но он не компилировался и не обрабатывал внутренние ошибки, которые могут возникнуть во время вызова службы RIA, поэтому я создал следующее гибридное решение.

Мой метод расширения сейчасвыглядит следующим образом:

    public static IObservable<T> SomeObservableFunc<T>(
        this RiaService<T> service,
        SomeData data,
        IScheduler scheduler)
    {
        return
            Observable.Defer<T>(() =>
                FromCallbackPattern<SomeData, T>(service.SomeFunc, scheduler)
                    .Invoke(data));
    }

И это использует переработанный FromCallbackPattern, изначально созданный Полом Беттсом:

    private static Func<P, IObservable<T>> FromCallbackPattern<P, T>(
        Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return p =>
        {
            var subject = new AsyncSubject<T>();
            try
            {
                call(p, iot =>
                {
                    if (iot.HasError)
                    {
                        subject.OnError(iot.Error);
                    }
                    else
                    {
                        subject.OnNext(iot.Value);
                        subject.OnCompleted();
                    }
                }, null);
            }
            catch (Exception ex)
            {
                subject.OnError(ex);
            }
            return subject.ObserveOn(scheduler);
        };
    }

Он работает против моего тестового кода, и я думаю, что это в целом лучшерешение.


EDIT2

Эта версия решения предназначена для передачи различного количества параметров и пользовательского состояния в методы расширения FromCallbackPattern.

я начал с этого общего назначения FromCallbackPattern расширение меняthod:

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
        this Action<Action<InvokeOperation<T>>> call,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
        {
            var subject = new AsyncSubject<InvokeOperation<T>>();
            try
            {
                call(iot =>
                {
                    subject.OnNext(iot);
                    subject.OnCompleted();
                });
            }
            catch (Exception ex)
            {
                subject.OnError(ex);
            }
            return subject.ObserveOn(scheduler);
        });
    }

Затем мне потребовался ряд частных методов расширения Reduce, чтобы сократить количество различных вызовов служб до Action<Action<InvokeOperation<T>>> делегатов:

    private static Action<Action<InvokeOperation<T>>> Reduce<T>(
        this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        object state)
    {
        return a => call(a, state);
    }

    private static Action<Action<InvokeOperation<T>>> Reduce<P, T>(
        this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        P p, object state)
    {
        return a => call(p, a, state);
    }       

    private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, T>(
        this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        P1 p1, P2 p2, object state)
    {
        return a => call(p1, p2, a, state);
    }

    private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, T>(
        this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        P1 p1, P2 p2, P3 p3, object state)
    {
        return a => call(p1, p2, p3, a, state);
    }

    private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, P4, T>(
        this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        P1 p1, P2 p2, P3 p3, P4 p4, object state)
    {
        return a => call(p1, p2, p3, p4, a, state);
    }

Теперь я могу написатьпростые FromCallbackPattern методы расширения:

    public static Func<object, IObservable<InvokeOperation<T>>> FromCallbackPattern<T>(
        this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return o => call.Reduce(o).FromCallbackPattern(scheduler);
    }

    public static Func<P, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T>(
        this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return (p, o) => call.Reduce(p, o).FromCallbackPattern(scheduler);
    }

    public static Func<P1, P2, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, T>(
        this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return (p1, p2, o) => call.Reduce(p1, p2, o).FromCallbackPattern(scheduler);
    }

    public static Func<P1, P2, P3, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, T>(
        this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return (p1, p2, p3, o) => call.Reduce(p1, p2, p3, o).FromCallbackPattern(scheduler);
    }

    public static Func<P1, P2, P3, P4, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, P4, T>(
        this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return (p1, p2, p3, p4, o) => call.Reduce(p1, p2, p3, p4, o).FromCallbackPattern(scheduler);
    }

И, наконец, оригинальные методы расширения SomeObservableFunc / ObservableInvokeOperation (теперь также переименованные в FromCallbackPattern):

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
        this RiaService<T> service,
        Func<RiaService<T>, Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
        object state,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
            call(service).FromCallbackPattern(scheduler)
                .Invoke(state));
    }

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<P, T>(
        this RiaService<T> service,
        Func<RiaService<T>, Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
        P p, object state,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
            call(service).FromCallbackPattern(scheduler)
                .Invoke(p, state));
    }

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, T>(
        this RiaService<T> service,
        Func<RiaService<T>, Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
        P1 p1, P2 p2, object state,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
            call(service).FromCallbackPattern(scheduler)
                .Invoke(p1, p2, state));
    }

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, T>(
        this RiaService<T> service,
        Func<RiaService<T>, Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
        P1 p1, P2 p2, P3 p3, object state,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
            call(service).FromCallbackPattern(scheduler)
                .Invoke(p1, p2, p3, state));
    }   

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, P4, T>(
        this RiaService<T> service,
        Func<RiaService<T>, Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
        P1 p1, P2 p2, P3 p3, P4 p4, object state,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
            call(service).FromCallbackPattern(scheduler)
                .Invoke(p1, p2, p3, p4, state));
    }           

Очевидно, что вам нужно заменить ссылки на RiaService<T> на ваш тип класса обслуживания RIA.

Эти методы можно вызывать так:

IObservable<InvokeOperation<int>> obs1 =
    service
        .FromCallbackPattern(
            s => s.SomeFunc,
            new SomeData(),
            null, // user state
            Scheduler.ThreadPool);

IObservable<InvokeOperation<int>> obs2 =
    service
        .FromCallbackPattern(
            s => s.SomeOtherFunc,
            42, "Hello", 3.14159265,
            null, // user state
            Scheduler.ThreadPool);

Фу!Как это сейчас?

1 голос
/ 18 октября 2011

Попробуй:

public Func<T1, IObservable<TRet>> FromCallbackPattern<T1, TRet>(Action<T1, Action<TRet>, object> originalMethod)
{
    return new Func<T1, IObservable<TRet>>((param1) => {
        var subject = new AsyncSubject<TRet>();

        try {
            originalMethod(param1, (result) => {
                subject.OnNext(result);
                subject.OnCompleted();
            }, null);
        } catch (Exception ex) {
            subject.OnError(ex);
        }

        return subject;
    });
}

Используйте это так:

var rxSomeFunc = FromCallbackPattern(someObj.SomeFunc);

rxSomeFunc(theData).Subscribe(x => ...);
...