У меня есть решение для вас, но я собираюсь предложить изменить ваш PollImage
метод, чтобы сделать его более похожим на Rx.
Подпись должна выглядеть так:
IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
Вы должны рассматривать PollImage
как наблюдаемую фабрику, и она фактически не будет запрашивать изображения, пока вы не подпишетесь на возвращаемую наблюдаемую. Преимущество этого подхода состоит в том, что он делает возможным отписаться - это требуется для последней точки маркера - и он четко разделяет код, который опрашивает изображения, и код, который обновляет локальные переменные.
Итак, вызов PollImage
выглядит следующим образом:
PollImage(imageUri, TimeSpan.FromMilliseconds(200.0))
.Subscribe(image =>
{
/* do save/update images here */
});
И реализация выглядит так:
private IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
{
Func<Stream, Image> getImageFromStream = st =>
{
/* read image from stream here */
};
return Observable.Create<Image>(o =>
{
if (_channel == null)
{
_channel = _channelFactory.CreateChannel();
}
var getImageFunc =
Observable
.FromAsyncPattern<string, Stream>(
_channel.BeginGetImage,
_channel.EndGetImage);
var query =
from ts in Observable.Timer(gapInterval)
from stream in getImageFunc(imageUri)
from img in Observable.Using(
() => stream,
st => Observable.Start(
() => getImageFromStream(st)))
select img;
return query.Do(img => { }, ex =>
{
Trace.TraceError(ex.ToString());
((ICommunicationObject)_channel).CloseOrAbort();
_channel = null;
}).Repeat().Retry().Subscribe(o);
});
}
Наблюдаемая query
ожидает завершения gapInterval
, затем вызывает функцию WCF для возврата потока, а затем преобразует поток в изображение.
Внутреннее выражение return
делает несколько вещей.
Сначала он использует оператор Do
для захвата любых возникающих исключений и выполняет отслеживание и сброс канала, как и раньше.
Затем он вызывает .Repeat()
для эффективного повторного выполнения запроса, заставляя его ждать gapInterval
, прежде чем снова вызвать веб-сервис. Я мог бы использовать Observable.Interval
вместо Observable.Timer
в query
и отбросить вызов до .Repeat()
, но это означало бы, что вызовы веб-службы начинаются каждый gapInterval
, а не ждут так долго после того, как он завершился в последний раз .
Затем он вызывает .Retry()
, который эффективно перезапускает наблюдаемое, если обнаруживает исключение, так что подписчик никогда не видит исключение. Оператор Do
фиксирует ошибки, так что все в порядке.
Наконец, он подписывает наблюдателя и возвращает IDisposable
, позволяющий вызывающему коду отказаться от подписки.
Кроме реализации функции getImageFromStream
, вот и все.
Теперь слово предостережения. Многие люди неправильно понимают, как работает подписка на наблюдаемые, и это может привести к трудностям в обнаружении ошибок.
Возьмем для примера:
var xs = Observable.Interval(TimeSpan.FromSeconds(1.0));
var s1 = xs.Subscribe(x => { });
var s2 = xs.Subscribe(x => { });
Оба s1
и s2
подписываются на xs
, но вместо того, чтобы использовать один таймер, каждый из них создает таймер. У вас есть два экземпляра внутренней работы Observable.Interval
, а не один.
Теперь это правильное поведение для наблюдаемых. В случае, если один из них выйдет из строя, другой не получит, потому что у них нет общих внутренних элементов - они изолированы друг от друга.
Однако в вашем коде (и моем в этом отношении) у вас есть потенциальная проблема с многопоточностью, потому что вы разделяете _channel
для нескольких вызовов на PollImage
. В случае сбоя одного вызова происходит сброс канала, что может привести к сбоям одновременных вызовов.
Я предлагаю создать новый канал для каждого вызова, чтобы избежать проблем с параллелизмом.
Надеюсь, это поможет.