Параллельные запросы HttpWeb с реактивными расширениями - PullRequest
4 голосов
/ 08 сентября 2011

У меня есть класс «Изображение» с тремя свойствами: Url, Id, Content.У меня есть список из 10 таких изображений.Это приложение Silverlight.

Я хочу создать метод:

IObservable<Image> DownloadImages(List<Image> imagesToDownload)
{
     //start downloading all images in imagesToDownload
     //OnImageDownloaded: 
                          image.Content = webResponse.Content
                          yield image

}

Этот метод начинает загрузку всех 10 изображений параллельно.Затем, когда каждая загрузка завершается, он устанавливает для Image.Content значение WebResponse.Content для этой загрузки.

Результатом должен быть поток IObservable с каждым загруженным изображением.

Я являюсьновичок в RX, и я думаю, что то, чего я хочу, может быть достигнуто с помощью ForkJoin, но это в экспериментальном выпуске реактивных расширений dll, которые я не хочу использовать.

Также мне действительно не нравится подсчет загрузокна обратные вызовы, чтобы обнаружить, что все изображения были загружены, а затем вызвать onCompleted ().

Мне кажется, что это не в духе Rx.

Также я выкладываю, какое решение я имеюдо сих пор закодирован, хотя мне не нравится мое решение, потому что оно длинное / уродливое и использует счетчики.

     return Observable.Create((IObserver<Attachment> observer) =>
         {
             int downloadCount = attachmentsToBeDownloaded.Count;
                foreach (var attachment in attachmentsToBeDownloaded)
                        {
                            Action<Attachment> action = attachmentDDD =>
                            this.BeginDownloadAttachment2(attachment).Subscribe(imageDownloadWebResponse =>
                                {
                                    try
                                    {
                                        using (Stream stream = imageDownloadWebResponse.GetResponseStream())
                                        {
                                            attachment.FileContent = stream.ReadToEnd();
                                        } 
                                        observer.OnNext(attachmentDDD);

                                        lock (downloadCountLocker)
                                        {
                                            downloadCount--;
                                            if (downloadCount == 0)
                                            {
                                                observer.OnCompleted();
                                            }
                                        }
                                    } catch (Exception ex)
                                    {
                                        observer.OnError(ex);
                                    }
                                });
                            action.Invoke(attachment);
                        }

                        return () => { }; //do nothing when subscriber disposes subscription
                    });
            }

Хорошо, я справился, чтобы оно заработало в конце, основываясь на ответе Джима.

    var obs = from image in attachmentsToBeDownloaded.ToObservable()
               from webResponse in this.BeginDownloadAttachment2(image).ObserveOn(Scheduler.ThreadPool)
               from responseStream in Observable.Using(webResponse.GetResponseStream, Observable.Return)
               let newImage = setAttachmentValue(image, responseStream.ReadToEnd())
               select newImage;

где setAttachmentValue просто берет, делает `image.Content = bytes;возвратное изображение;

BeginDownloadAttachment2 код:

        private IObservable<WebResponse> BeginDownloadAttachment2(Attachment attachment)
    {
        Uri requestUri = new Uri(this.DownloadLinkBaseUrl + attachment.Id.ToString();
        WebRequest imageDownloadWebRequest = HttpWebRequest.Create(requestUri);
        IObservable<WebResponse> imageDownloadObservable = Observable.FromAsyncPattern<WebResponse>(imageDownloadWebRequest.BeginGetResponse, imageDownloadWebRequest.EndGetResponse)();

        return imageDownloadObservable;
    }

1 Ответ

3 голосов
/ 08 сентября 2011

Как насчет этого немного упростить.Возьмите свой список изображений и преобразуйте его в наблюдаемый.Затем рассмотрите возможность использования Observable.FromAsyncPattern для управления запросами на обслуживание.Наконец, используйте SelectMany, чтобы согласовать запрос с ответом.Я делаю некоторые предположения о том, как вы получаете файловые потоки здесь.По сути, если вы можете передать делегатов BeginInvoke / EndInvoke в FromAsyncPattern для запроса на обслуживание, вы в порядке.

var svcObs = Observable.FromAsyncPattern<Stream>(this.BeginDownloadAttachment2, This.EndDownloadAttchment2);

var obs = from image in imagesToDownload.ToObservable()
          from responseStream in svcObs(image)
          .ObserveOnDispatcher()
          .Do(response => image.FileContent = response.ReadToEnd())
          select image;
return obs;
...