[Примечание: я использую 3.1, если это имеет значение.Кроме того, я спрашивал об этом при просмотре кода, но пока никаких ответов.]
Мне нужен оператор, который позволяет потоку логических значений действовать в качестве шлюза для другого потока (пусть значения проходят, когда поток шлюза равен trueбросьте их, когда это ложно).Я бы обычно использовал Switch для этого, но если исходный поток холодный, он будет продолжать его воссоздавать, чего я не хочу.
Я также хочу очистить после себя, так что результат завершится, если либоисточника или гейта завершен.
public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
var s = source.Publish().RefCount();
var g = gate.Publish().RefCount();
var sourceCompleted = s.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
var gateCompleted = g.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
var anyCompleted = Observable.Amb(sourceCompleted, gateCompleted);
var flag = false;
g.TakeUntil(anyCompleted).Subscribe(value => flag = value);
return s.Where(_ => flag).TakeUntil(anyCompleted);
}
Помимо общего многословия, мне не нравится, что я подписываюсь на вентиль, даже если на результат никогда не подписывается (в этом случае этот оператор должен быть неактивным).Есть ли способ избавиться от этой подписки?
Я также пробовал эту реализацию, но это еще хуже, когда дело доходит до очистки после себя:
return Observable.Create<T>(
o =>
{
var flag = false;
gate.Subscribe(value => flag = value);
return source.Subscribe(
value =>
{
if (flag) o.OnNext(value);
});
});
Это тестыЯ использую для проверки реализации:
[TestMethod]
public void TestMethod1()
{
var output = new List<int>();
var source = new Subject<int>();
var gate = new Subject<bool>();
var result = source.When(gate);
result.Subscribe(output.Add, () => output.Add(-1));
// the gate starts with false, so the source events are ignored
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
CollectionAssert.AreEqual(new int[0], output);
// setting the gate to true will let the source events pass
gate.OnNext(true);
source.OnNext(4);
CollectionAssert.AreEqual(new[] { 4 }, output);
source.OnNext(5);
CollectionAssert.AreEqual(new[] { 4, 5 }, output);
// setting the gate to false stops source events from propagating again
gate.OnNext(false);
source.OnNext(6);
source.OnNext(7);
CollectionAssert.AreEqual(new[] { 4, 5 }, output);
// completing the source also completes the result
source.OnCompleted();
CollectionAssert.AreEqual(new[] { 4, 5, -1 }, output);
}
[TestMethod]
public void TestMethod2()
{
// completing the gate also completes the result
var output = new List<int>();
var source = new Subject<int>();
var gate = new Subject<bool>();
var result = source.When(gate);
result.Subscribe(output.Add, () => output.Add(-1));
gate.OnCompleted();
CollectionAssert.AreEqual(new[] { -1 }, output);
}