Использование Rx для упрощения асинхронного запроса веб-службы Silverlight - PullRequest
4 голосов
/ 27 сентября 2010

Я написал упрощенную клиентскую библиотеку Silverlight для своего веб-сервиса WCF с использованием Rx, однако иногда замечаю, что пропускаю завершенные события.

public IObservable<XElement> GetReport(string reportName)
{
    return from client in Observable.Return(new WebServiceClient())
           from request in Observable.ToAsync<string>(client.GetReportDataAsync)(reportName)
           from result in Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted").Take(1)
           from close in this.CloseClient(client)
           select result.EventArgs.Result;
}

Я полагаю, что проблема вызвана тем, что веб-служба вызывается и возвращается до подписки на завершенное событие. Я не могу понять, как заставить Rx подписаться на событие до вызова Async. Я пробовал StartWith, но для этого нужно, чтобы типы ввода и вывода были одинаковыми, есть идеи?

Ответы [ 2 ]

7 голосов
/ 28 сентября 2010

Похоже, лучший ответ - использовать Observable.CreateWithDisposable ()

например,

public IObservable<XElement> GetReport(string reportName)
{
    return from client in Observable.Return(new WebServiceClient())
            from completed in Observable.CreateWithDisposable<GetReportDataCompletedEventArgs>(observer =>
                {
                    var subscription = Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted")
                        .Take(1)
                        .Select(e => e.EventArgs)
                        .Subscribe(observer);
                    client.GetReportDataAsync(reportName);
                    return subscription;
                })
            from close in this.CloseClient(client)
            select completed.Result;
}

. Чтобы упростить эту работу, я реорганизовал CreateWithDisposable в общую функцию,используется со всеми моими вызовами веб-службы, включая автоматическое определение имени события из типа аргументов события:

private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start) where T : AsyncCompletedEventArgs
{
    if (typeof(T) == typeof(AsyncCompletedEventArgs))
    {
        throw new InvalidOperationException("Event arguments type cannot be used to determine event name, use event name overload instead.");
    }

    string completedEventName = typeof(T).Name.TrimEnd("EventArgs");
    return CallService<T>(serviceClient, start, completedEventName);
}

private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start, string completedEventName) where T : AsyncCompletedEventArgs
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        var subscription = Observable.FromEvent<T>(serviceClient, completedEventName).Take(1).Select(e => e.EventArgs).Subscribe(observer);
        start();
        return subscription;
    });
}

// Example usage:
public IObservable<XElement> GetReport(string reportName)
{
    return from client in Observable.Return(new WebServiceClient())
            from completed in this.CallService<GetReportDataCompletedEventArgs>(client, () => client.GetReportDataAsync(reportName))
            from close in this.CloseClient(client)
            select completed.Result;
}

/// <summary>
/// Asynchronously closes the web service client
/// </summary>
/// <param name="client">The web service client to be closed.</param>
/// <returns>Returns a cold observable sequence of a single success Unit.</returns>
private IObservable<AsyncCompletedEventArgs> CloseClient(WebServiceClient client)
{
    return this.CallService<AsyncCompletedEventArgs>(client, client.CloseAsync, "CloseCompleted");
}

Надеюсь, это поможет кому-то еще!

1 голос
/ 03 декабря 2010

Мне нужно использовать общие WebClient.DownloadStringAsync, поэтому здесь моя версия.

Сначала оберните событие:

public static IObservable<IEvent<DownloadStringCompletedEventArgs>>
    GetDownloadStringObservableEvent(this WebClient wc)
{
    return Observable.FromEvent<DownloadStringCompletedEventArgs>(
        wc, "DownloadStringCompleted");
}

Затем создайте метод расширения:

public static IObservable<string> GetDownloadString(this WebClient wc, Uri uri)
{
    return Observable.CreateWithDisposable<string>(
        observer => {
            // Several downloads may be going on simultaneously. The token allows
            // us to establish that we're retrieving the right one.
            Guid token = Guid.NewGuid();
            var stringDownloaded = wc.GetDownloadStringObservableEvent()
                    .Where(evt => ((Guid)evt.EventArgs.UserState) == token)
                    .Take(1);        //implicitly unhooks handler after event is received
            bool errorOccurred = false;
            IDisposable unsubscribe =
                stringDownloaded.Subscribe(
                    // OnNext action
                    ev => {
                        // Propagate the exception if one is reported.
                        if (ev.EventArgs.Error != null) {
                            errorOccurred = true;
                            observer.OnError(ev.EventArgs.Error);
                        } else if (!ev.EventArgs.Cancelled) {
                            observer.OnNext(ev.EventArgs.Result);
                        }
                    },
                    // OnError action (propagate exception)
                    ex => observer.OnError(ex),
                    // OnCompleted action
                    () => {
                        if (!errorOccurred) {
                            observer.OnCompleted();
                        }
                    });
            try {
                wc.DownloadStringAsync(uri, token);
            } catch (Exception ex) {
                observer.OnError(ex);
            }
            return unsubscribe;
        }
    );
}

Использование простое:

wc.GetDownloadString(new Uri("http://myservice"))
    .Subscribe(resultCallback , errorCallback);
...