Как реализовать «обработчик оставшегося» наблюдателя, используя реактивные расширения - PullRequest
3 голосов
/ 17 октября 2011

У меня есть IObservable<string> и несколько наблюдателей, которые обрабатывают строки на основе некоторого условия:

observable.Subscribe(s => { if (s.StartsWith("a")) {...} });
observable.Subscribe(s => { if (s.StartsWith("b")) {...} });
observable.Subscribe(s => { if (s.StartsWith("c")) {...} });
observable.Subscribe(s => { if (s.StartsWith("d")) {...} });
....

Это упрощенный пример (условие более сложное, и наблюдаемые события не являются строками), ноВы поняли.

Я хотел бы иметь IObserver<string>, который перехватывает все строки, которые не обрабатываются никаким другим наблюдателем.Наблюдатели с различными условиями (например, StartsWith("e")) могут быть добавлены в любое время, и набор условий не перекрывается.

Поддерживается ли этот сценарий каким-либо образом?Или я должен пометить наблюдаемые строки как обработанные и подписаться на необработанные строки, как только все другие наблюдатели попытались (и как мне это реализовать)?

Ответы [ 3 ]

3 голосов
/ 17 октября 2011

У меня есть два подхода.

Первый предоставляет способ связать воедино пары предикат / действие, чтобы «откачать» соответствующие значения. Это следует стилю оператора Rx.

Я могу написать это:

observable
    .Syphon(s => s.StartsWith("a"), s => { })
    .Syphon(s => s.StartsWith("b"), s => { })
    .Syphon(s => s.StartsWith("c"), s => { })
    .Syphon(s => s.StartsWith("d"), s => { })
    .Subscribe(s => { /* otherwise */ });

Если у меня есть этот метод расширения:

public static IObservable<T> Syphon<T>(
    this IObservable<T> source,
    Func<T, bool> predicate,
    Action<T> action)
{
    if (source == null) throw new ArgumentNullException("source");
    if (predicate == null) throw new ArgumentNullException("predicate");
    if (action == null) throw new ArgumentNullException("action");
    return Observable.Create<T>(o =>
        source.Subscribe(
            t =>
            {
                if (predicate(t))
                {
                    action(t);
                }
                else
                {
                    o.OnNext(t);
                }
            },
            ex =>
                o.OnError(ex),
            () =>
                o.OnCompleted()));
}

Он не позволяет вам добавлять и удалять пары предикат / действие на лету, но это довольно простой оператор, который может быть полезен.

Для полной функциональности добавления / удаления я предложил такой подход:

Func<Func<string, bool>, Action<string>, IDisposable> add;

observable
    .Syphon(out add)
    .Subscribe(s => { /* otherwise */ });

var startsWithA = add(s => s.StartsWith("a"), s => { /* a */ });
var startsWithB = add(s => s.StartsWith("b"), s => { /* b */ });
startsWithA.Dispose();
var startsWithC = add(s => s.StartsWith("c"), s => { /* c */ });
var startsWithD = add(s => s.StartsWith("d"), s => { /* d */ });
startsWithC.Dispose();
startsWithB.Dispose();
startsWithD.Dispose();

Перегрузка метода расширения .Syphon(out add) позволяет методу эффективно возвращать два результата - нормальное возвращаемое значение - IObservable<T>, а второе - как Func<Func<T, bool>, Action<T>, IDisposable>. Это второе возвращаемое значение позволяет добавлять новые пары предикат / действие к оператору сифона, а затем удалять, вызывая Dispose в возвращенной подписке - очень Rx-ish.

Вот метод расширения:

public static IObservable<T> Syphon<T>(
    this IObservable<T> source,
    out Func<Func<T, bool>, Action<T>, IDisposable> subscriber)
{
    if (source == null) throw new ArgumentNullException("source");

    var pas = new List<Tuple<Func<T, bool>, Action<T>>>();

    subscriber = (p, a) =>
    {
        lock (pas)
        {
            var tuple = Tuple.Create(p, a);
            pas.Add(tuple);
            return Disposable.Create(() =>
            {
                lock (pas)
                {
                    pas.Remove(tuple);
                }
            });
        }
    };

    return Observable.Create<T>(o =>
        source.Subscribe(
            t =>
            {
                Action<T> a = null;
                lock (pas)
                {
                    var pa = pas.FirstOrDefault(x => x.Item1(t));
                    if (pa != null)
                    {
                        a = pa.Item2;
                    }
                }
                if (a != null)
                {
                    a(t);
                }
                else
                {
                    o.OnNext(t);
                }
            },
            ex =>
                o.OnError(ex),
            () =>
                o.OnCompleted()));
}

Я проверил код следующим образом:

var xs = Observable.Interval(TimeSpan.FromSeconds(0.2));

Func<Func<long, bool>, Action<long>, IDisposable> subscriber;
xs
    .Syphon(out subscriber)
    .Subscribe(x => Console.WriteLine(x));

var divBy3 = subscriber(
    x => x % 3 == 0,
    x => Console.WriteLine("divBy3"));

Thread.Sleep(2000);

var divBy2 = subscriber(
    x => x % 2 == 0,
    x => Console.WriteLine("divBy2"));

Thread.Sleep(2000);
divBy3.Dispose();
Thread.Sleep(2000);
divBy2.Dispose();
Thread.Sleep(10000);

И это произвело:

divBy3
1
2
divBy3
4
5
divBy3
7
8
divBy3
divBy2
11
divBy3
13
divBy2
divBy3
divBy2
17
divBy3
19
divBy2
21
divBy2
23
divBy2
25
divBy2
27
divBy2
29
30
31
32
...

И это казалось правильным. Дайте мне знать, если это решит это для вас.

2 голосов
/ 17 октября 2011

Один из вариантов - сделать подписчиков заметными.Итак, что эти подписчики делают, если они не обрабатывают значение, то они излучают его через свой наблюдаемый интерфейс, и тогда последний подписчик (который обрабатывает все неиспользуемые значения) будет объектом в одну тонну, который подписывается на каждый из наблюдаемых интерфейсовдругих подписчиков.Что-то вроде:

public class MyObserver : IObserver<string>, IObservable<string>
{
    Subject<string> s = new Subject<string>();
    public MyObserver(IObserver<string> obs)
    {
        s.Subscribe(obs);
    }
    public void OnCompleted()
    { }
    public void OnError(Exception error)
    { }
    public void OnNext(string value)
    {
        //If condition matches then else dont do on next
        s.OnNext(value);
    }
    public IDisposable Subscribe(IObserver<string> observer)
    {
        return s.Subscribe(observer);
    }
}
public class LastObserver : IObserver<string>
{
    public void OnCompleted()
    {   }

    public void OnError(Exception error)
    { }

    public void OnNext(string value)
    { //Do something with not catched value
    }
}
static LastObserver obs = new LastObserver();
static void Main()
{
    var timer = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => i.ToString());
    timer.Subscribe(new MyObserver(obs));
    timer.Subscribe(new MyObserver(obs));
    timer.Subscribe(new MyObserver(obs));

} 
1 голос
/ 17 октября 2011

Я не знаю ни одного из готовых способов сделать это, но я бы сделал это как в

class ConditionAction
{
     public Predicate<string> Condition {get; set; }
     public Action<string> Action {get; set; }
}

var conditions = new ConditionAction[]{action1, action2, action3};

foreach (var condition in conditions)
       observable.Where(condition.Condition).Subscribe(condition.Action);
.....
observable.Where(s=>!conditions.Any(c=>c.Condition(s))).Subscribe(...);
...