Учитывая описанную проблему, которая является немного неопределенной, поскольку я не вижу, для чего используется фактический идентификатор (Guid
), или значения DateTime
- у меня следующий запрос решить вашу проблему:
IObservable<bool> execute =
idUpdate
.Publish(_idUpdate =>
from qsu in queryStateUpdate
select qsu.Item1
? _idUpdate.Select(x => true)
: Observable.Empty<bool>())
.Switch();
Я проверил это со следующими idUpdate
& queryStateUpdate
наблюдаемыми.
var rnd = new Random();
IObservable<Tuple<Guid, DateTime>> idUpdate =
Observable
.Generate(
0,
n => n < 10000,
n => n + 1,
n => Tuple.Create(Guid.NewGuid(), DateTime.Now),
n => TimeSpan.FromSeconds(rnd.NextDouble() * 0.1));
IObservable<Tuple<bool, DateTime>> queryStateUpdate =
Observable
.Generate(
0,
n => n < 100,
n => n + 1,
n => n % 2 == 0,
n => TimeSpan.FromSeconds(rnd.NextDouble() * 2.0))
.StartWith(true)
.DistinctUntilChanged()
.Select(b => Tuple.Create(b, DateTime.Now));
Если вы сможете дать некоторые разъяснения по вашей проблеме, я, вероятно, смогу дать лучший ответ в соответствии с вашими потребностями.
РЕДАКТИРОВАТЬ: Добавлено поведение "replay (1)" требуется, когда Id меняется, когда неактивен.
Обратите внимание, что я избавился от необходимости иметь кортежи с DateTime
.
IObservable<Guid> idUpdate = ...
IObservable<bool> queryStateUpdate = ...
var replay = new ReplaySubject<Guid>(1);
var disposer = new SerialDisposable();
Func<bool, IObservable<bool>, IObservable<Guid>,
IObservable<Guid>> getSwitch = (qsu, qsus, iu) =>
{
if (qsu)
{
return replay.Merge(iu);
}
else
{
replay.Dispose();
replay = new ReplaySubject<Guid>(1);
disposer.Disposable = iu.TakeUntil(qsus).Subscribe(replay);
return Observable.Empty<Guid>();
}
};
var query =
queryStateUpdate
.DistinctUntilChanged()
.Publish(qsus =>
idUpdate
.Publish(ius =>
qsus
.Select(qsu =>
getSwitch(qsu, qsus, ius))))
.Switch();