Наблюдайте за ценностями, не замеченными у других наблюдателей - PullRequest
3 голосов
/ 02 марта 2020

У меня есть наблюдаемое, которое испускает уникальные значения, например,

var source=Observable.Range(1,100).Publish();
source.Connect();

Я хочу наблюдать его значения, например, от двух наблюдателей, но каждый наблюдатель должен получать уведомление только для значений, не видимых в других наблюдателях.

Так что, если первый наблюдатель содержит значение 10, второй наблюдатель никогда не должен получать уведомление о значении 10.

Обновление

Я выбрал @ Asti` s ответ, потому что он был первым и, хотя глючил, он указывал в правильном направлении и голосом @ Shlomo с повышенным голосом. Жаль, что я не могу принять оба ответа, так как ответ @Shlomo был более правильным, и я действительно ценю всю его помощь, которую мы получаем по этому тегу.

Ответы [ 3 ]

3 голосов
/ 03 марта 2020

РЕДАКТИРОВАТЬ : @Asti исправил его ошибку, а я исправил свою, основываясь на его ответе. Наши ответы сейчас во многом похожи. У меня есть идея, как сделать чисто реактивный, если у меня будет время, я опубликую это позже.

Фиксированный код:

public static IObservable<T> RoundRobin2<T>(this IObservable<T> source)
{
    var subscribers = new BehaviorSubject<ImmutableList<IObserver<T>>>(ImmutableList<IObserver<T>>.Empty);
    ImmutableList<IObserver<T>> latest = ImmutableList<IObserver<T>>.Empty;
    subscribers.Subscribe(l => latest = l);

    var shared = source
            .Select((v, i) => (v, i))
            .WithLatestFrom(subscribers, (t, s) => (t.v, t.i, s))
            .Publish()
            .RefCount();
    return Observable.Create<T>(observer =>
    {
        subscribers.OnNext(latest.Add(observer));
        var dispose = Disposable.Create(() => subscribers.OnNext(latest.Remove(observer)));

        var sub = shared
            .Where(t => t.i % t.s.Count == t.s.FindIndex(o => o == observer))
            .Select(t => t.v)
            .Subscribe(observer);

        return new CompositeDisposable(dispose, sub);
    });
}

Оригинальный ответ : Я проголосовал за ответ @ Асти, потому что он в значительной степени прав: просто потому, что ты можешь, не значит, что ты должен. И его ответ в основном работает, но в нем есть ошибка:

Это прекрасно работает:

var source = Observable.Range(1, 20).Publish();
var dist = source.RoundRobin();
dist.Subscribe(i => Console.WriteLine($"One sees {i}"));
dist.Take(1).Subscribe(i => Console.WriteLine($"Two sees {i}"));

Это не так:

var source = Observable.Range(1, 20).Publish();
var dist = source.RoundRobin();
dist.Take(1).Subscribe(i => Console.WriteLine($"One sees {i}"));
dist.Subscribe(i => Console.WriteLine($"Two sees {i}"));

Вывод:

One sees 1
Two sees 1
Two sees 2
Two sees 3
Two sees 4
...

Сначала я подумал, что ошибка связана с Хэллоуином , но теперь я не уверен. .ToArray() в Repeat должны позаботиться об этом. Я также написал наблюдаемую реализацию pure-i sh с той же ошибкой. Эта реализация не гарантирует идеального Round Robin, но это не было вопросом:

public static IObservable<T> RoundRobin2<T>(this IObservable<T> source)
{
    var subscribers = new BehaviorSubject<ImmutableList<IObserver<T>>>(ImmutableList<IObserver<T>>.Empty);
    ImmutableList<IObserver<T>> latest = ImmutableList<IObserver<T>>.Empty;
    subscribers.Subscribe(l => latest = l);

    var shared = source
            .Select((v, i) => (v, i))
            .WithLatestFrom(subscribers, (t, s) => (t.v, t.i, s))
            .Publish()
            .RefCount();
    return Observable.Create<T>(observer =>
    {
        subscribers.OnNext(latest.Add(observer));
        var dispose = Disposable.Create(() => subscribers.OnNext(latest.Remove(observer)));

        var sub = shared
            .Where(t => t.i % t.s.Count == t.s.FindIndex(o => o == observer))
            .Select(t => t.v)
            .Subscribe(observer);

        return new CompositeDisposable(dispose, sub);
    });
}
3 голосов
/ 03 марта 2020

Наблюдаемые не должны вести себя по-разному для разных наблюдателей; лучшим подходом было бы дать каждому наблюдателю свою собственную отфильтрованную наблюдаемую.

При этом, если ваши ограничения требуют, чтобы вам было необходимо это поведение в одной наблюдаемой - мы можем использовать метод Round-Robin.

    public static IEnumerable<T> Repeat<T>(this IEnumerable<T> source)
    {
        for (; ; )
            foreach (var item in source.ToArray())
                yield return item;
    }

    public static IObservable<T> RoundRobin<T>(this IObservable<T> source)
    {
        var subscribers = new List<IObserver<T>>();
        var shared = source
            .Zip(subscribers.Repeat(), (value, observer) => (value, observer))
            .Publish()
            .RefCount();

        return Observable.Create<T>(observer =>
        {
            subscribers.Add(observer);
            var subscription = 
                shared
                .Where(pair => pair.observer == observer)
                .Select(pair => pair.value)
                .Subscribe(observer);

            var dispose = Disposable.Create(() => subscribers.Remove(observer));
            return new CompositeDisposable(subscription, dispose);
        });
    }

Использование:

var source = Observable.Range(1, 100).Publish();
var dist = source.RoundRobin();
dist.Subscribe(i => Console.WriteLine($"One sees {i}"));
dist.Subscribe(i => Console.WriteLine($"Two sees {i}"));

source.Connect();

Результат:

One sees 1
Two sees 2
One sees 3
Two sees 4
One sees 5
Two sees 6
One sees 7
Two sees 8
One sees 9
Two sees 10

Если у вас уже есть список наблюдателей, код становится намного проще.

2 голосов
/ 03 марта 2020

Это простая реализация распределенной очереди, использующая поток данных TPL. Но если разные наблюдатели не видят одно и то же значение, маловероятно, что оно будет вести себя неправильно. Он не циклический, но на самом деле имеет семантику противодавления.

    public static IObservable<T> Distribute<T>(this IObservable<T> source)
    {
        var buffer = new BufferBlock<T>();
        source.Subscribe(buffer.AsObserver());             
        return Observable.Create<T>(observer =>
            buffer.LinkTo(new ActionBlock<T>(observer.OnNext, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 })
        );
    }

Вывод

One sees 1
Two sees 2
One sees 3
Two sees 4
One sees 5
One sees 6
One sees 7
One sees 8
One sees 9
One sees 10

Я бы предпочел полностью пропустить Rx и просто использовать поток данных TPL.

...