ОК, это должно работать. Поздние ответы лучше, чем никогда. Я не думаю, что есть способ сделать это лучше, чем вы, используя операторы Buffer
.
По сути, проблема - это проблема конечного автомата, что означает, что вам нужно решение Scan
. Проблема в том, что у вас есть два разных источника, которые могут изменить ваше состояние: новый элемент и время ожидания. Scan
на самом деле не работает с двумя несколькими источниками, поэтому нам нужно как-то объединить эти два типа событий в один.
Я делал нечто подобное раньше с Дискриминационными союзами , и эта концепция должна работать здесь. Первое решение (использует пакет Nuget System.Collections.Immutable
):
public static class X
{
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); //our time-out mechanism
return source
.Union(queue.Delay(bufferTimeSpan))
.ScanUnion(
(list: ImmutableList<TSource>.Empty, size: 0, emitValue: (ImmutableList<TSource>)null),
(state, item) => { // item handler
var itemSize = sizeSelector(item);
var newSize = state.size + itemSize;
if (newSize > maxSize)
{
queue.OnNext(Unit.Default);
return (ImmutableList<TSource>.Empty.Add(item), itemSize, state.list);
}
else
return (state.list.Add(item), newSize, null);
},
(state, _) => { // time out handler
queue.OnNext(Unit.Default);
return (ImmutableList<TSource>.Empty, 0, state.list);
}
)
.Where(t => t.emitValue != null)
.Select(t => t.emitValue.ToList());
}
}
Объяснение: Union
объединяет два потока разных типов в один поток, где элемент может быть типа A или типа B. ScanUnion
работает так же, как Scan
, но предлагает две функции для обработки двух разных типы предметов.
BehaviorSubject
вызывается всякий раз, когда открывается новое окно буфера, оператор Delay
проверяет, получает ли Scan
его после заданного промежутка времени. Состояние внутри Scan
содержит список текущих буферных элементов и размер. emitValue
используется, когда окно буфера закрывается, и для передачи значений.
Вот код помощника Дискриминационного Союза:
public static class DUnionExtensions
{
public class DUnion<T1, T2>
{
public DUnion(T1 t1)
{
Type1Item = t1;
Type2Item = default(T2);
IsType1 = true;
}
public DUnion(T2 t2, bool ignored) //extra parameter to disambiguate in case T1 == T2
{
Type2Item = t2;
Type1Item = default(T1);
IsType1 = false;
}
public bool IsType1 { get; }
public bool IsType2 => !IsType1;
public T1 Type1Item { get; }
public T2 Type2Item { get; }
}
public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
{
return a.Select(x => new DUnion<T1, T2>(x))
.Merge(b.Select(x => new DUnion<T1, T2>(x, false)));
}
public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
TState initialState,
Func<TState, T1, TState> type1Handler,
Func<TState, T2, TState> type2Handler)
{
return source.Scan(initialState, (state, u) => u.IsType1
? type1Handler(state, u.Type1Item)
: type2Handler(state, u.Type2Item)
);
}
}