Моя цель заключается в том, чтобы спулировать все элементы / уведомления, начиная с IObservable<T>
, для будущих подписчиков.
Например, если кто-то подписывается на поток сообщений, он сначала получает все сообщения, которые были получены до подписки.Затем он начинает получать новые сообщения, когда они есть.Это должно происходить незаметно, без повторов и потерь на «границе» между старыми и новыми сообщениями.
Я придумал следующий метод расширения:
public static IObservable<T> WithHistory<T>(this IObservable<T> source)
{
var accumulator = new BlockingCollection<T>();
source.Subscribe(accumulator.Add);
return accumulator
.GetConsumingEnumerable()
.ToObservable()
.SubscribeOn(ThreadPoolScheduler.Instance);
}
Насколько я его тестировал,это работает:
class Generator<T>
{
event Action<T> onPush;
public IObservable<T> Items =>
Observable.FromEvent<T>(d => onPush += d, d => onPush -= d);
public void Push(T item) => onPush?.Invoke(item);
}
...
private static void Main()
{
var g = new Generator<int>();
var ongoingItems = g.Items;
var allItems = g.Items.WithHistory();
g.Push(1);
g.Push(2);
ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));
g.Push(3);
g.Push(4);
g.Push(5);
Console.ReadLine();
}
Результат:
Ongoing: got 3
Ongoing: got 4
Ongoing: got 5
WithHistory: got 1
WithHistory: got 2
WithHistory: got 3
WithHistory: got 4
WithHistory: got 5
Однако использование BlockingCollection<T>
представляется излишним.Кроме того, описанный выше метод не поддерживает завершение, обработку ошибок и может привести к взаимоблокировке без .SubscribeOn(ThreadPoolScheduler.Instance)
.
Есть ли лучший способ добиться этого без описанных недостатков?