РЕДАКТИРОВАТЬ : @Asti исправил его ошибку, а я исправил свою, основываясь на его ответе. Наши ответы сейчас во многом похожи. У меня есть идея, как сделать чисто реактивный, если у меня будет время, я опубликую это позже.
Фиксированный код:
public static IObservable<T> RoundRobin2<T>(this IObservable<T> source)
{
var subscribers = new BehaviorSubject<ImmutableList<IObserver<T>>>(ImmutableList<IObserver<T>>.Empty);
ImmutableList<IObserver<T>> latest = ImmutableList<IObserver<T>>.Empty;
subscribers.Subscribe(l => latest = l);
var shared = source
.Select((v, i) => (v, i))
.WithLatestFrom(subscribers, (t, s) => (t.v, t.i, s))
.Publish()
.RefCount();
return Observable.Create<T>(observer =>
{
subscribers.OnNext(latest.Add(observer));
var dispose = Disposable.Create(() => subscribers.OnNext(latest.Remove(observer)));
var sub = shared
.Where(t => t.i % t.s.Count == t.s.FindIndex(o => o == observer))
.Select(t => t.v)
.Subscribe(observer);
return new CompositeDisposable(dispose, sub);
});
}
Оригинальный ответ : Я проголосовал за ответ @ Асти, потому что он в значительной степени прав: просто потому, что ты можешь, не значит, что ты должен. И его ответ в основном работает, но в нем есть ошибка:
Это прекрасно работает:
var source = Observable.Range(1, 20).Publish();
var dist = source.RoundRobin();
dist.Subscribe(i => Console.WriteLine($"One sees {i}"));
dist.Take(1).Subscribe(i => Console.WriteLine($"Two sees {i}"));
Это не так:
var source = Observable.Range(1, 20).Publish();
var dist = source.RoundRobin();
dist.Take(1).Subscribe(i => Console.WriteLine($"One sees {i}"));
dist.Subscribe(i => Console.WriteLine($"Two sees {i}"));
Вывод:
One sees 1
Two sees 1
Two sees 2
Two sees 3
Two sees 4
...
Сначала я подумал, что ошибка связана с Хэллоуином , но теперь я не уверен. .ToArray()
в Repeat
должны позаботиться об этом. Я также написал наблюдаемую реализацию pure-i sh с той же ошибкой. Эта реализация не гарантирует идеального Round Robin, но это не было вопросом:
public static IObservable<T> RoundRobin2<T>(this IObservable<T> source)
{
var subscribers = new BehaviorSubject<ImmutableList<IObserver<T>>>(ImmutableList<IObserver<T>>.Empty);
ImmutableList<IObserver<T>> latest = ImmutableList<IObserver<T>>.Empty;
subscribers.Subscribe(l => latest = l);
var shared = source
.Select((v, i) => (v, i))
.WithLatestFrom(subscribers, (t, s) => (t.v, t.i, s))
.Publish()
.RefCount();
return Observable.Create<T>(observer =>
{
subscribers.OnNext(latest.Add(observer));
var dispose = Disposable.Create(() => subscribers.OnNext(latest.Remove(observer)));
var sub = shared
.Where(t => t.i % t.s.Count == t.s.FindIndex(o => o == observer))
.Select(t => t.v)
.Subscribe(observer);
return new CompositeDisposable(dispose, sub);
});
}