Присоединение к Rx Streams - PullRequest
25 голосов
/ 18 марта 2012

Я пытаюсь смоделировать запрос Rx, который не является тривиальным (для меня):

  • В комнате есть Мужчины и Женщины.
  • Они входят и выходят изв комнате, и иногда находясь в комнате, они меняют свое местоположение.
  • Каждый мужчина может смотреть на одну (или ноль) женщину в данный момент времени.
  • Каждый мужчина обладает следующими свойствами:

    class Man
    {
      public const int LookingAtNobody = 0;
      public int Id { get; set; }
      public double Location { get; set; }
      public int LookingAt { get; set; }
    }
    
  • Каждая женщина обладает следующими свойствами:

    class Woman
    {
      public int Id { get; set; }
      public double Location { get; set; }
    }
    
  • Для представления мужчин у нас есть IObservable<IObservable<Man>>, и дляпредставлять женщин, которых мы имеем IObservable<IObservable<Woman>>.

Как бы вы использовали Rx для генерации векторов от мужчин к женщинам, на которых они смотрят: IObservable<IObservable<Tuple<double,double>>>?

Чтобы помочь, вот несколько юнит-тестов для некоторых простых случаев:

public class Tests : ReactiveTest
{
    [Test]
    public void Puzzle1()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnCompleted<Man>(300));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle2()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnCompleted<Man>(400));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(350),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle3()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnNext(300, new Man { Id = 1, Location = 2.0, LookingAt = Man.LookingAtNobody }),
            OnCompleted<Man>(400));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle4()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnNext(300, new Man { Id = 1, Location = 3.0, LookingAt = 20 }),
            OnNext(400, new Man { Id = 1, Location = 4.0, LookingAt = 20 }),
            OnCompleted<Man>(500));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));
        var w2 = scheduler.CreateHotObservable(
            OnNext(155, new Woman { Id = 20, Location = 100.0 }),
            OnNext(255, new Woman { Id = 20, Location = 200.0 }),
            OnNext(355, new Woman { Id = 20, Location = 300.0 }),
            OnCompleted<Woman>(455));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1), OnNext(50, w2));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        var expectedVector2 = new[]
                       {
                           OnNext(300, Tuple.Create(3.0, 200.0)),
                           OnNext(355, Tuple.Create(3.0, 300.0)),
                           OnNext(400, Tuple.Create(4.0, 300.0)),
                           OnCompleted<Tuple<double,double>>(455),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
        ReactiveAssert.AreElementsEqual(expectedVector2, innerResults[1]);
    }

    private static IEnumerable<Recorded<Notification<IList<Recorded<Notification<Tuple<double, double>>>>>>> runQuery(TestScheduler scheduler, IObservable<IObservable<Woman>> women, IObservable<IObservable<Man>> men)
    {
        // assuming nested sequences are hot
        var vectors =
            from manDuration in men
            join womanDuration in women on manDuration equals womanDuration
            select from man in manDuration
                   join woman in womanDuration on manDuration equals womanDuration
                   where man.LookingAt == woman.Id
                   select Tuple.Create(man.Location, woman.Location);

        var query = vectors.Select(vectorDuration =>
        {
            var vectorResults = scheduler.CreateObserver<Tuple<double, double>>();
            vectorDuration.Subscribe(vectorResults);
            return vectorResults.Messages;
        });

        var results = scheduler.Start(() => query, 0, 0, 1000).Messages;
        return results;
    }
}

(примечание: этот вопрос был перекрестно опубликован на форумах Rx: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/e73ae4e2-68c3-459a-a5b6-ea957b205abe)

Ответы [ 2 ]

1 голос
/ 11 января 2014

Если я правильно вас понимаю, цель состоит в том, чтобы создать наблюдаемую «следуемую наблюдаемую», где «наблюдаемая наблюдаемая» начинается, когда мужчина начинает смотреть на женщину, и заканчивается, когда мужчина перестает смотреть на женщину. «Наблюдаемый» должен состоять из кортежей самых последних местоположений мужчины и женщины.

Идея здесь состоит в том, чтобы использовать CombineLatest, который будет принимать две наблюдаемые, и когда любая из них выдает значение, комбинатор оценивается для двух самых последних значений наблюдаемых, что создает значение в объединенной наблюдаемой , Однако CombineLatest завершается только тогда, когда обе наблюдаемые были завершены. В этом случае мы хотим завершить наблюдаемое, когда завершится любой из двух источников. Для этого мы определяем следующий метод расширения (я не думаю, что такой метод уже существует, но может быть более простое решение):

public static IObservable<TSource>
  UntilCompleted<TSource, TWhile>(this IObservable<TSource> source,
                                       IObservable<TWhile> lifetime)
{
  return Observable.Create<TSource>(observer =>
  {
    var subscription = source.Subscribe(observer);
    var limiter = lifetime.Subscribe(next => { }, () =>
    {
      subscription.Dispose();
      observer.OnCompleted();
    });
    return new CompositeDisposable(subscription, limiter);
  });
}

Этот метод похож на TakeUntil, но вместо того, чтобы принимать lifetime, он принимает значение, пока lifetime не завершится. Мы также можем определить простой метод расширения, который принимает первую строку, которая удовлетворяет предикату:

public static IObservable<TSource>
  Streak<TSource>(this IObservable<TSource> source,
                       Func<TSource, bool> predicate)
{
  return source.SkipWhile(x => !predicate(x)).TakeWhile(predicate);
}

Теперь для окончательного запроса мы объединяем всех мужчин со всеми женщинами, используя CombineLatest, и завершим это наблюдаемое на ранней стадии, используя UntilCompleted. Чтобы получить «следите за наблюдаемыми», мы выбираем полосу, где мужчина смотрит на женщину. Затем мы просто отображаем это на кортеж локаций.

var vectors =
  from manDuration in men
  from womanDuration in women
  select manDuration
  .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w })
  .UntilCompleted(womanDuration)
  .UntilCompleted(manDuration)
  .Streak(pair => pair.Man.LookingAt == pair.Woman.Id)
  .Select(pair => Tuple.Create(pair.Man.Location, pair.Woman.Location));

Это проходит все ваши тесты, но не обрабатывает сценарий, когда мужчина некоторое время смотрит на женщину 10, потом на 20, а потом снова на 10; используется только первая полоса. Чтобы наблюдать все полосы, мы можем использовать следующий метод расширения, который возвращает наблюдаемую полосу:

public static IObservable<IObservable<TSource>>
  Streaks<TSource>(this IObservable<TSource> source,
                        Func<TSource, bool> predicate)
{
  return Observable.Create<IObservable<TSource>>(observer =>
  {
    ReplaySubject<TSource> subject = null;
    bool previous = false;
    return source.Subscribe(x =>
    {
      bool current = predicate(x);
      if (!previous && current)
      {
        subject = new ReplaySubject<TSource>();
        observer.OnNext(subject);
      }
      if (previous && !current) subject.OnCompleted();
      if (current) subject.OnNext(x);
      previous = current;
    }, () =>
    {
      if (subject != null) subject.OnCompleted();
      observer.OnCompleted();
    });
  });
}

При подписке только один раз на исходный поток и использовании ReplaySubject этот метод работает как для горячих, так и для холодных наблюдаемых. Теперь для окончательного запроса мы выберем все полосы следующим образом:

var vectors =
  from manDuration in men
  from womanDuration in women
  from streak in manDuration
  .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w })
  .UntilCompleted(womanDuration)
  .UntilCompleted(manDuration)
  .Streaks(pair => pair.Man.LookingAt == pair.Woman.Id)
  select streak.Select(pair =>
    Tuple.Create(pair.Man.Location, pair.Woman.Location));
0 голосов
/ 11 января 2014

Я не уверен, что понимаю, почему вы моделируете поток местоположений мужчин и женщин как IObservable<IObservable<T>> вместо просто IObservable<T>, но это может сработать:

public static IObservable<Tuple<double, double>> GetLocationsObservable(IObservable<IObservable<Man>> menObservable, 
                                                                            IObservable<IObservable<Woman>> womenObservable)
{
    return Observable.CombineLatest(
        menObservable.Switch(),
        womenObservable.Switch(),
        (man, woman) => new {man, woman})
            .Where(manAndWoman => manAndWoman.man.LookingAt == manAndWoman.woman.Id)
            .Select(manAndWoman => Tuple.Create(manAndWoman.man.Location, manAndWoman.woman.Location));
}

Переключатели по существу «переключаются» на новое наблюдаемое при его нажатии, что выравнивает потоки. Где и выберите довольно просто.

У меня есть скрытое подозрение, что я что-то неправильно понимаю в требованиях, но я решил представить свой ответ на случай, если это поможет.

...