Наблюдаемое представляет поток уведомлений или событий.Когда наблюдаемые источники от события, они по своей сути бесконечны.Observable подключается к событию, ссылаясь на объект, поэтому объект, поддерживающий событие, никогда не выйдет из области видимости..NET / C # не предоставляет способ указать, что событие никогда не будет вызываться снова, поэтому наблюдаемое прямое подключение к событию бесконечно.
Это не редкость;большинство наблюдаемых событий, основанных на событиях, никогда не вызывали явно OnCompleted
, моделируя реальный мир, в котором довольно сложно однозначно сказать, что что-то никогда не повторится.
Однако это не проблема: наблюдаемые предназначеныбежать бесконечно и не наносить ущерба.Наблюдаемая отписка не занимает много ресурсов.Если вас не интересует наблюдаемая из событий система, отмените подписку на все подписки, и все в порядке.
Один из способов сделать это - использовать один из операторов Take
, например, оператор TakeUntil
(как упомянуто ниже).Попробуйте следующий код (используя класс Generator
):
var g = new Generator<int>();
g.Items
.TakeUntil(i => i > 3)
.Subscribe(
i => Console.WriteLine($"OnNext: {i}"),
e => Console.WriteLine($"OnError: Message: {e.Message}"),
() => Console.WriteLine("OnCompleted")
);
g.Push(1);
g.Push(2);
g.Push(3);
g.Push(4);
g.Push(5);
g.Push(6);
Вывод:
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
TakeUntil
отписывается от наблюдаемой Items
после получения сообщения с целым числомбольше 3. Вот почему есть OnCompleted, а не 5, 6 сообщений.
Кроме того, как уже упоминалось в Enigmativity, ваш класс Generator<T>
в основном совпадает с Subject<T>
, я предлагаю вам использовать его.
Оригинальный ответ:
Сделайте еще одну наблюдаемую из события, затем используйте .TakeUntil
:
class Generator<T>
{
event Action<T> onPush;
event Action<Unit> onCompleted;
public IObservable<T> Items =>
Observable.FromEvent<T>(d => onPush += d, d => onPush -= d)
.TakeUntil(Completion);
public IObservable<Unit> Completion =>
Observable.FromEvent<Unit>(d => onCompleted += d, d => onCompleted -= d);
public void Push(T item) => onPush?.Invoke(item);
public void Complete() => onCompleted?.Invoke(Unit.Default);
}