И LINQ, и Rx предназначены для поддержки преобразований, которые приводят к появлению новых объектов, а не тех, которые изменяют существующие объекты, но это все еще выполнимо. Вы уже сделали первый шаг, разбив задачу на части. Следующим шагом будет создание компонуемых функций, которые реализуют эти шаги.
1) У вас, по большей части, уже есть этот, но мы, вероятно, должны сохранить элементы для обновления позже.
public IEnumerable<XElement> GetImages(XDocument document)
{
var ns = document.Root.Name.Namespace;
return document.Root.Descendants(ns + "img");
}
2) Кажется, это то, где вы ударились о стену с точки зрения компоновки. Для начала давайте создадим наблюдаемый генератор FromEventAsyncPattern
. Уже есть такие для асинхронного шаблона Begin / End и стандартных событий, так что это будет где-то посередине.
public IObservable<TEventArgs> FromEventAsyncPattern<TDelegate, TEventArgs>
(Action method, Action<TDelegate> addHandler, Action<TDelegate> removeHandler
) where TEventArgs : EventArgs
{
return Observable.Create<TEventArgs>(
obs =>
{
//subscribe to the handler before starting the method
var ret = Observable.FromEventPattern<TDelegate, TEventArgs>(addHandler, removeHandler)
.Select(ep => ep.EventArgs)
.Take(1) //do this so the observable completes
.Subscribe(obs);
method(); //start the async operation
return ret;
}
);
}
Теперь мы можем использовать этот метод, чтобы превратить загрузки в наблюдаемые. Исходя из вашего использования, я думаю, что вы могли бы также использовать DownloadDataAsync
вместо WebClient.
public IObservable<byte[]> DownloadAsync(Uri address)
{
return Observable.Using(
() => new System.Net.WebClient(),
wc =>
{
return FromEventAsyncPattern<System.Net.DownloadDataCompletedEventHandler,
System.Net.DownloadDataCompletedEventArgs>
(() => wc.DownloadDataAsync(address),
h => wc.DownloadDataCompleted += h,
h => wc.DownloadDataCompleted -= h
)
.Select(e => e.Result);
//for robustness, you should probably check the error and cancelled
//properties instead of assuming it finished like I am here.
});
}
РЕДАКТИРОВАТЬ: Согласно вашему комментарию, вы, кажется, используете Silverlight, где WebClient
не является IDisposable
и не имеет метод, который я использовал. Чтобы справиться с этим, попробуйте что-то вроде:
public IObservable<byte[]> DownloadAsync(Uri address)
{
var wc = new System.Net.WebClient();
var eap = FromEventAsyncPattern<OpenReadCompletedEventHandler,
OpenReadCompletedEventArgs>(
() => wc.OpenReadAsync(address),
h => wc.OpenReadCompleted += h,
h => wc.OpenReadCompleted -= h);
return from e in eap
from b in e.Result.ReadAsync()
select b;
}
Вам понадобится найти реализацию ReadAsync
для чтения потока. Вы сможете найти его довольно легко, и сообщение уже было достаточно длинным, поэтому я его пропустил.
3 & 4) Теперь мы готовы собрать все вместе и обновить элементы. Поскольку шаг 3 очень прост, я просто объединю его с шагом 4.
public IObservable<Unit> ReplaceImageLinks(XDocument document)
{
return (from element in GetImages(document)
let address = new Uri(element.Attribute("src").Value)
select (From data in DownloadAsync(address)
Select Convert.ToBase64String(data)
).Do(base64 => element.Attribute("src").Value = base64)
).Merge()
.IgnoreElements()
.Select(s => Unit.Default);
//select doesn't really do anything as IgnoreElements eats all
//the values, but it is needed to change the type of the observable.
//Task may be more appropriate here.
}