Реактивная программа не завершается после ожидания - PullRequest
1 голос
/ 04 апреля 2019

Я ожидал, что приведенная ниже программа будет отображать любые нажатия клавиш до тех пор, пока вы не нажмете z, но она не завершится, когда вы нажмете z, а только эхосигналы при каждом втором нажатии. Что я сделал не так?

using System.Reactive;
using System.Reactive.Linq;

public class Printer : IObserver<char>
{ 
    public void OnNext(char x)
    {
        Console.WriteLine(x);
    }

    public void OnError(Exception x)
    {

    }
    public void OnCompleted()
    {

    }
}

class Program
{
    static IObservable<char> keys = Observable.Defer(() =>Observable.Start(() =>Console.ReadKey().KeyChar)).Repeat(); //https://stackoverflow.com/questions/10675451/iobservable-of-keys-pressed
    public static int Main()
    {
        IObserver<char> x = new Printer();
        keys.Subscribe(x);
        keys.Where(b => b == 'z').Wait();
        return 0;
    }
}

1 Ответ

2 голосов
/ 08 апреля 2019

Хорошо, так что две проблемы, обе раздельные:

  1. То, что у вас здесь есть, это наблюдаемая холодная .Только когда вы начинаете наблюдать это, оно производит ценности.И наоборот, каждый раз, когда вы подписываетесь на него, он представляет новый поток - аналогично тому, как IEnumerable оценивает каждый раз, когда вы пытаетесь получить предметы.Вы можете ясно увидеть это, если поставить точку останова внутри Observable.Defer.

    У этих двух потоков может быть только одна подписка на источник , т. Е. Наблюдаемое нажатие клавиши.Таким образом, мы конвертируем наблюдаемую холодную в горячую.

  2. Метод Wait:

    Ожидает наблюдаемой последовательности до complete и возвращает последний элемент последовательности.Если последовательность завершается с уведомлением OnError, генерируется исключение.

    Таким образом, он будет ждать завершения последовательности, т. Е. OnCompleted был вызван по наблюдаемой цепочке.Итак, мы используем TakeUntil, чтобы последовательность завершилась только при выполнении условия (нажатие кнопки «z»).

        public static int Main()
        {
            var keys_stream = keys.Publish().RefCount(); // share
    
            IObserver<char> x = new Printer();
            keys_stream.Subscribe(x);
            keys_stream.TakeUntil(b => b == 'z').Wait(); //wait until z
            return 0;
        }
    
...