Полный IObservable по событию - PullRequest
0 голосов
/ 20 декабря 2018

Существует способ обернуть событие как наблюдаемое, используя Observable.FromEvent.Например, этот класс:

class Generator<T>
{
    event Action<T> onPush;

    public IObservable<T> Items =>
        Observable.FromEvent<T>(d => onPush += d, d => onPush -= d);

    public void Push(T item) => onPush?.Invoke(item);
}

Однако я не нашел способа дополнить наблюдаемое также событием - как я могу это сделать?

Обновление:

Чтобы прояснить, что я имею в виду, вышеприведенный класс производит IObservable<T>, который является "бесконечным" и никогда не завершается.Я хочу сделать его завершенным другим событием, а не сделать еще одно наблюдаемое.Таким образом, вопрос может быть сведен к следующему:

Как сделать произвольное IObservable<T> завершенным преждевременно, то есть уведомление OnCompleted должно быть вызвано?

1 Ответ

0 голосов
/ 20 декабря 2018

Наблюдаемое представляет поток уведомлений или событий.Когда наблюдаемые источники от события, они по своей сути бесконечны.Observable подключается к событию, ссылаясь на объект, поэтому объект, поддерживающий событие, никогда не выйдет из области видимости..NET / C # не предоставляет способ указать, что событие никогда не будет вызываться снова, поэтому наблюдаемое прямое подключение к событию бесконечно.

Это не редкость;большинство наблюдаемых событий, основанных на событиях, никогда не вызывали явно OnCompleted, моделируя реальный мир, в котором довольно сложно однозначно сказать, что что-то никогда не повторится.

Однако это не проблема: наблюдаемые предназначеныбежать бесконечно и не наносить ущерба.Наблюдаемая отписка не занимает много ресурсов.Если вас не интересует наблюдаемая из событий система, отмените подписку на все подписки, и все в порядке.

Один из способов сделать это - использовать один из операторов Take, например, оператор TakeUntil (как упомянуто ниже).Попробуйте следующий код (используя класс Generator):

var g = new Generator<int>();
g.Items
    .TakeUntil(i => i > 3)
    .Subscribe(
        i => Console.WriteLine($"OnNext: {i}"), 
        e => Console.WriteLine($"OnError: Message: {e.Message}"), 
        () => Console.WriteLine("OnCompleted")
    );

g.Push(1);
g.Push(2);
g.Push(3);
g.Push(4);
g.Push(5);
g.Push(6);

Вывод:

OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted

TakeUntil отписывается от наблюдаемой Items после получения сообщения с целым числомбольше 3. Вот почему есть OnCompleted, а не 5, 6 сообщений.

Кроме того, как уже упоминалось в Enigmativity, ваш класс Generator<T> в основном совпадает с Subject<T>, я предлагаю вам использовать его.


Оригинальный ответ:

Сделайте еще одну наблюдаемую из события, затем используйте .TakeUntil:

class Generator<T>
{
    event Action<T> onPush;
    event Action<Unit> onCompleted;

    public IObservable<T> Items =>
        Observable.FromEvent<T>(d => onPush += d, d => onPush -= d)
        .TakeUntil(Completion);

    public IObservable<Unit> Completion =>
        Observable.FromEvent<Unit>(d => onCompleted += d, d => onCompleted -= d);

    public void Push(T item) => onPush?.Invoke(item);
    public void Complete() => onCompleted?.Invoke(Unit.Default);
}
...