Окно Rx, Join, GroupJoin? - PullRequest
       2

Окно Rx, Join, GroupJoin?

2 голосов
/ 10 октября 2011

Я сгенерировал / протестировал две наблюдаемые, которые должны быть объединены для выполнения одного запроса.

Пользователь может иметь несколько ролей. Всякий раз, когда их идентификатор роли изменяется, данные должны быть обновлены. Но данные должны обновляться только в том случае, если запрос активен (существует некоторый элемент управления, которому в данный момент нужны данные).

Изменение идентификатора роли также может произойти, если запрос приостановлен. Когда запрос снова становится активным, данные также должны быть загружены.

//Tuple has the Id of the current Role and the time that the Id updated
IObservable<Tuple<Guid, DateTime>> idUpdate

//Tuple has the state of the query (true=active or false=suspended)
//and the time the state of the query updated
IObservable<Tuple<bool, DateTime>> queryStateUpdate 

Я хотел бы создать

//A hot observable that pushes true whenever the query should execute
IObservable<bool> execute 

Я разбил его на два случая, которые можно объединить, но я не могу понять, как создать наблюдаемые наблюдения.

  • case a) Идентификатор роли обновлен и последнее состояние было Активным
  • вариант b) состояние обновлено до Active &&, это первое активное состояние с момента обновления идентификатора роли

Я просмотрел видео, сайт Ли Кэмпбелла, TOC для начинающих и т. Д., Но не могу найти хорошего примера для этого rx соединения. Любые идеи о том, как создать наблюдаемые или выполнить наблюдения?

Ответы [ 3 ]

1 голос
/ 11 октября 2011

Я прочитал вопрос как говорящий, что есть поток уведомлений idUpdate, который будет обрабатываться, пока установлено queryStateUpdate.Если queryStateUpdate не установлено, то уведомления должны приостанавливаться до тех пор, пока queryStateUpdate не будет снова установлено.

В этом случае оператор соединения не решит вашу проблему.

Я быпредположим, что вам нужна какая-то форма кэша, когда queryStateUpdate не установлен, то есть

List<Tuple<Guid,DateTime>> cache = new List<Tuple<Guid,DateTime>>();
Subject<Tuple<Guid,DateTime>> execute = new Subject<Tuple<Guid,DateTime>>();

idUpdate.Subscribe( x => {
    if (queryStateUpdate.Last().Item1) //might be missing something here with Last, you might need to copy the state out
        exeucte.OnNext(x);
    else
        cache.Add(x);
    });

queryStateUpdate.Subscribe(x=> {
    if (x.Item1)
    {
       //needs threadsafety
       foreach(var x in cache)
           execute.OnNext(x);
      cache.Clear();
    });
1 голос
/ 11 октября 2011

Учитывая описанную проблему, которая является немного неопределенной, поскольку я не вижу, для чего используется фактический идентификатор (Guid), или значения DateTime - у меня следующий запрос решить вашу проблему:

IObservable<bool> execute =
    idUpdate
        .Publish(_idUpdate =>
            from qsu in queryStateUpdate
            select qsu.Item1
                ? _idUpdate.Select(x => true) 
                : Observable.Empty<bool>())
        .Switch();

Я проверил это со следующими idUpdate & queryStateUpdate наблюдаемыми.

var rnd = new Random();

IObservable<Tuple<Guid, DateTime>> idUpdate =
    Observable
        .Generate(
            0,
            n => n < 10000,
            n => n + 1,
            n => Tuple.Create(Guid.NewGuid(), DateTime.Now),
            n => TimeSpan.FromSeconds(rnd.NextDouble() * 0.1));

IObservable<Tuple<bool, DateTime>> queryStateUpdate =
    Observable
        .Generate(
            0,
            n => n < 100,
            n => n + 1,
            n => n % 2 == 0,
            n => TimeSpan.FromSeconds(rnd.NextDouble() * 2.0))
        .StartWith(true)
        .DistinctUntilChanged()
        .Select(b => Tuple.Create(b, DateTime.Now));

Если вы сможете дать некоторые разъяснения по вашей проблеме, я, вероятно, смогу дать лучший ответ в соответствии с вашими потребностями.


РЕДАКТИРОВАТЬ: Добавлено поведение "replay (1)" требуется, когда Id меняется, когда неактивен.

Обратите внимание, что я избавился от необходимости иметь кортежи с DateTime.

IObservable<Guid> idUpdate = ...
IObservable<bool> queryStateUpdate = ...

var replay = new ReplaySubject<Guid>(1);
var disposer = new SerialDisposable();
Func<bool, IObservable<bool>, IObservable<Guid>,
    IObservable<Guid>> getSwitch = (qsu, qsus, iu) =>
{
    if (qsu)
    {
        return replay.Merge(iu);
    }
    else
    {
        replay.Dispose();
        replay = new ReplaySubject<Guid>(1);
        disposer.Disposable = iu.TakeUntil(qsus).Subscribe(replay);
        return Observable.Empty<Guid>();
    }
};

var query =
    queryStateUpdate
        .DistinctUntilChanged()
        .Publish(qsus =>
            idUpdate
            .Publish(ius =>
                qsus
                    .Select(qsu =>
                        getSwitch(qsu, qsus, ius))))
        .Switch();
0 голосов
/ 12 октября 2011

Благодаря Enigmativity и AlSki.Используя кеш я придумал ответ.

var execute = new Subject<Guid>();
var cache = new Stack<Guid>();
idUpdate.CombineLatest(queryStateUpdate, (id, qs) => new { id, qs }).Subscribe( anon =>
{
  var id = anon.id;
  var queryState = anon.qs;
  //The roleId updated after the queryState updated
  if (id.Item2 > queryState.Item2)
  {
      //If the queryState is active, call execute
      if (observationState.Item1)
      {
         cache.Clear();
         execute.OnNext(roleId.Item1);
         return;
      }
      //If the id updated and the state is suspended, cache it
      cache.Push(id.Item1);
  }
  //The queryState updated after the roleId
  else if (queryState.Item2 > roleId.Item2)
  {
     //If the queryState is active and a roleId update has been cached, call execute
     if (queryState.Item1 && cache.Count > 0)
     {
         execute.OnNext(cache.Pop());
         cache.Clear();
     }
  }});
...