Как реализовать буферизацию с таймаутом в RX - PullRequest
4 голосов
/ 11 января 2011

Мне нужно реализовать обработку событий, которая выполняется с задержкой, когда в течение определенного периода не поступают новые события.(Я должен поставить в очередь задачу разбора, когда текстовый буфер изменился, но я не хочу начинать разбор, когда пользователь все еще печатает.)

Я новичок в RX, но насколькоЯ вижу, что мне нужно сочетание методов BufferWithTime и Timeout.Я предполагаю, что это работает следующим образом: он буферизует события до тех пор, пока они не будут получены регулярно в течение определенного периода времени между последующими событиями.Если в потоке событий есть промежуток (более длительный, чем временной интервал), он должен вернуть распространяемые до сих пор события.

Взглянув на то, как реализованы функции Buffer и Timeout, я, возможно, мог бы реализовать свой метод BufferWithTimeout (если у всех есть такой, пожалуйста, поделитесь со мной), но мне интересно, можно ли этого достичь, просто комбинируя существующие методы,Есть идеи?

Ответы [ 4 ]

11 голосов
/ 05 апреля 2014

Это довольно старый вопрос, но я полагаю, что стоит упомянуть следующий ответ, поскольку все другие решения вынуждают пользователя подписываться вручную, отслеживать изменения и т. Д.

В качестве решения "Rx-y" я предлагаю следующее.

var buffers = source
    .GroupByUntil(
        // yes. yes. all items belong to the same group.
        x => true,
        g => Observable.Amb<int>(
               // close the group after 5 seconds of inactivity
               g.Throttle(TimeSpan.FromSeconds(5)),
               // close the group after 10 items
               g.Skip(9)
             ))
    // Turn those groups into buffers
    .SelectMany(x => x.ToArray());

По сути, источник является оконным, пока некоторая наблюдаемая не будет определена в терминах самого нового окна. Создается новое окно (сгруппированное наблюдаемое), и мы используем это окно, чтобы определить, когда окно должно закрыться. В этом случае я закрываю окно после 5 секунд бездействия или максимальной длины 10 (9 + 1).

3 голосов
/ 11 января 2011

Я думаю, BufferWithTime - это то, что вам нужно.

Ничего не встроено, но что-то вроде этого должно работать:

Примечание: Если ошибкапроисходит из источника, буфер не сбрасывается.Это соответствует текущей (или текущей последней проверенной мной) функциональности BufferWith*

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout)
{
    return source.BufferWithTimeout(timeout, Scheduler.TaskPool);
}

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler)
{
    return Observable.CreateWithDisposable<TSource[]>(observer =>
    {
        object lockObject = new object();
        List<TSource> buffer = new List<TSource>();

        MutableDisposable timeoutDisposable = new MutableDisposable();

        Action flushBuffer = () =>
        {
            TSource[] values;

            lock(lockObject)
            {
                values = buffer.ToArray();
                buffer.Clear();
            }

            observer.OnNext(values);
        };

        var sourceSubscription = source.Subscribe(
            value =>
            {
                lock(lockObject)
                {
                    buffer.Add(value);
                }

                timeoutDisposable.Disposable = 
                    scheduler.Schedule(flushBuffer, timeout);
            },
            observer.OnError,
            () =>
            {
                flushBuffer();
                observer.OnCompleted();
            });

        return new CompositeDisposable(sourceSubscription, timeoutDisposable);
    });
}
2 голосов
/ 13 января 2011

В дополнение к ответу Ричарда Сзалая я только что посмотрел на нового оператора Window из последней версии rx.Это «своего рода» решает вашу проблему в том смысле, что вы можете «буферизовать с тайм-аутом», то есть получать выходные данные в течение времени, которое длится до истечения времени ожидания, но вместо того, чтобы получать результаты в виде IEnumerable, вы фактически получаете ихкак IObservable.

Вот краткий пример того, что я имею в виду:

private void SetupStream()
{
    var inputStream = Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>(
        h => new MouseButtonEventHandler(h), 
        h => MouseDown += h,
        h => MouseDown -= h);

    var timeout = inputStream.Select(evt => Observable.Timer(TimeSpan.FromSeconds(10), Scheduler.Dispatcher))
        .Switch();

    inputStream.Window(() => timeout)
        .Subscribe(OnWindowOpen);
}


private void OnWindowOpen(IObservable<IEvent<MouseButtonEventArgs>> window)
{
    Trace.WriteLine(string.Format("Window open"));

    var buffer = new List<IEvent<MouseButtonEventArgs>>();

    window.Subscribe(click =>
    {

        Trace.WriteLine(string.Format("Click"));

        buffer.Add(click);

    }, () => ProcessEvents(buffer));
}

private void ProcessEvents(IEnumerable<IEvent<MouseButtonEventArgs>> clicks)
{
    Trace.WriteLine(string.Format("Window closed"));

    //...
}

Каждый раз, когда открывается окно, вы получаете все события по мере их поступления, сохраняете их в буфере и обрабатываете, когдаокно завершается (что на самом деле происходит, когда открывается следующее окно).

Не уверен, что Ричард изменит свой пример, чтобы использовать Window, теперь он доступен, но подумал, что в качестве альтернативы его стоит поднять.

1 голос
/ 11 января 2011

Если вам просто нужно запустить операцию, когда пользователь перестает печатать на определенное время, и необязательно нужны промежуточные события, тогда Throttle - это операция, которую вы выполняете. Проверьте здесь для примера его использования в этом сценарии.

...