Использование Observable.Create
здесь не путь к go. Если вы когда-нибудь обнаружите, что возвращаете return () => {};
(или return Disposable.Empty;
, который вы часто видите), то вы делаете что-то не так.
Почти всегда есть способ использовать встроенные операторы, чтобы получить хороший устойчивый запрос.
Теперь, так как в вашем запросе много работы, которую я не могу легко воспроизвести, я приведу упрощенный пример того, как вы можете делать то, что хотите.
Для начала вот мое крайнее упрощение получения чего-либо из базы данных:
private static int __counter = 0;
public Task<int> GetCounterAsync() => Task.Run(() => __counter++);
Теперь я собираюсь кодировать неправильный подход, основанный на вашем коде, чтобы вы могли увидеть, как мой окончательный код будет относиться к вашему. Моя цель здесь - извлекать значения из «базы данных» до тех пор, пока я не получу 10
и не определим sh с этим значением.
IObservable<int> query =
Observable.Create<int>(async observer =>
{
var done = false;
do
{
var counter = await GetCounterAsync();
observer.OnNext(counter);
done = counter == 10;
} while (!done);
observer.OnCompleted();
return () => { };
});
Это разработано так, чтобы выглядеть как ваш код. Я хочу подчеркнуть, что это не способ сделать это.
Вот правильный путь.
IObservable<int> query =
Observable
.Defer(() => Observable.FromAsync(() => GetCounterAsync()))
.Repeat()
.TakeUntil(x => x == 10);
Observable.Defer
важен, поскольку он вызывает Observable.FromAsync(() => GetCounterAsync())
вызывается заново каждый раз, когда срабатывает оператор .Repeat()
. Без этого результат первого вызова Observable.FromAsync(() => GetCounterAsync())
повторяется бесконечно.
Теперь, если вам нужно включить в ваш запрос состояние, которое часто является причиной того, что люди используют Observable.Create
, тогда вы всегда можете оберните все это в другой Observable.Defer
.
IObservable<int> query =
Observable
.Defer(() =>
{
var finish = 10;
return
Observable
.Defer(() => Observable.FromAsync(() => GetCounterAsync()))
.Repeat()
.TakeUntil(x => x == finish);
});
Таким образом, любое состояние, необходимое в вашем запросе, создается заново для каждого подписчика.
Если вам необходимо сделайте что-нибудь с состоянием в конце, вы можете сделать это:
IObservable<int> query =
Observable
.Defer(() =>
{
var finish = 10;
return
Observable
.Defer(() => Observable.FromAsync(() => GetCounterAsync()))
.Repeat()
.TakeUntil(x => x == finish)
.Finally(() => Console.WriteLine($"Finished at {finish}"));
});
query.Subscribe(x => Console.WriteLine(x));
Это приводит к:
0
1
2
3
4
5
6
7
8
9
10
Finished at 10
Дайте мне знать, если это помогает или есть что-то, что я пропустил в ваш вопрос.