Если у вас есть:
IObservable<string> UnparsedMessages = ...;
Func<string, Message> ParseMessage = ...;
Тогда вы можете использовать метод расширения SelectAsync
, например:
IObservable<Message> ParsedMessages = UnparsedMessages.SelectAsync(ParseMessage);
Метод расширения SelectAsync
обрабатывает каждое непарсированное сообщение асинхронно и обеспечивает возврат результатов в том порядке, в котором они были получены.
Дайте мне знать, если это делает то, что вам нужно.
Вот код:
public static IObservable<U> SelectAsync<T, U>(this IObservable<T> source,
Func<T, U> selector)
{
var subject = new Subject<U>();
var queue = new Queue<System.Threading.Tasks.Task<U>>();
var completing = false;
var subscription = (IDisposable)null;
Action<Exception> onError = ex =>
{
queue.Clear();
subject.OnError(ex);
subscription.Dispose();
};
Action dequeue = () =>
{
lock (queue)
{
var error = false;
while (queue.Count > 0 && queue.Peek().IsCompleted)
{
var task = queue.Dequeue();
if (task.Exception != null)
{
error = true;
onError(task.Exception);
break;
}
else
{
subject.OnNext(task.Result);
}
}
if (!error && completing && queue.Count == 0)
{
subject.OnCompleted();
subscription.Dispose();
}
}
};
Action<T> enqueue = t =>
{
if (!completing)
{
var task = new System.Threading.Tasks.Task<U>(() => selector(t));
queue.Enqueue(task);
task.ContinueWith(tu => dequeue());
task.Start();
}
};
subscription = source.Subscribe(
t => { lock(queue) enqueue(t); },
x => { lock(queue) onError(x); },
() => { lock(queue) completing = true; });
return subject.AsObservable();
}
В итоге мне пришлось вернуться к этой работе для работы, и я написал более надежную версию этого кода (основанную также на ответе Ричарда).
Ключевым преимуществом этого кода является отсутствие какой-либо явной очереди. Я просто использую продолжение задач, чтобы привести результаты в порядок. Работает как удовольствие!
public static IObservable<U> ForkSelect<T, U>(this IObservable<T> source, Func<T, U> selector)
{
return source.ForkSelect<T, U>(t => Task<U>.Factory.StartNew(() => selector(t)));
}
public static IObservable<U> ForkSelect<T, U>(this IObservable<T> source, Func<T, Task<U>> selector)
{
if (source == null) throw new ArgumentNullException("source");
if (selector == null) throw new ArgumentNullException("selector");
return Observable.CreateWithDisposable<U>(observer =>
{
var gate = new object();
var onNextTask = Task.Factory.StartNew(() => { });
var sourceCompleted = false;
var taskErrored = false;
Action<Exception> onError = ex =>
{
sourceCompleted = true;
onNextTask = onNextTask.ContinueWith(t => observer.OnError(ex));
};
Action onCompleted = () =>
{
sourceCompleted = true;
onNextTask = onNextTask.ContinueWith(t => observer.OnCompleted());
};
Action<T> onNext = t =>
{
var task = selector(t);
onNextTask = Task.Factory.ContinueWhenAll(new[] { onNextTask, task }, ts =>
{
if (!taskErrored)
{
if (task.IsFaulted)
{
taskErrored = true;
observer.OnError(task.Exception);
}
else
{
observer.OnNext(task.Result);
}
}
});
};
var subscription = source
.AsObservable()
.Subscribe(
t => { if (!sourceCompleted) lock (gate) onNext(t); },
ex => { if (!sourceCompleted) lock (gate) onError(ex); },
() => { if (!sourceCompleted) lock (gate) onCompleted(); });
var @return = new CompositeDisposable(subscription);
return @return;
});
}
И перегрузки SelectMany
, позволяющие использовать LINQ:
public static IObservable<U> SelectMany<T, U>(this IObservable<T> source, Func<T, Task<U>> selector)
{
return source.ForkSelect<T, U>(selector);
}
public static IObservable<V> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Task<U>> taskSelector, Func<T, U, V> resultSelector)
{
if (source == null) throw new ArgumentNullException("source");
if (taskSelector == null) throw new ArgumentNullException("taskSelector");
if (resultSelector == null) throw new ArgumentNullException("resultSelector");
return source.Zip(source.ForkSelect<T, U>(taskSelector), (t, u) => resultSelector(t, u));
}
Таким образом, эти методы теперь можно использовать так:
var observableOfU = observableOfT.ForkSelect(funcOfT2U);
Или:
var observableOfU = observableOfT.ForkSelect(funcOfT2TaskOfU);
Или:
var observableOfU =
from t in observableOfT
from u in funcOfT2TaskOfU(t)
select u;
Наслаждайтесь!