Реактивное выражение linq для 2 IObservable - PullRequest
2 голосов
/ 02 марта 2011

В следующем коде, если я правильно понимаю объединения в RX, я должен увидеть следующие предупреждения:

  • Запад
  • Тест
  • Test-West *
  • Готово

Я получаю 3 из 4 ожидаемых предупреждений ... почему я тоже не получаю "Test-West"?

public partial class MainWindow : Window
{
    public MainWindow()
    {
        InitializeComponent(); 

        var loginInitial = new LoginInitial();
        var loginCheckList = new LoginCheckList();



        var result1 = from x in loginInitial.Status
                    from y in loginCheckList.Status
                    where x == "Test" && y == "West"
                    select new { x, y };

        result1.Subscribe(x => MessageBox.Show(x.x + "-" + x.y));

        var result2 = from x in loginInitial.Status
                      where x == "Test"
                      select x;

        result2.Subscribe(x => MessageBox.Show(x));

        var result3 = from x in loginCheckList.Status
                      where x == "West"
                      select x;

        result3.Subscribe(x => MessageBox.Show(x));

        var task1 = Task.Factory.StartNew(() =>
                                  {
                                      for (int i = 0; i < 10000000; i++)
                                      {
                                          if (i == 9000000)
                                              loginInitial.Status.Publish("9000000");
                                          if (i == 9000001)
                                              loginInitial.Status.Publish("Test");
                                      }
                                  });

        var task2 = Task.Factory.StartNew(() =>
                                  {
                                        for (int i = 0; i < 1000000; i++)
                                        {
                                            if (i == 800000)
                                                loginInitial.Status.Publish("800000");
                                            if (i == 800001)
                                                loginCheckList.Status.Publish("West");
                                        }
                                  });
        Task.WaitAll(task1, task2);

        MessageBox.Show("Done");
    }
}

public class LoginInitial
{
    public PublishObservable<string> Status = new PublishObservable<string>(); 
}

public class LoginCheckList
{
    public PublishObservable<string> Status = new PublishObservable<string>();
}

public class PublishObservable<T> : IObservable<T>
{
    private IList<IObserver<T>> _observers = new List<IObserver<T>>();

    public void Publish(T value)
    {
        lock (_observers)
        {
            foreach (var observer in _observers)
            {
                observer.OnNext(value);
            }
        }
    }

    public void Complete()
    {
        lock (_observers)
        {
            foreach (var observer in _observers)
            {
                observer.OnCompleted();
            }
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_observers)
        {
            _observers.Add(observer);
        }
        return null;
    }
}

Ответы [ 2 ]

3 голосов
/ 02 марта 2011

Когда вы используете предложение from в Rx, вы говорите, что остальная часть предложения должна выполняться для всех случаев наблюдаемого. Для вложенных предложений from это означает, что вы ожидаете первого вхождения первого события, а затем начинаете выполнение остальной части предложения для этого вхождения (а затем делаете то же самое параллельно для всех будущих вхождений). Вы можете найти больше информации о том, как работает SelectMany, например, здесь .

Когда вы смотрите на свой пример:

var result1 = 
  from x in loginInitial.Status
  from y in loginCheckList.Status
  where x == "Test" && y == "West"
  select new { x, y }; 

... это означает, что предложение должно ждать loginInitial.Status. Когда это вызывает значение, оно начинает ждать loginCheckList.Status. Если я правильно понимаю ваш код, то наблюдаемая Initial выдаст значение после наблюдаемой наблюдаемой CheckList, так что к тому времени, как вы начнете ждать второго, значение уже будет сгенерировано, и вы получите не получить это снова.

Я думаю, что более подходящей операцией в вашем случае будет Observable.Zip или CombineLatest (см. this и this ).

1 голос
/ 02 марта 2011

Томас Петричек в значительной степени объясняет, почему это происходит. Я просто добавлю решение в качестве примера.

Наряду с настройкой result1 для использования CombineLatest (который также должен использовать синтаксис метода расширения в отличие от синтаксиса linq), я изменил реализацию на использование Subject, что устранит необходимость создания собственной реализации IObservable. Я также изменил ваши реализации, использующие несколько подписок, в одну подписку по результату mergin geach, наблюдаемому через Observable.Merge.

public partial class MainWindow : Window
{
    public MainWindow()
    {
        InitializeComponent(); 

        var loginInitial = new Subject<String>();
        var loginCheckList = new Subject<String>();

        var result1 = loginInitial.CombineLatest(loginCheckList, 
                (x, y) => new Tuple<string, string>(x, y))
            .Where(latest => latest.Item1 == "Test" && latest.Item2 == "West")
            .Select(latest => string.Format("{0} - {1}", latest.Item1, latest.Item2));

        var result2 = from x in loginInitial
                        where x == "Test"
                        select x;

        var result3 = from x in loginCheckList
                        where x == "West"
                        select x;

        Observable.Merge(result1, result2, result3)
            .Subscribe(Console.WriteLine);

        var task1 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10000000; i++)
            {
                if (i == 9000000)
                    loginInitial.OnNext("9000000");
                if (i == 9000001)
                    loginInitial.OnNext("Test");
            }
        });

        var task2 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 1000000; i++)
            {
                if (i == 800000)
                    loginInitial.OnNext("800000");
                if (i == 800001)
                    loginCheckList.OnNext("West");
            }
        });

        Task.WaitAll(task1, task2);

        Console.WriteLine("Done");
    }
}

Примечание 1 - я использовал CombineLatest здесь, но вы также можете легко изменить его на Zip в зависимости от нужного вам поведения. Посмотрите на мраморные диаграммы на страницах RxAs для Zip и CombineLatest , чтобы лучше понять, как ведет себя каждый из них.

Примечание 2 - Я бы, вероятно, изменил бы result2 и result3, чтобы использовать синтаксис метода расширения, чтобы в одном методе не было сочетания подходов. В этом нет ничего плохого, но я бы предпочел последовательность использования одного типа синтаксиса, где это возможно.

...