Если вы ищете реактивное решение, всегда проще, если вы смоделируете свои входные данные как наблюдаемые. В данном случае:
var i1 = new BehaviorSubject<int>(1);
var i2 = new BehaviorSubject<int>(4);
var sum = i1.CombineLatest(i2, (i1Value, i2Value) => i1Value + i2Value);
Action syncChange1 = () =>
{
i1.OnNext(2);
i2.OnNext(5);
i1.OnNext(7);
};
Action syncChange2 = () =>
{
i1.OnNext(1);
i2.OnNext(1);
};
Action syncChange3 = () =>
{
i1.OnNext(3);
i1.OnCompleted();
i2.OnCompleted();
};
IObservable<Action> actions = new Action[] { syncChange1, syncChange2, syncChange3 }.ToObservable();
То же, что и вопрос, просто мы структурируем наши Действия как наблюдаемую серию изменений. Теперь могут возникнуть маги c:
var openWindow = new Subject<int>();
var closeWindow = new Subject<int>();
var gatedActions = actions
.Select((a, i) => new Action(() => {
openWindow.OnNext(i);
a();
closeWindow.OnNext(i);
}));
Теперь мы определили windows, который можно легко передать в .Buffer()
или .Window()
.
// alternative to window. Not used.
var buffer = sum.Buffer(openWindow, i => closeWindow.Where(cwi => cwi == i));
var listAsync = sum
.Window(openWindow, i => closeWindow.Where(cwi => cwi == i))
.SelectMany(w => w.TakeLast(1))
.ToList()
.RunAsync(new CancellationToken());
gatedActions.Subscribe(a => a(), () => { openWindow.OnCompleted(); closeWindow.OnCompleted(); });
var list = await listAsync; //output is {12, 2, 4}. The starting 5 can be worked in with a .Merge() or something.