У меня есть два подхода.
Первый предоставляет способ связать воедино пары предикат / действие, чтобы «откачать» соответствующие значения. Это следует стилю оператора Rx.
Я могу написать это:
observable
.Syphon(s => s.StartsWith("a"), s => { })
.Syphon(s => s.StartsWith("b"), s => { })
.Syphon(s => s.StartsWith("c"), s => { })
.Syphon(s => s.StartsWith("d"), s => { })
.Subscribe(s => { /* otherwise */ });
Если у меня есть этот метод расширения:
public static IObservable<T> Syphon<T>(
this IObservable<T> source,
Func<T, bool> predicate,
Action<T> action)
{
if (source == null) throw new ArgumentNullException("source");
if (predicate == null) throw new ArgumentNullException("predicate");
if (action == null) throw new ArgumentNullException("action");
return Observable.Create<T>(o =>
source.Subscribe(
t =>
{
if (predicate(t))
{
action(t);
}
else
{
o.OnNext(t);
}
},
ex =>
o.OnError(ex),
() =>
o.OnCompleted()));
}
Он не позволяет вам добавлять и удалять пары предикат / действие на лету, но это довольно простой оператор, который может быть полезен.
Для полной функциональности добавления / удаления я предложил такой подход:
Func<Func<string, bool>, Action<string>, IDisposable> add;
observable
.Syphon(out add)
.Subscribe(s => { /* otherwise */ });
var startsWithA = add(s => s.StartsWith("a"), s => { /* a */ });
var startsWithB = add(s => s.StartsWith("b"), s => { /* b */ });
startsWithA.Dispose();
var startsWithC = add(s => s.StartsWith("c"), s => { /* c */ });
var startsWithD = add(s => s.StartsWith("d"), s => { /* d */ });
startsWithC.Dispose();
startsWithB.Dispose();
startsWithD.Dispose();
Перегрузка метода расширения .Syphon(out add)
позволяет методу эффективно возвращать два результата - нормальное возвращаемое значение - IObservable<T>
, а второе - как Func<Func<T, bool>, Action<T>, IDisposable>
. Это второе возвращаемое значение позволяет добавлять новые пары предикат / действие к оператору сифона, а затем удалять, вызывая Dispose
в возвращенной подписке - очень Rx-ish.
Вот метод расширения:
public static IObservable<T> Syphon<T>(
this IObservable<T> source,
out Func<Func<T, bool>, Action<T>, IDisposable> subscriber)
{
if (source == null) throw new ArgumentNullException("source");
var pas = new List<Tuple<Func<T, bool>, Action<T>>>();
subscriber = (p, a) =>
{
lock (pas)
{
var tuple = Tuple.Create(p, a);
pas.Add(tuple);
return Disposable.Create(() =>
{
lock (pas)
{
pas.Remove(tuple);
}
});
}
};
return Observable.Create<T>(o =>
source.Subscribe(
t =>
{
Action<T> a = null;
lock (pas)
{
var pa = pas.FirstOrDefault(x => x.Item1(t));
if (pa != null)
{
a = pa.Item2;
}
}
if (a != null)
{
a(t);
}
else
{
o.OnNext(t);
}
},
ex =>
o.OnError(ex),
() =>
o.OnCompleted()));
}
Я проверил код следующим образом:
var xs = Observable.Interval(TimeSpan.FromSeconds(0.2));
Func<Func<long, bool>, Action<long>, IDisposable> subscriber;
xs
.Syphon(out subscriber)
.Subscribe(x => Console.WriteLine(x));
var divBy3 = subscriber(
x => x % 3 == 0,
x => Console.WriteLine("divBy3"));
Thread.Sleep(2000);
var divBy2 = subscriber(
x => x % 2 == 0,
x => Console.WriteLine("divBy2"));
Thread.Sleep(2000);
divBy3.Dispose();
Thread.Sleep(2000);
divBy2.Dispose();
Thread.Sleep(10000);
И это произвело:
divBy3
1
2
divBy3
4
5
divBy3
7
8
divBy3
divBy2
11
divBy3
13
divBy2
divBy3
divBy2
17
divBy3
19
divBy2
21
divBy2
23
divBy2
25
divBy2
27
divBy2
29
30
31
32
...
И это казалось правильным. Дайте мне знать, если это решит это для вас.