Как обрабатывать несколько событий в Rx (с зависимостями) - PullRequest
2 голосов
/ 14 сентября 2011

У меня есть несколько вызовов WCF, скажем, A, B, C, D, E. Они находятся в приложении SL (поэтому, я думаю, мне нужно быть осторожным с многопоточностью и тому подобным).
Я хочу, чтобы B запускался после завершения A, а C мог работать одновременно с ними. Я хочу, чтобы D побежал после того, как все трое закончили. И дополнительно на основе условия (просто если ...) я хочу, чтобы E тоже запускался (одновременно с любым из вышеперечисленных).

Я только что скачал Rx и чувствую, что это инструмент для этой работы. Однако я еще не понимаю всех операторов, уловок и тому подобного.

Можно ли это сделать? Как? Пожалуйста, объясните операторов, необходимых для этого.

РЕДАКТИРОВАТЬ добавить: для простоты, скажем, все эти сервисы принимают строку (или int) в качестве параметра и возвращают что-то, что реализует сложный интерфейс, поэтому события являются простыми EventHandlers ( не универсальный), и я приведу их к их специальному интерфейсу в коде обработчика.
Некоторые входные данные доступны в начале, некоторые поступают из результатов предыдущих услуг (например, строка ввода B вычисляется на основе результата A)

1 Ответ

2 голосов
/ 14 сентября 2011

Я принял во внимание редактирование и собрал решение.

Я должен сказать, что Microsoft очень сильно затруднила написание простых вызовов WCF в Silverlight.Обработка событий для получения возвращаемых данных из асинхронных вызовов просто болезненна.

Сказав, что с небольшим количеством кода для копирования и вставки Rx отлично справляется с устранением сложности.

Вот то, что я придумал как окончательный код клиента:

        var service = new Service1Client();

        var abs =
            from a in service.AObservable("a")
            from b in service.BObservable(a)
            select new { a, b };

        var cs = service.CObservable("c");

        var abcs = abs.Zip(cs, (ab, c) => new { ab.a, ab.b, c });

        var ds =
            from abc in abcs
            from d in service.DObservable(abc.a, abc.b, abc.c)
            select String.Format(
                "A={0} B={1} C={2} D={3}",
                abc.a, abc.b, abc.c, d);

        var condition = true;

        var es = from e in condition
                    ? service.EObservable("e")
                    : Observable.Empty<string>()
                 select String.Format("E={0}", e);

        ds.Merge(es).ObserveOnDispatcher()
                    .Subscribe(r => this.label1.Content += " " + r);

С одной стороны, этот код не так хорош, как я надеялся, но с другой стороны, я неЯ думаю, я мог бы сделать это немного лучше.Надеюсь, вы сможете увидеть, как выполняются ваши требования.

Теперь о плохих новостях - методы расширения должны быть созданы для каждого прокси-сервера службы.Способ определения EventArgs не позволяет получить результаты без использования специального справочного класса службы в качестве параметра this для методов расширения.

С положительной стороны, хотя код в основном«копировать и вставить» с очень небольшими изменениями.

Кроме того, вам нужно создать столько методов расширения, сколько параметров вы используете, плюс один.

Вот методы расширения, которые мне были нужныопределить для моего примера кода:

public static class Service1ClientEx
{
    public static Func<IObservable<R>> ToObservableFunc<EA, R>(this Service1Client service, Action<Service1Client> async, Action<Service1Client, EventHandler<EA>> ah, Action<Service1Client, EventHandler<EA>> rh, Func<EA, R> resultSelector)
        where EA : System.ComponentModel.AsyncCompletedEventArgs
    {
        return () => Observable.Create<R>(o =>
        {
            var response =
                from ep in Observable.FromEventPattern<EA>(h => ah(service, h), h => rh(service, h))
                select resultSelector(ep.EventArgs);

            var subscription = response.Take(1).Subscribe(o);

            async(service);

            return subscription;
        }).ObserveOn(Scheduler.ThreadPool);
    }

    public static Func<P0, IObservable<R>> ToObservableFunc<P0, EA, R>(this Service1Client service, Action<Service1Client, P0> async, Action<Service1Client, EventHandler<EA>> ah, Action<Service1Client, EventHandler<EA>> rh, Func<EA, R> resultSelector)
        where EA : System.ComponentModel.AsyncCompletedEventArgs
    {
        return p0 => Observable.Create<R>(o =>
        {
            var response =
                from ep in Observable.FromEventPattern<EA>(h => ah(service, h), h => rh(service, h))
                select resultSelector(ep.EventArgs);

            var subscription = response.Take(1).Subscribe(o);

            async(service, p0);

            return subscription;
        }).ObserveOn(Scheduler.ThreadPool);
    }

    public static Func<P0, P1, IObservable<R>> ToObservableFunc<P0, P1, EA, R>(this Service1Client service, Action<Service1Client, P0, P1> async, Action<Service1Client, EventHandler<EA>> ah, Action<Service1Client, EventHandler<EA>> rh, Func<EA, R> resultSelector)
        where EA : System.ComponentModel.AsyncCompletedEventArgs
    {
        return (p0, p1) => Observable.Create<R>(o =>
        {
            var response =
                from ep in Observable.FromEventPattern<EA>(h => ah(service, h), h => rh(service, h))
                select resultSelector(ep.EventArgs);

            var subscription = response.Take(1).Subscribe(o);

            async(service, p0, p1);

            return subscription;
        }).ObserveOn(Scheduler.ThreadPool);
    }

    public static Func<P0, P1, P2, IObservable<R>> ToObservableFunc<P0, P1, P2, EA, R>(this Service1Client service, Action<Service1Client, P0, P1, P2> async, Action<Service1Client, EventHandler<EA>> ah, Action<Service1Client, EventHandler<EA>> rh, Func<EA, R> resultSelector)
        where EA : System.ComponentModel.AsyncCompletedEventArgs
    {
        return (p0, p1, p2) => Observable.Create<R>(o =>
        {
            var response =
                from ep in Observable.FromEventPattern<EA>(h => ah(service, h), h => rh(service, h))
                select resultSelector(ep.EventArgs);

            var subscription = response.Take(1).Subscribe(o);

            async(service, p0, p1, p2);

            return subscription;
        }).ObserveOn(Scheduler.ThreadPool);
    }

    public static Func<P0, P1, P2, P3, IObservable<R>> ToObservableFunc<P0, P1, P2, P3, EA, R>(this Service1Client service, Action<Service1Client, P0, P1, P2, P3> async, Action<Service1Client, EventHandler<EA>> ah, Action<Service1Client, EventHandler<EA>> rh, Func<EA, R> resultSelector)
        where EA : System.ComponentModel.AsyncCompletedEventArgs
    {
        return (p0, p1, p2, p3) => Observable.Create<R>(o =>
        {
            var response =
                from ep in Observable.FromEventPattern<EA>(h => ah(service, h), h => rh(service, h))
                select resultSelector(ep.EventArgs);

            var subscription = response.Take(1).Subscribe(o);

            async(service, p0, p1, p2, p3);

            return subscription;
        }).ObserveOn(Scheduler.ThreadPool);
    }

    public static IObservable<string> AObservable(this Service1Client service, string data)
    {
        return service
            .ToObservableFunc<string, ACompletedEventArgs, string>((s, p0) => s.AAsync(p0), (s, h) => s.ACompleted += h, (s, h) => s.ACompleted -= h, ea => ea.Result)
            .Invoke(data);
    }

    public static IObservable<string> BObservable(this Service1Client service, string data)
    {
        return service
            .ToObservableFunc<string, BCompletedEventArgs, string>((s, p0) => s.BAsync(p0), (s, h) => s.BCompleted += h, (s, h) => s.BCompleted -= h, ea => ea.Result)
            .Invoke(data);
    }

    public static IObservable<string> CObservable(this Service1Client service, string data)
    {
        return service
            .ToObservableFunc<string, CCompletedEventArgs, string>((s, p0) => s.CAsync(p0), (s, h) => s.CCompleted += h, (s, h) => s.CCompleted -= h, ea => ea.Result)
            .Invoke(data);
    }

    public static IObservable<string> DObservable(this Service1Client service, string dataA, string dataB, string dataC)
    {
        return service
            .ToObservableFunc<string, string, string, DCompletedEventArgs, string>((s, p0, p1, p2) => s.DAsync(p0, p1, p2), (s, h) => s.DCompleted += h, (s, h) => s.DCompleted -= h, ea => ea.Result)
            .Invoke(dataA, dataB, dataC);
    }

    public static IObservable<string> EObservable(this Service1Client service, string data)
    {
        return service
            .ToObservableFunc<string, ECompletedEventArgs, string>((s, p0) => s.EAsync(p0), (s, h) => s.ECompleted += h, (s, h) => s.ECompleted -= h, ea => ea.Result)
            .Invoke(data);
    }
}

Как я уже сказал, болезненно.

Вам, конечно, не нужно создавать такие методы расширения для использования Rx с WCF в Silverlight, ноя чувствую, что после того, как вы их создали, клиентский код становится намного проще для работы.

Дайте мне знать, если вам нужно что-то пояснить.

...