Я не могу признать решение Ричарда Сзалая приемлемым. Если вы запустите 100 запросов, а второй запрос завершится с ошибкой недоступности сервера (например), остальные 98 запросов будут прерваны. Вторая проблема, как пользовательский интерфейс будет реагировать на такие наблюдаемые? Слишком грустно.
Имея в виду главу 4.3 Rx Design Guidelines Я хотел выразить WebRequest, наблюдаемый через оператор Observable.Using (). Но WebRequest не одноразовый! Поэтому я определил DisposableWebRequest:
public class DisposableWebRequest : WebRequest, IDisposable
{
private static int _Counter = 0;
private readonly WebRequest _request;
private readonly int _index;
private volatile bool _disposed = false;
public DisposableWebRequest (string url)
{
this._request = WebRequest.Create(url);
this._index = ++DisposableWebRequest._Counter;
}
public override IAsyncResult BeginGetResponse(AsyncCallback callback, object state)
{
return this._request.BeginGetResponse(callback, state);
}
public override WebResponse EndGetResponse(IAsyncResult asyncResult)
{
Trace.WriteLine(string.Format("EndGetResponse index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId));
Trace.Flush();
if (!this._disposed)
{
return this._request.EndGetResponse(asyncResult);
}
else
{
return null;
}
}
public override WebResponse GetResponse()
{
return this._request.GetResponse();
}
public override void Abort()
{
this._request.Abort();
}
public void Dispose()
{
if(!this._disposed)
{
this._disposed = true;
Trace.WriteLine(string.Format("Disposed index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId ));
Trace.Flush();
this.Abort();
}
}
}
Затем я создаю окно WPF и помещаю в него две кнопки (Пуск и Стоп).
Итак, давайте создадим правильные запросы наблюдаемой коллекции.
Сначала определите наблюдаемую функцию создания URL-адреса:
Func<IObservable<string>> createUrlObservable = () =>
Observable
.Return("http://yahoo.com")
.Repeat(100)
.OnStartup(() =>
{
this._failed = 0;
this._successed = 0;
});
На каждом URL-адресе мы должны создать веб-запрос, который можно обрезать, поэтому:
Func<string, IObservable<WebResponse>> createRequestObservable =
url =>
Observable.Using(
() => new DisposableWebRequest("http://yahoo.com"),
r =>
{
Trace.WriteLine("Queued " + url);
Trace.Flush();
return Observable
.FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)();
});
Дополнительно определите две наблюдаемые события, которые реагируют на нажатия кнопок «Пуск» / «Стоп»:
var startMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StartButton, "Click");
var stopMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StopButton, "Click");
Итак, кирпичи готовы, время их составлять (я делаю это в конструкторе представления сразу после InitializeComponent ()):
startMouseDown
.SelectMany(down =>
createUrlObservable()
.SelectMany(url => createRequestObservable(url)
.TakeUntil(stopMouseDown)
.Select(r => r.GetResponseStream())
.Do(s =>
{
using (var sr = new StreamReader(s))
{
Trace.WriteLine(sr.ReadLine());
Trace.Flush();
}
})
.Do(r => this._successed++)
.HandleError(e => this._failed++))
.ObserveOnDispatcher()
.Do(_ => this.RefresLabels(),
e => { },
() => this.RefresLabels())
)
.Subscribe();
Вы можете задаться вопросом о функции "HandleError ()". Если в вызове EndGetResponse () возникло исключение (я отключил сетевое соединение для его воспроизведения) и не было зафиксировано в наблюдаемом запросе - это приведет к сбою наблюдаемой startMouseDown. HandleError автоматически отлавливает исключение, выполняет действие и вместо вызова OnError для следующего наблюдателя вызывает OnCompleted:
public static class ObservableExtensions
{
public static IObservable<TSource> HandleError<TSource>(this IObservable<TSource> source, Action<Exception> errorHandler)
{
return Observable.CreateWithDisposable<TSource>(observer =>
{
return source.Subscribe(observer.OnNext,
e =>
{
errorHandler(e);
//observer.OnError(e);
observer.OnCompleted();
},
observer.OnCompleted);
});
}
}
Последнее необъяснимое место - метод RefreshLabels, который обновляет элементы управления пользовательского интерфейса:
private void RefresLabels()
{
this.SuccessedLabel.Content = string.Format("Successed {0}", this._successed);
this.FailedLabel.Content = string.Format("Failed {0}", this._failed);
}