Как насчет чего-то вроде этого:
public static IObservable<TResult> SelectAsync<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> projectAsync)
{
return Observable.Create<TResult>(
observer =>
{
var throttle = new BehaviorSubject<TResult>(default);
var observable = source
.Zip(throttle, (value, _) => value)
.SelectMany(value => Observable.Defer(() => Observable.StartAsync(() => projectAsync(value))))
.Publish();
return new CompositeDisposable(
observable.Subscribe(throttle),
observable.Subscribe(observer),
observable.Connect(),
throttle
);
}
);
}
В этом методе расширения Zip
в сочетании с BehaviorSubject
образуют дроссель, в котором элементы ставятся в очередь до тех пор, пока projectAsync
не будет завершено.
Затем его можно использовать следующим образом:
public static async Task<Unit> DoSomethingAsync(int value)
{
Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Started processing value '{value}'");
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Completed processing value '{value}'");
return Unit.Default;
}
public static async Task RunAsync()
{
IObservable<int> source = Observable.Generate(0, value => value < 25, value => value + 1, value => value, value => TimeSpan.FromSeconds(0.1));
await source
.Do(value => Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Received value '{value}'"))
.SelectAsync(value => DoSomethingAsync(value))
.ToTask();
}
При этом создается наблюдаемый источник, который генерирует 25 элементов с интервалами 100 мс. В методе DoSomethingAsync
используется Task.Delay
для имитации задержки обработки в 1 секунду. Выполнение этого кода должно привести к следующему результату:
[02:07:56] Received value '0'
[02:07:56] Started processing value '0'
[02:07:56] Received value '1'
[02:07:56] Received value '2'
[02:07:57] Received value '3'
[02:07:57] Received value '4'
[02:07:57] Received value '5'
[02:07:57] Received value '6'
[02:07:57] Received value '7'
[02:07:57] Received value '8'
[02:07:57] Received value '9'
[02:07:57] Completed processing value '0'
[02:07:57] Started processing value '1'
[02:07:57] Received value '10'
[02:07:57] Received value '11'
[02:07:58] Received value '12'
[02:07:58] Received value '13'
[02:07:58] Received value '14'
[02:07:58] Received value '15'
[02:07:58] Received value '16'
[02:07:58] Received value '17'
[02:07:58] Received value '18'
[02:07:58] Completed processing value '1'
[02:07:58] Started processing value '2'
[02:07:58] Received value '19'
[02:07:58] Received value '20'
[02:07:59] Received value '21'
[02:07:59] Received value '22'
[02:07:59] Received value '23'
[02:07:59] Received value '24'
[02:07:59] Completed processing value '2'
[02:07:59] Started processing value '3'
[02:08:00] Completed processing value '3'
[02:08:00] Started processing value '4'
[02:08:01] Completed processing value '4'
...
[02:08:20] Started processing value '23'
[02:08:21] Completed processing value '23'
[02:08:21] Started processing value '24'
[02:08:22] Completed processing value '24'
Вы должны знать, что этот код не предоставляет никаких средств противодействия источнику, поэтому, если источник постоянно генерирует элементы быстрее, чем projectAsync
давление памяти будет нарастать (через организацию очереди в операторе Zip
), пока вы не получите исключение нехватки памяти.
Кроме того, хотя я не знаю варианта использования этого требования, вы можете захотеть подумайте, что здесь лучше: "System.Interactive.Asyn c" или "System.Threading.Tasks.DataFlow".