Это довольно просто с использованием .Buffer(10)
. Однако, вставляя SaveTickersToDatabaseAsync
в запрос (что является правильным решением), вы делаете вашу общую обработку ошибок и ведение журнала все более неактуальными в конце метода. Я предлагаю попробовать удалить его и попытаться поместить все это в запрос.
Вот как должен выглядеть код:
IObservable<IList<Unit>> query =
Observable
.Using(
() => new HttpClient(),
hc =>
from day in
Observable
.Range(1, daysToRun)
.Select(day =>
Observable
.FromAsync(ct => PolygonWebApi.GetGroupedDailyBarsAsync(hc, this.apiKey, Locale.US, Market.Stocks, lastRan.AddDays(day), false, ct))
.Do(r => this.logger.LogInformation("got {0} records for {1}", r.Results.Count(), lastRan.AddDays(day)))
.Select(r => new TickersResponseWithDay(lastRan.AddDays(day), r)))
.Merge(MaxConcurrentDownloads)
from tv2 in day.AggregateResponse.Results
select tv2)
.Buffer(10)
.SelectMany(xs => Observable.FromAsync(ct => SaveTickersToDatabaseAsync(xs, ct)))
.ToList();
IList<Unit> list = await query.ToTask(cancellationToken);
Теперь вам также следует подумать об использовании обычный Subscribe
, а не query.ToTask(cancellationToken)
для выполнения запроса.
Ваш код будет выглядеть так:
IObservable<Unit> query =
Observable
.Defer(() =>
{
var lastRan = DateTime.UtcNow.AddMonths(-6); // get dummy date 6 months ago
var daysToRun = (DateTime.UtcNow - lastRan).Days;
return
Observable
.Using(
() => new HttpClient(),
hc =>
from day in
Observable
.Range(1, daysToRun)
.Select(day =>
Observable
.FromAsync(ct => PolygonWebApi.GetGroupedDailyBarsAsync(hc, this.apiKey, Locale.US, Market.Stocks, lastRan.AddDays(day), false, ct))
.Do(r => this.logger.LogInformation("got {0} records for {1}", r.Results.Count(), lastRan.AddDays(day)))
.Select(r => new TickersResponseWithDay(lastRan.AddDays(day), r)))
.Merge(MaxConcurrentDownloads)
from tv2 in day.AggregateResponse.Results
select tv2)
.Buffer(10)
.SelectMany(xs => Observable.FromAsync(ct => SaveTickersToDatabaseAsync(xs, ct)));
});
IDisposable subscription =
query
.Subscribe(
x => { /* each call to `SaveTickersToDatabaseAsync` runs this code */ },
ex => { /* an exception? then end here */ },
() => { /* successfully completed */ });
Это чистый и самодостаточный, и это способ Rx делать вещи.