Очистка реактивных расширений - PullRequest
2 голосов
/ 22 декабря 2010

Если у вас длинная цепочка вызовов с использованием rx, например:

var responses = collectionOfHttpRequests.ToObservable()
.FromAsyncPattern(req.BeginGetResponse, req.EndGetResponse)
.Select(res => res.GetResponseBodyString()) // Extension method to get the body of the request
.Subscribe();

и затем до завершения операции вы вызываете удаление, будут ли HTTP-запросы отменяться, закрываться и утилизироваться должным образом, или мне нужно каким-то образом выбирать httprequests из цепочек методов и удалять их по отдельности?

У меня есть вещь, при которой можно иметь несколько HTTP-запросов одновременно, и мне нужно иметь возможность отменить (не игнорировать) некоторые / все из них для экономии сетевого трафика.

Ответы [ 2 ]

2 голосов
/ 20 января 2011

Я не могу признать решение Ричарда Сзалая приемлемым. Если вы запустите 100 запросов, а второй запрос завершится с ошибкой недоступности сервера (например), остальные 98 запросов будут прерваны. Вторая проблема, как пользовательский интерфейс будет реагировать на такие наблюдаемые? Слишком грустно.

Имея в виду главу 4.3 Rx Design Guidelines Я хотел выразить WebRequest, наблюдаемый через оператор Observable.Using (). Но WebRequest не одноразовый! Поэтому я определил DisposableWebRequest:

public class DisposableWebRequest : WebRequest, IDisposable
{
    private static int _Counter = 0;

    private readonly WebRequest _request;
    private readonly int _index;

    private volatile bool _disposed = false;

    public DisposableWebRequest (string url)
    {
        this._request = WebRequest.Create(url);
        this._index = ++DisposableWebRequest._Counter;
    }

    public override IAsyncResult BeginGetResponse(AsyncCallback callback, object state)
    {
        return this._request.BeginGetResponse(callback, state);
    }

    public override WebResponse EndGetResponse(IAsyncResult asyncResult)
    {
        Trace.WriteLine(string.Format("EndGetResponse index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId));
        Trace.Flush();
        if (!this._disposed)
        {
            return this._request.EndGetResponse(asyncResult);
        }
        else
        {
            return null;
        }
    }

    public override WebResponse GetResponse()
    {
        return this._request.GetResponse();
    }

    public override void Abort()
    {
        this._request.Abort();
    }

    public void Dispose()
    {
        if(!this._disposed)
        {
            this._disposed = true;

            Trace.WriteLine(string.Format("Disposed index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId ));
            Trace.Flush();
            this.Abort();
        }
    }
}

Затем я создаю окно WPF и помещаю в него две кнопки (Пуск и Стоп).

Итак, давайте создадим правильные запросы наблюдаемой коллекции. Сначала определите наблюдаемую функцию создания URL-адреса:

        Func<IObservable<string>> createUrlObservable = () =>
            Observable
                .Return("http://yahoo.com")
                .Repeat(100)
                .OnStartup(() =>
                {
                    this._failed = 0;
                    this._successed = 0;
                });

На каждом URL-адресе мы должны создать веб-запрос, который можно обрезать, поэтому:

        Func<string, IObservable<WebResponse>> createRequestObservable = 
            url => 
            Observable.Using(
                () => new DisposableWebRequest("http://yahoo.com"),
                r =>
                {
                    Trace.WriteLine("Queued " + url);
                    Trace.Flush();
                    return Observable
                        .FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)();
                });

Дополнительно определите две наблюдаемые события, которые реагируют на нажатия кнопок «Пуск» / «Стоп»:

        var startMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StartButton, "Click");
        var stopMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StopButton, "Click");

Итак, кирпичи готовы, время их составлять (я делаю это в конструкторе представления сразу после InitializeComponent ()):

        startMouseDown
            .SelectMany(down =>
                createUrlObservable()
                    .SelectMany(url => createRequestObservable(url)
                        .TakeUntil(stopMouseDown)
                        .Select(r => r.GetResponseStream())
                        .Do(s =>
                            {
                                using (var sr = new StreamReader(s))
                                {
                                    Trace.WriteLine(sr.ReadLine());
                                    Trace.Flush();
                                }

                            })
                        .Do(r => this._successed++)
                        .HandleError(e => this._failed++))
                        .ObserveOnDispatcher()
                        .Do(_ => this.RefresLabels(),
                            e => { },
                            () => this.RefresLabels())

                        )
            .Subscribe();

Вы можете задаться вопросом о функции "HandleError ()". Если в вызове EndGetResponse () возникло исключение (я отключил сетевое соединение для его воспроизведения) и не было зафиксировано в наблюдаемом запросе - это приведет к сбою наблюдаемой startMouseDown. HandleError автоматически отлавливает исключение, выполняет действие и вместо вызова OnError для следующего наблюдателя вызывает OnCompleted:

public static class ObservableExtensions
{
    public static IObservable<TSource> HandleError<TSource>(this IObservable<TSource> source, Action<Exception> errorHandler)
    {
        return Observable.CreateWithDisposable<TSource>(observer =>
            {
                return source.Subscribe(observer.OnNext, 
                    e => 
                    { 
                        errorHandler(e);
                        //observer.OnError(e);
                        observer.OnCompleted();
                    },
                    observer.OnCompleted);
            });
    }
}

Последнее необъяснимое место - метод RefreshLabels, который обновляет элементы управления пользовательского интерфейса:

    private void RefresLabels()
    {
        this.SuccessedLabel.Content = string.Format("Successed {0}", this._successed);
        this.FailedLabel.Content = string.Format("Failed {0}", this._failed);
    }
2 голосов
/ 22 декабря 2010

Цепочка операторов Rx очистится после завершения последовательности, ошибок или удаления подписки.Однако каждый оператор будет очищать только то, что он должен очистить.Например, FromEvent откажется от подписки на событие.

В вашем случае отмена не поддерживается асинхронным шаблоном Begin/End , поэтому Rx ничего не может отменить.Однако вы можете использовать Finally для вызова HttpWebRequest.Abort.

var observableRequests = collectionOfHttpRequests.ToObservable();

var responses = observableRequests
    .SelectMany(req => 
        Observable.FromAsyncPattern(req.BeginGetResponse, req.EndGetResponse)()
    )
    .Select(resp => resp.GetResponseBodyString())
    .Finally(() =>
    {
        observableRequests
            .Subscribe(req => req.Abort());
    })
    .Subscribe();
...