У меня есть класс «Изображение» с тремя свойствами: Url, Id, Content.У меня есть список из 10 таких изображений.Это приложение Silverlight.
Я хочу создать метод:
IObservable<Image> DownloadImages(List<Image> imagesToDownload)
{
//start downloading all images in imagesToDownload
//OnImageDownloaded:
image.Content = webResponse.Content
yield image
}
Этот метод начинает загрузку всех 10 изображений параллельно.Затем, когда каждая загрузка завершается, он устанавливает для Image.Content значение WebResponse.Content для этой загрузки.
Результатом должен быть поток IObservable с каждым загруженным изображением.
Я являюсьновичок в RX, и я думаю, что то, чего я хочу, может быть достигнуто с помощью ForkJoin, но это в экспериментальном выпуске реактивных расширений dll, которые я не хочу использовать.
Также мне действительно не нравится подсчет загрузокна обратные вызовы, чтобы обнаружить, что все изображения были загружены, а затем вызвать onCompleted ().
Мне кажется, что это не в духе Rx.
Также я выкладываю, какое решение я имеюдо сих пор закодирован, хотя мне не нравится мое решение, потому что оно длинное / уродливое и использует счетчики.
return Observable.Create((IObserver<Attachment> observer) =>
{
int downloadCount = attachmentsToBeDownloaded.Count;
foreach (var attachment in attachmentsToBeDownloaded)
{
Action<Attachment> action = attachmentDDD =>
this.BeginDownloadAttachment2(attachment).Subscribe(imageDownloadWebResponse =>
{
try
{
using (Stream stream = imageDownloadWebResponse.GetResponseStream())
{
attachment.FileContent = stream.ReadToEnd();
}
observer.OnNext(attachmentDDD);
lock (downloadCountLocker)
{
downloadCount--;
if (downloadCount == 0)
{
observer.OnCompleted();
}
}
} catch (Exception ex)
{
observer.OnError(ex);
}
});
action.Invoke(attachment);
}
return () => { }; //do nothing when subscriber disposes subscription
});
}
Хорошо, я справился, чтобы оно заработало в конце, основываясь на ответе Джима.
var obs = from image in attachmentsToBeDownloaded.ToObservable()
from webResponse in this.BeginDownloadAttachment2(image).ObserveOn(Scheduler.ThreadPool)
from responseStream in Observable.Using(webResponse.GetResponseStream, Observable.Return)
let newImage = setAttachmentValue(image, responseStream.ReadToEnd())
select newImage;
где setAttachmentValue просто берет, делает `image.Content = bytes;возвратное изображение;
BeginDownloadAttachment2 код:
private IObservable<WebResponse> BeginDownloadAttachment2(Attachment attachment)
{
Uri requestUri = new Uri(this.DownloadLinkBaseUrl + attachment.Id.ToString();
WebRequest imageDownloadWebRequest = HttpWebRequest.Create(requestUri);
IObservable<WebResponse> imageDownloadObservable = Observable.FromAsyncPattern<WebResponse>(imageDownloadWebRequest.BeginGetResponse, imageDownloadWebRequest.EndGetResponse)();
return imageDownloadObservable;
}