Как выполнить пакетную запись в базу данных каждые N страниц с помощью ReactiveExtensions Range и WithAsync - PullRequest
0 голосов
/ 12 июля 2020

У меня есть следующая функция, которая работает хорошо ... однако, учитывая объем генерируемых данных, мне нужно выполнять пакетную запись в базу данных каждые 10 или около того извлеченных страниц.

var start = DateTime.Now;

IList<AggV2> list = null;

var lastRan = DateTime.UtcNow.AddMonths(-6); // get dummy date 6 months ago
var daysToRun = (DateTime.UtcNow - lastRan).Days;

try
{
    IObservable<IList<AggV2>> query =
        Observable
            .Using(
                () => new HttpClient(),
                hc =>
                    from day in
                        Observable
                            .Range(1, daysToRun)
                            .Select(day =>
                                Observable
                                    .FromAsync(ct => PolygonWebApi.GetGroupedDailyBarsAsync(hc, this.apiKey, Locale.US, Market.Stocks, lastRan.AddDays(day), false, ct))
                                    .Select(r =>
                                    {
                                        this.logger.LogInformation("got {0} records for {1}", r.Results.Count(), lastRan.AddDays(day));
                                        return new TickersResponseWithDay(lastRan.AddDays(day), r);
                                    }))
                            .Merge(MaxConcurrentDownloads)
                    from tv2 in day.AggregateResponse.Results
                    select tv2)
            .ToList();

    list = await query.ToTask(cancellationToken);
}
catch (OperationCanceledException) { }
catch (Exception e) { this.logger.LogError(e, e.Message); }

var duration = DateTime.Now - start;

if (cancellationToken.IsCancellationRequested)
    this.logger.LogInformation("{0} cancelled after {1}, database not updated", this.GetType().Name, duration.Humanize());
else
{
    this.logger.LogInformation("{0} downloaded {1} tickers in {2}, saving to database...", this.GetType().Name, list.Count, duration.Humanize());
    await SaveTickersToDatabaseAsync(list, cts.Token);
}

Вместо этого Чтобы получить все данные и затем записать, я хотел бы вызывать SaveTickersToDatabaseAsync(list, cancellationToken) каждые 10 страниц.

Мне также нужно иметь возможность выйти из приложения в любой момент cancellationToken установлено.

Можно ли, пожалуйста, объединить вышеуказанные требования по пакетированию и отмене?

1 Ответ

1 голос
/ 14 июля 2020

Это довольно просто с использованием .Buffer(10). Однако, вставляя SaveTickersToDatabaseAsync в запрос (что является правильным решением), вы делаете вашу общую обработку ошибок и ведение журнала все более неактуальными в конце метода. Я предлагаю попробовать удалить его и попытаться поместить все это в запрос.

Вот как должен выглядеть код:

IObservable<IList<Unit>> query =
    Observable
        .Using(
            () => new HttpClient(),
            hc =>
                from day in
                    Observable
                        .Range(1, daysToRun)
                        .Select(day =>
                            Observable
                                .FromAsync(ct => PolygonWebApi.GetGroupedDailyBarsAsync(hc, this.apiKey, Locale.US, Market.Stocks, lastRan.AddDays(day), false, ct))
                                .Do(r => this.logger.LogInformation("got {0} records for {1}", r.Results.Count(), lastRan.AddDays(day)))
                                .Select(r => new TickersResponseWithDay(lastRan.AddDays(day), r)))
                        .Merge(MaxConcurrentDownloads)
                from tv2 in day.AggregateResponse.Results
                select tv2)
        .Buffer(10)
        .SelectMany(xs => Observable.FromAsync(ct => SaveTickersToDatabaseAsync(xs, ct)))
        .ToList();

IList<Unit> list = await query.ToTask(cancellationToken);

Теперь вам также следует подумать об использовании обычный Subscribe, а не query.ToTask(cancellationToken) для выполнения запроса.

Ваш код будет выглядеть так:

IObservable<Unit> query =
    Observable
        .Defer(() =>
        {
            var lastRan = DateTime.UtcNow.AddMonths(-6); // get dummy date 6 months ago
            var daysToRun = (DateTime.UtcNow - lastRan).Days;
            
            return
                Observable
                    .Using(
                        () => new HttpClient(),
                        hc =>
                            from day in
                                Observable
                                    .Range(1, daysToRun)
                                    .Select(day =>
                                        Observable
                                            .FromAsync(ct => PolygonWebApi.GetGroupedDailyBarsAsync(hc, this.apiKey, Locale.US, Market.Stocks, lastRan.AddDays(day), false, ct))
                                            .Do(r => this.logger.LogInformation("got {0} records for {1}", r.Results.Count(), lastRan.AddDays(day)))
                                            .Select(r => new TickersResponseWithDay(lastRan.AddDays(day), r)))
                                    .Merge(MaxConcurrentDownloads)
                            from tv2 in day.AggregateResponse.Results
                            select tv2)
                    .Buffer(10)
                    .SelectMany(xs => Observable.FromAsync(ct => SaveTickersToDatabaseAsync(xs, ct)));
        });

IDisposable subscription =
    query
        .Subscribe(
            x => { /* each call to `SaveTickersToDatabaseAsync` runs this code */  },
            ex => { /* an exception? then end here */ },
            () => { /* successfully completed */ });

Это чистый и самодостаточный, и это способ Rx делать вещи.

...