Буфер по времени или промежуточной сумме для реактивных расширений - PullRequest
0 голосов
/ 05 ноября 2018

Я довольно новичок в Reactive Extensions и хочу буферизовать поток на основе времени или текущей суммы, не превышающей пороговое значение (размер каждого элемента определяется лямбда-выражением), в зависимости от того, что произойдет первым, во многом как существующий буфер по счету или времени.

В настоящее время я написал свою собственную реализацию метода Buffer, который работает как положено, используя IScheduler для запуска по таймауту, а затем управляю своими собственными буферами в памяти и генерирую их всякий раз, когда накопленная сумма превышает пороговое значение, но это выглядит как немного низкий уровень, и я подумал, что должно быть более элегантное решение, чтобы выразить его, используя каким-либо образом существующие реактивные операции и, возможно, вместо этого использовать перегрузку буфера TBufferClosing.

Лучшее решение, которое я нашел до сих пор, заключается в следующем, но у него есть недостаток, заключающийся в том, что он включает последний элемент, вызвавший пороговое значение, в результате чего сумма превышает максимальную запрошенную сумму:

    public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan) 
    {
        var shared = source.Publish().RefCount();

        return shared.Buffer(() => Observable.Amb(
            Observable.Timer(timeSpan)
                .Select(_ => Unit.Default),
            shared.Select(sizeSelector)
                .Scan((a, b) => a + b)
                .SkipWhile(accumulated => accumulated < maxSize)
                .Select(_ => Unit.Default))
            );
    }

Возможно ли это заставить работать с существующими операторами (путем настройки моей версии выше или другим способом полностью), или я вынужден остаться с моей собственной реализацией буфера, обрабатывающей таймеры и буферизировать себя?

1 Ответ

0 голосов
/ 07 ноября 2018

ОК, это должно работать. Поздние ответы лучше, чем никогда. Я не думаю, что есть способ сделать это лучше, чем вы, используя операторы Buffer.

По сути, проблема - это проблема конечного автомата, что означает, что вам нужно решение Scan. Проблема в том, что у вас есть два разных источника, которые могут изменить ваше состояние: новый элемент и время ожидания. Scan на самом деле не работает с двумя несколькими источниками, поэтому нам нужно как-то объединить эти два типа событий в один.

Я делал нечто подобное раньше с Дискриминационными союзами , и эта концепция должна работать здесь. Первое решение (использует пакет Nuget System.Collections.Immutable):

public static class X
{
    public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, Func<TSource, int> sizeSelector, int maxSize, TimeSpan bufferTimeSpan)
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); //our time-out mechanism

        return source
            .Union(queue.Delay(bufferTimeSpan))
            .ScanUnion(
                (list: ImmutableList<TSource>.Empty, size: 0, emitValue: (ImmutableList<TSource>)null),
                (state, item) => { // item handler
                    var itemSize = sizeSelector(item);
                    var newSize = state.size + itemSize;
                    if (newSize > maxSize)
                    {
                        queue.OnNext(Unit.Default);
                        return (ImmutableList<TSource>.Empty.Add(item), itemSize, state.list);
                    }
                    else
                        return (state.list.Add(item), newSize, null);
                },
                (state, _) => { // time out handler
                    queue.OnNext(Unit.Default); 
                    return (ImmutableList<TSource>.Empty, 0, state.list); 
                }
            )
            .Where(t => t.emitValue != null)
            .Select(t => t.emitValue.ToList());
    }
}

Объяснение: Union объединяет два потока разных типов в один поток, где элемент может быть типа A или типа B. ScanUnion работает так же, как Scan, но предлагает две функции для обработки двух разных типы предметов.

BehaviorSubject вызывается всякий раз, когда открывается новое окно буфера, оператор Delay проверяет, получает ли Scan его после заданного промежутка времени. Состояние внутри Scan содержит список текущих буферных элементов и размер. emitValue используется, когда окно буфера закрывается, и для передачи значений.

Вот код помощника Дискриминационного Союза:

public static class DUnionExtensions
{
    public class DUnion<T1, T2>
    {
        public DUnion(T1 t1)
        {
            Type1Item = t1;
            Type2Item = default(T2);
            IsType1 = true;
        }

        public DUnion(T2 t2, bool ignored) //extra parameter to disambiguate in case T1 == T2
        {
            Type2Item = t2;
            Type1Item = default(T1);
            IsType1 = false;
        }

        public bool IsType1 { get; }
        public bool IsType2 => !IsType1;

        public T1 Type1Item { get; }
        public T2 Type2Item { get; }
    }

    public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return a.Select(x => new DUnion<T1, T2>(x))
            .Merge(b.Select(x => new DUnion<T1, T2>(x, false)));
    }

    public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
            TState initialState,
            Func<TState, T1, TState> type1Handler,
            Func<TState, T2, TState> type2Handler)
    {
        return source.Scan(initialState, (state, u) => u.IsType1
            ? type1Handler(state, u.Type1Item)
            : type2Handler(state, u.Type2Item)
        );
    }
}
...