Буферизация текущих элементов, сгенерированных Observable.FromEvent - PullRequest
0 голосов
/ 19 декабря 2018

Моя цель заключается в том, чтобы спулировать все элементы / уведомления, начиная с IObservable<T>, для будущих подписчиков.

Например, если кто-то подписывается на поток сообщений, он сначала получает все сообщения, которые были получены до подписки.Затем он начинает получать новые сообщения, когда они есть.Это должно происходить незаметно, без повторов и потерь на «границе» между старыми и новыми сообщениями.

Я придумал следующий метод расширения:

public static IObservable<T> WithHistory<T>(this IObservable<T> source)
{
    var accumulator = new BlockingCollection<T>();

    source.Subscribe(accumulator.Add);

    return accumulator
        .GetConsumingEnumerable()
        .ToObservable()
        .SubscribeOn(ThreadPoolScheduler.Instance);
}

Насколько я его тестировал,это работает:

class Generator<T>
{
    event Action<T> onPush;

    public IObservable<T> Items =>
        Observable.FromEvent<T>(d => onPush += d, d => onPush -= d);

    public void Push(T item) => onPush?.Invoke(item);
}

...

private static void Main()
{
    var g = new Generator<int>();
    var ongoingItems = g.Items;
    var allItems = g.Items.WithHistory();

    g.Push(1);
    g.Push(2);

    ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
    allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));

    g.Push(3);
    g.Push(4);
    g.Push(5);

    Console.ReadLine();
}

Результат:

Ongoing: got 3
Ongoing: got 4
Ongoing: got 5
WithHistory: got 1
WithHistory: got 2
WithHistory: got 3
WithHistory: got 4
WithHistory: got 5

Однако использование BlockingCollection<T> представляется излишним.Кроме того, описанный выше метод не поддерживает завершение, обработку ошибок и может привести к взаимоблокировке без .SubscribeOn(ThreadPoolScheduler.Instance).

Есть ли лучший способ добиться этого без описанных недостатков?

1 Ответ

0 голосов
/ 19 декабря 2018

Лучший способ сделать это с помощью .Replay()

void Main()
{
    var g = new Generator<int>();
    var ongoingItems = g.Items;
    var allItems = g.Items.Replay().RefCount();

    using(var tempSubscriber = allItems.Subscribe())
    {
        g.Push(1);
        g.Push(2);

        ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
        allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));

        g.Push(3);
        g.Push(4);
        g.Push(5);

        Console.ReadLine();
    }
}

.Replay().RefCount() создает наблюдаемое, которое будет хранить внутреннюю очередь для воспроизведения, пока есть подписчик.Однако, если у вас есть постоянный подписчик (как ваше решение использует метод WithHistory), у вас есть утечка памяти.Лучший способ обойти это - иметь временного подписчика, который автоматически отключается после того, как вас больше не интересует история.

...