Как использовать system.reactive для выполнения операции, в которой используется значение изнутри каждой итерации? - PullRequest
0 голосов
/ 28 марта 2020

В качестве примера, давайте предположим, что я решил начать со следующего:

var startingSequence = await GetLastSequence(database);

return Observable.Create<ExternalSourceChange>(async (observer) =>
{
    var currentSequence = startingSequence;

    var done = false;
    do
    {
        var changes = await CreateRequest()
            .AppendPathSegment(database)
            .AppendPathSegment("_changes")
            .SetQueryParams(new
            {
                since = currentSequence,
                include_docs = "true",
                limit = 1,
            })
            .GetJsonAsync<ExternalSourceChangeJson>();

        // note: If we've already processed past this sequence, skip it.
        if (string.CompareOrdinal(changes.last_seq, currentSequence) <= 0)
        {
            continue;
        }

        var result = changes.results.First();
        var deleted = result.doc.GetValue("_deleted")?.Value<bool>() ?? false;

        var databaseChange = new ExternalSourceChangeJson(result.id, result.doc.ToString())
        {
            Deleted = deleted,
        };

        observer.OnNext(databaseChange);

        currentSequence = ((List<ExternalSourceChangeListJson>)changes
            .results)
            .OrderBy((result) => result.seq)
            .LastOrDefault()?.seq
            ?? currentSequence;

        done = changes.pending == 0;

    } while (! done);

    await SetLastSequence(database, currentSequence);

    observer.OnCompleted();

    // note: Not sure I have anything to do here.
    return () => {};
});

Могу ли я что-нибудь сделать с вышеперечисленным, чтобы добиться большего от system.reactive?

  • Changes<dynamic> создается на каждой итерации
  • currentSequence и done - это, на мой взгляд, две вещи, которые я мог бы улучшить в управлении

Ответы [ 2 ]

3 голосов
/ 30 марта 2020

Использование Observable.Create здесь не путь к go. Если вы когда-нибудь обнаружите, что возвращаете return () => {}; (или return Disposable.Empty;, который вы часто видите), то вы делаете что-то не так.

Почти всегда есть способ использовать встроенные операторы, чтобы получить хороший устойчивый запрос.

Теперь, так как в вашем запросе много работы, которую я не могу легко воспроизвести, я приведу упрощенный пример того, как вы можете делать то, что хотите.

Для начала вот мое крайнее упрощение получения чего-либо из базы данных:

private static int __counter = 0;
public Task<int> GetCounterAsync() => Task.Run(() => __counter++);

Теперь я собираюсь кодировать неправильный подход, основанный на вашем коде, чтобы вы могли увидеть, как мой окончательный код будет относиться к вашему. Моя цель здесь - извлекать значения из «базы данных» до тех пор, пока я не получу 10 и не определим sh с этим значением.

IObservable<int> query =
    Observable.Create<int>(async observer =>
    {
        var done = false;
        do
        {
            var counter = await GetCounterAsync();
            observer.OnNext(counter);
            done = counter == 10;
        } while (!done);
        observer.OnCompleted();
        return () => { };
    });

Это разработано так, чтобы выглядеть как ваш код. Я хочу подчеркнуть, что это не способ сделать это.

Вот правильный путь.

IObservable<int> query =
    Observable
        .Defer(() => Observable.FromAsync(() => GetCounterAsync()))
        .Repeat()
        .TakeUntil(x => x == 10);

Observable.Defer важен, поскольку он вызывает Observable.FromAsync(() => GetCounterAsync()) вызывается заново каждый раз, когда срабатывает оператор .Repeat(). Без этого результат первого вызова Observable.FromAsync(() => GetCounterAsync()) повторяется бесконечно.

Теперь, если вам нужно включить в ваш запрос состояние, которое часто является причиной того, что люди используют Observable.Create, тогда вы всегда можете оберните все это в другой Observable.Defer.

IObservable<int> query =
    Observable
        .Defer(() =>
        {
            var finish = 10;
            return
                Observable
                    .Defer(() => Observable.FromAsync(() => GetCounterAsync()))
                    .Repeat()
                    .TakeUntil(x => x == finish);
        });

Таким образом, любое состояние, необходимое в вашем запросе, создается заново для каждого подписчика.

Если вам необходимо сделайте что-нибудь с состоянием в конце, вы можете сделать это:

IObservable<int> query =
    Observable
        .Defer(() =>
        {
            var finish = 10;
            return
                Observable
                    .Defer(() => Observable.FromAsync(() => GetCounterAsync()))
                    .Repeat()
                    .TakeUntil(x => x == finish)
                    .Finally(() => Console.WriteLine($"Finished at {finish}"));
        });

query.Subscribe(x => Console.WriteLine(x));

Это приводит к:

0
1
2
3
4
5
6
7
8
9
10
Finished at 10

Дайте мне знать, если это помогает или есть что-то, что я пропустил в ваш вопрос.

1 голос
/ 30 марта 2020

Вот как вы можете сделать его более надежным, включив обработку ошибок и реакцию на отмену подписки.

return Observable.Create<ExternalSourceChange>(observer =>
{
    var cts = new CancellationTokenSource();
    var fireAndForget = Task.Run(async () =>
    {
        try
        {
            while (true)
            {
                cts.Token.ThrowIfCancellationRequested();
                var changes = await CreateRequest().GetJsonAsync();
                //...
                observer.OnNext(databaseChange);
                //...
                if (changes.pending == 0) break;
            }
            observer.OnCompleted();
        }
        catch (OperationCanceledException ex) when (ex.CancellationToken == cts.Token)
        {
            // Do nothing (the subscriber unsubscribed)
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
    });

    return System.Reactive.Disposables.Disposable.Create(() =>
    {
        cts.Cancel();
    });
});

Вы можете передать cts.Token любым асинхронным c методам, которые вы вызываете и которые принимают отмену токены, для еще более быстрого завершения l oop.


Обновление: Я только что заметил другую перегрузку метода Observable.Create, который принимает асинхронный c делегат с параметром CancellationToken:

// Summary:
// Creates an observable sequence from a specified cancellable asynchronous Subscribe
// method. The CancellationToken passed to the asynchronous Subscribe method is
// tied to the returned disposable subscription, allowing best-effort cancellation.
public static IObservable<TResult> Create<TResult>(
    Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync);

Это можно использовать для упрощения вещей, избавившись от задачи «забей и забудь», CancellationTokenSource и Disposable.Create в конце.

return Observable.Create<ExternalSourceChange>(async (observer, cancellationToken) =>
{ // etc...
...