Метод расширения SynchronousBuffer для IObservable <T> - PullRequest
0 голосов
/ 06 марта 2020

Мне нужен метод Buffer, который не буферизуется ни по времени, ни по определенным условиям. Он должен вести себя подобно методу моментального снимка:

Создание снимка ReplaySubject buffer

Однако он не должен делать один снимок, он должен буферизоваться при синхронном изменения происходят и предоставляют их как IObservable .

Я думаю, что должно быть почти простое решение, как этот метод Snapshot, но я не могу понять, как на самом деле решить эту проблему. (Примечание: метод моментального снимка также хорошо работает для запросов по нескольким предметам)

Вот метод теста:

    [TestMethod]
    public async Task SyncBufferTest()
    {
        var i1 = new BehaviorSubject<int>(1);
        var i2 = new BehaviorSubject<int>(4);
        var sum = i1.CombineLatest(i2, (i1Value, i2Value) => i1Value + i2Value);
        var listAsync = sum.SynchronousBuffer().Select(buf => buf.Last()).ToList().RunAsync(new CancellationToken());

        Action syncChange1 = () =>
        {
            i1.OnNext(2);
            i2.OnNext(5);
            i1.OnNext(7);
        };

        Action syncChange2 = () =>
        {
            i1.OnNext(1);
            i2.OnNext(1);
        };

        Action syncChange3 = () =>
        {
            i1.OnNext(3);
            i1.OnCompleted();
            i2.OnCompleted();
        };


        Task.Run(syncChange1)
            .ContinueWith(t => syncChange2())
            .ContinueWith(t => syncChange3());

        var list = await listAsync;

        CollectionAssert.AreEqual(new List<int> { 5, 12, 2, 4 }, list.ToList());
    }

Справочная информация:

Я работаю над концепцией архитектуры с уровнем реактивных данных в качестве основы приложения. Весь слой данных состоит из субъектов (как «говорящий» слой данных). В одной транзакции несколько из этих предметов меняются. У меня есть много наблюдаемых в более высоком уровне моего приложения, которые являются запросами к нескольким из этих тем. Поэтому мне нужен этот SynchronousBuffer для обработки синхронных изменений всех этих объектов во всех этих запросах, чтобы не получать уведомления несколько раз.

Ответы [ 2 ]

1 голос
/ 06 марта 2020

Если вы ищете реактивное решение, всегда проще, если вы смоделируете свои входные данные как наблюдаемые. В данном случае:

var i1 = new BehaviorSubject<int>(1);
var i2 = new BehaviorSubject<int>(4);
var sum = i1.CombineLatest(i2, (i1Value, i2Value) => i1Value + i2Value);
Action syncChange1 = () =>
{
    i1.OnNext(2);
    i2.OnNext(5);
    i1.OnNext(7);
};

Action syncChange2 = () =>
{
    i1.OnNext(1);
    i2.OnNext(1);
};

Action syncChange3 = () =>
{
    i1.OnNext(3);
    i1.OnCompleted();
    i2.OnCompleted();
};
IObservable<Action> actions = new Action[] { syncChange1, syncChange2, syncChange3 }.ToObservable();

То же, что и вопрос, просто мы структурируем наши Действия как наблюдаемую серию изменений. Теперь могут возникнуть маги c:

var openWindow = new Subject<int>();
var closeWindow = new Subject<int>();
var gatedActions = actions
    .Select((a, i) => new Action(() => {
        openWindow.OnNext(i);
        a();
        closeWindow.OnNext(i);
    }));

Теперь мы определили windows, который можно легко передать в .Buffer() или .Window().

// alternative to window. Not used.
var buffer = sum.Buffer(openWindow, i => closeWindow.Where(cwi => cwi == i)); 

var listAsync = sum
    .Window(openWindow, i => closeWindow.Where(cwi => cwi == i))
    .SelectMany(w => w.TakeLast(1))
    .ToList()
    .RunAsync(new CancellationToken());

gatedActions.Subscribe(a => a(), () => { openWindow.OnCompleted(); closeWindow.OnCompleted(); });

var list = await listAsync; //output is {12, 2, 4}. The starting 5 can be worked in with a .Merge() or something.
0 голосов
/ 06 марта 2020

Другой подход состоит в том, чтобы попытаться определить временное окно, в пределах которого вы считаете изменения синхронными:

var synchronounsWindow = TimeSpan.FromMilliseconds(100);
var actions = new Action[] {syncChange1, syncChange2, syncChange3};
IObservable<Unit> allChanges = Observable.Merge(
    i1.Select(_ => Unit.Default), 
    i2.Select(_ => Unit.Default)
);

Когда у нас есть временное окно, вы можете применять те же методы управления окнами / буферизации, что и другие ответ.

var buffer = sum.Buffer(allChanges.Throttle(synchronounsWindow)); //alternative to window if you like

IList<int> list = null;
var listAsync = sum
    .Window(allChanges.Throttle(synchronounsWindow))
    .SelectMany(w => w.TakeLast(1))
    .ToList()
    .Subscribe(l => { list = l;});

foreach (var a in actions)
{
    a();
    await Task.Delay(synchronounsWindow);
}
CollectionAssert.AreEqual(new List<int> { 12, 2, 4 }, list.ToList()); // again, skipping 5
...