Как использовать Rx для опроса изображений через асинхронную службу WCF - PullRequest
1 голос
/ 26 сентября 2011

У меня есть асинхронная служба WCF, которая принимает URI и возвращает изображение (в виде потока).

Я хочу сделать это:

  • Убедитесь, что существует действительный канал WCF, если нет, создайте его
  • Выполните асинхронный вызов службы
  • В случае успеха сохраните изображение в переменную-член
  • Если я получу исключение, закройте канал
  • Если это не удастся или удастся, подождите 200 мс, а затем начните снова (зацикливание навсегда или до отмены)

До сих пор я придумал это чудовище:

    private void PollImage(string imageUri)
    {
        const int pollingHertz = 1;
        const int millisecondsTimeout = 1000 / pollingHertz;
        Thread.Sleep(millisecondsTimeout);

        if (_channel == null)
        {
            _channel = _channelFactory.CreateChannel();
        }

        var getImageFunc = Observable.FromAsyncPattern<string, Stream>
                                  (_channel.BeginGetImage, _channel.EndGetImage);

        getImageFunc(imageUri)
            .Finally(() => PollImage(imageUri))
            .Subscribe(
                stream => UpdateImageStream(imageUri, stream),
                ex =>
                    {
                        Trace.TraceError(ex.ToString());
                        ((ICommunicationObject) _channel).CloseOrAbort();
                        _channel = null;
                    });
    }

Я действительно хочу выучить Rx, но каждый раз, когда я пытаюсь, мне остается почесать голову.

Кто-нибудь хочет дать мне несколько советов по этому поводу? Спасибо

Ответы [ 3 ]

8 голосов
/ 27 сентября 2011

У меня есть решение для вас, но я собираюсь предложить изменить ваш PollImage метод, чтобы сделать его более похожим на Rx.

Подпись должна выглядеть так:

IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)

Вы должны рассматривать PollImage как наблюдаемую фабрику, и она фактически не будет запрашивать изображения, пока вы не подпишетесь на возвращаемую наблюдаемую. Преимущество этого подхода состоит в том, что он делает возможным отписаться - это требуется для последней точки маркера - и он четко разделяет код, который опрашивает изображения, и код, который обновляет локальные переменные.

Итак, вызов PollImage выглядит следующим образом:

PollImage(imageUri, TimeSpan.FromMilliseconds(200.0))
    .Subscribe(image =>
    {
        /* do save/update images here */
    });

И реализация выглядит так:

private IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
{
    Func<Stream, Image> getImageFromStream = st =>
    {
        /* read image from stream here */
    };

    return Observable.Create<Image>(o =>
    {
        if (_channel == null)
        {
            _channel = _channelFactory.CreateChannel();
        }

        var getImageFunc =
            Observable
                .FromAsyncPattern<string, Stream>(
                    _channel.BeginGetImage,
                    _channel.EndGetImage);

        var query =
            from ts in Observable.Timer(gapInterval)
            from stream in getImageFunc(imageUri)
            from img in Observable.Using(
                () => stream,
                st => Observable.Start(
                    () => getImageFromStream(st)))
            select img;

        return query.Do(img => { }, ex =>
        {
            Trace.TraceError(ex.ToString());
            ((ICommunicationObject)_channel).CloseOrAbort();
            _channel = null;
        }).Repeat().Retry().Subscribe(o);                   
    });
}

Наблюдаемая query ожидает завершения gapInterval, затем вызывает функцию WCF для возврата потока, а затем преобразует поток в изображение.

Внутреннее выражение return делает несколько вещей.

Сначала он использует оператор Do для захвата любых возникающих исключений и выполняет отслеживание и сброс канала, как и раньше.

Затем он вызывает .Repeat() для эффективного повторного выполнения запроса, заставляя его ждать gapInterval, прежде чем снова вызвать веб-сервис. Я мог бы использовать Observable.Interval вместо Observable.Timer в query и отбросить вызов до .Repeat(), но это означало бы, что вызовы веб-службы начинаются каждый gapInterval, а не ждут так долго после того, как он завершился в последний раз .

Затем он вызывает .Retry(), который эффективно перезапускает наблюдаемое, если обнаруживает исключение, так что подписчик никогда не видит исключение. Оператор Do фиксирует ошибки, так что все в порядке.

Наконец, он подписывает наблюдателя и возвращает IDisposable, позволяющий вызывающему коду отказаться от подписки.

Кроме реализации функции getImageFromStream, вот и все.

Теперь слово предостережения. Многие люди неправильно понимают, как работает подписка на наблюдаемые, и это может привести к трудностям в обнаружении ошибок.

Возьмем для примера:

var xs = Observable.Interval(TimeSpan.FromSeconds(1.0));

var s1 = xs.Subscribe(x => { });
var s2 = xs.Subscribe(x => { });

Оба s1 и s2 подписываются на xs, но вместо того, чтобы использовать один таймер, каждый из них создает таймер. У вас есть два экземпляра внутренней работы Observable.Interval, а не один.

Теперь это правильное поведение для наблюдаемых. В случае, если один из них выйдет из строя, другой не получит, потому что у них нет общих внутренних элементов - они изолированы друг от друга.

Однако в вашем коде (и моем в этом отношении) у вас есть потенциальная проблема с многопоточностью, потому что вы разделяете _channel для нескольких вызовов на PollImage. В случае сбоя одного вызова происходит сброс канала, что может привести к сбоям одновременных вызовов.

Я предлагаю создать новый канал для каждого вызова, чтобы избежать проблем с параллелизмом.

Надеюсь, это поможет.

0 голосов
/ 27 сентября 2011

Если вы действительно хотите использовать, тогда люди уже ответили на ваш вопрос, и если вы ищете альтернативные способы, я бы посоветовал взглянуть на TPL (объекты и т. Д.), Которые позволяют вам создавать объект задачи из шаблона метода Async. (ваш вызов веб-службы), а затем запустите задачу с токеном отмены, чтобы через некоторое время, если задача не была выполнена, вы могли отменить ее, вызвав метод отмены токена.

0 голосов
/ 27 сентября 2011

Это то, что я придумал (с некоторой помощью!) ... все еще не "идеально", но, похоже, работает.

Как сказал @Enigma, я теперь избавился от общего _channel и заменил его захваченным локальным var.Это работает, но я не понимаю Rx enuf, чтобы знать, если это плохой / ошибочный подход.Я подозреваю, что, по крайней мере, есть более чистый путь.

Кроме этого, моим главным возражением является Do (), где я вызываю EnsureChannel ... кажется немного вонючим.Но ... это работает ...

О, и я должен иметь _ (подчеркивание) в SelectMany, иначе GetImage больше не вызывается.

    private IDisposable PollImage(string imageUri)
    {
        ICameraServiceAsync channel = _channelFactory.CreateChannel();
        return Observable
            .Timer(TimeSpan.FromSeconds(0.2))
            .Do(_ => { channel = EnsureChannel(channel); })
            .SelectMany(_ =>
                Observable
                .FromAsyncPattern<string, Stream>(channel.BeginGetImage, channel.EndGetImage)(imageUri))
            .Retry()
            .Repeat()
            .Subscribe(stream => UpdateImageStream(imageUri, stream));
    }

    private ICameraServiceAsync EnsureChannel(ICameraServiceAsync channel)
    {
        var icc = channel as ICommunicationObject;
        if (icc != null)
        {
            var communicationState = icc.State; // Copy local for debug inspection
            if (communicationState == CommunicationState.Faulted)
            {
                icc.CloseOrAbort();
                channel = null;
            }
        }
        return channel ?? _channelFactory.CreateChannel();
    }
...