Как выполнить Rx Observable в зависимости от условия в событии - PullRequest
6 голосов
/ 14 ноября 2011

У меня есть событие, которое я не контролирую, которое предоставляет мне данные. EventArgs выглядит примерно так:

class MyEventArg {
  bool IsLastItem {get;}
  Data DataItem {get;}
}

Я использую Rx для преобразования этого события в IObservable. Но я хочу завершить наблюдаемое, если IsLastItem равен true.

Какие-нибудь изящные идеи? Одним из способов было бы передать данные через субъект, который я имею больше контроля, чтобы установить событие OnComplete, если возникает условие ...

Ответы [ 3 ]

9 голосов
/ 14 ноября 2011

Если вы хотите, чтобы последний элемент был включен, вы можете объединить поток только с последним элементом вместе с обычным потоком, объединенным с TakeWhile.Вот простое консольное приложение, чтобы доказать это:

var subject = new List<string>
{                            
"test",
"last"
}.ToObservable();

var my = subject
            .Where(x => x == "last").Take(1)
            .Merge(subject.TakeWhile(x => x != "last"));

my.Subscribe(
    o => Console.WriteLine("On Next: " + o), 
    () => Console.WriteLine("Completed"));

Console.ReadLine();

Это печатает:

On Next: test
On Next: last
Completed

ОБНОВЛЕНИЕ Была ошибка, которая подавляла сообщение OnCompleted, если основнойНаблюдаемые на самом деле не завершены.Я исправил код, чтобы гарантировать, что OnCompleted будет вызван

И если вы хотите избежать многократной подписки на базовую последовательность для холодных наблюдаемых, вы можете изменить код следующим образом:

var my = subject.Publish(p => p
            .Where(x => x == "last").Take(1)
            .Merge(p.TakeWhile(x => x != "last")));
2 голосов
/ 15 ноября 2011
public static IObservable<TSource> TakeWhileInclusive<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    return Observable
        .Create<TSource>(o => source.Subscribe(x =>
                                                   {
                                                       o.OnNext(x);
                                                       if (!predicate(x))
                                                           o.OnCompleted();
                                                   },
                                               o.OnError,
                                               o.OnCompleted
                                  ));
}
2 голосов
/ 14 ноября 2011

Вы ищете что-то подобное?

IObservable<MyEventArg> result =
    myEventArgObservable.TakeWhile(arg => !arg.IsLastItem);
...