Rx.NET оператор "ворота" - PullRequest
0 голосов
/ 03 июня 2018

[Примечание: я использую 3.1, если это имеет значение.Кроме того, я спрашивал об этом при просмотре кода, но пока никаких ответов.]

Мне нужен оператор, который позволяет потоку логических значений действовать в качестве шлюза для другого потока (пусть значения проходят, когда поток шлюза равен trueбросьте их, когда это ложно).Я бы обычно использовал Switch для этого, но если исходный поток холодный, он будет продолжать его воссоздавать, чего я не хочу.

Я также хочу очистить после себя, так что результат завершится, если либоисточника или гейта завершен.

public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
    var s = source.Publish().RefCount();
    var g = gate.Publish().RefCount();

    var sourceCompleted = s.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
    var gateCompleted = g.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);

    var anyCompleted = Observable.Amb(sourceCompleted, gateCompleted);

    var flag = false;
    g.TakeUntil(anyCompleted).Subscribe(value => flag = value);

    return s.Where(_ => flag).TakeUntil(anyCompleted);
}

Помимо общего многословия, мне не нравится, что я подписываюсь на вентиль, даже если на результат никогда не подписывается (в этом случае этот оператор должен быть неактивным).Есть ли способ избавиться от этой подписки?

Я также пробовал эту реализацию, но это еще хуже, когда дело доходит до очистки после себя:

return Observable.Create<T>(
    o =>
    {
        var flag = false;
        gate.Subscribe(value => flag = value);

        return source.Subscribe(
            value =>
            {
                if (flag) o.OnNext(value);
            });
    });

Это тестыЯ использую для проверки реализации:

[TestMethod]
public void TestMethod1()
{
    var output = new List<int>();

    var source = new Subject<int>();
    var gate = new Subject<bool>();

    var result = source.When(gate);
    result.Subscribe(output.Add, () => output.Add(-1));

    // the gate starts with false, so the source events are ignored
    source.OnNext(1);
    source.OnNext(2);
    source.OnNext(3);
    CollectionAssert.AreEqual(new int[0], output);

    // setting the gate to true will let the source events pass
    gate.OnNext(true);
    source.OnNext(4);
    CollectionAssert.AreEqual(new[] { 4 }, output);
    source.OnNext(5);
    CollectionAssert.AreEqual(new[] { 4, 5 }, output);

    // setting the gate to false stops source events from propagating again
    gate.OnNext(false);
    source.OnNext(6);
    source.OnNext(7);
    CollectionAssert.AreEqual(new[] { 4, 5 }, output);

    // completing the source also completes the result
    source.OnCompleted();
    CollectionAssert.AreEqual(new[] { 4, 5, -1 }, output);
}

[TestMethod]
public void TestMethod2()
{
    // completing the gate also completes the result
    var output = new List<int>();

    var source = new Subject<int>();
    var gate = new Subject<bool>();

    var result = source.When(gate);
    result.Subscribe(output.Add, () => output.Add(-1));

    gate.OnCompleted();
    CollectionAssert.AreEqual(new[] { -1 }, output);
}

Ответы [ 3 ]

0 голосов
/ 03 июня 2018

Обновление : Это прекращается, когда также завершаются ворота.Я пропустил TestMethod2 в копировании / вставке:

    return gate.Publish(_gate => source
        .WithLatestFrom(_gate.StartWith(false), (value, b) => (value, b))
        .Where(t => t.b)
        .Select(t => t.value)
        .TakeUntil(_gate.IgnoreElements().Materialize()
    ));

Это проходит ваши тесты TestMethod1, оно не завершается, когда наблюдаемые ворота делают.

public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
    return source
        .WithLatestFrom(gate.StartWith(false), (value, b) => (value, b))
        .Where(t => t.b)
        .Select(t => t.value);
}
0 голосов
/ 04 июня 2018

Это работает:

public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
    return
        source.Publish(ss => gate.Publish(gs =>
            gs
                .Select(g => g ? ss : ss.IgnoreElements())
                .Switch()
                .TakeUntil(Observable.Amb(
                    ss.Select(s => true).Materialize().LastAsync(),
                    gs.Materialize().LastAsync()))));
}

Это проходит оба теста.

0 голосов
/ 03 июня 2018

Вы были на правильном пути с Observable.Create.Вы должны вызывать onError и onCompleted из обеих подписок на наблюдаемом объекте, чтобы правильно завершить или вызвать ошибку при необходимости.Также, возвращая оба значения IDisposable в делегате Create, вы гарантируете правильную очистку обеих подписок, если собираетесь утилизировать подписку Когда до завершения source или gate.

    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        return Observable.Create<T>(
            o =>
            {
                var flag = false;
                var gs = gate.Subscribe(
                    value => flag = value,
                    e => o.OnError(e),
                    () => o.OnCompleted());

                var ss = source.Subscribe(
                    value =>
                    {
                        if (flag) o.OnNext(value);
                    },
                    e => o.OnError(e), 
                    () => o.OnCompleted());

                return new CompositeDisposable(gs, ss);
            });
    }

Короче, но гораздо труднее читать версию, используя только операторы Rx.Для холодных наблюдаемых, вероятно, нужен источник / refcount для источника.

    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        return gate
            .Select(g => g ? source : source.IgnoreElements())
            .Switch()
            .TakeUntil(source.Materialize()
                             .Where(s => s.Kind == NotificationKind.OnCompleted));
    }
...