В Rx, как сгруппировать последние элементы после определенного периода времени? - PullRequest
7 голосов
/ 03 апреля 2012

Извините, если название не очень ясное, я не могу придумать ничего лучшего ...

Я получаю пользовательский ввод в виде IObservable<char>, и я хотел бы преобразовать его в IObservable<char[]>, группируя символы каждый раз, когда пользователь перестает печатать в течение более 1 секунды. Так, например, если ввод выглядит следующим образом:

h
e
l
l
o
(pause)
w
o
r
l
d
(pause)
!
(pause)

Я бы хотел, чтобы результат был:

['h', 'e', 'l', 'l', 'o']
['w', 'o', 'r', 'l', 'd']
['!']

Я подозреваю, что решение довольно простое, но я не могу найти правильный подход ... Я пытался использовать Buffer, GroupByUntil, Throttle и некоторые другие, но безрезультатно.

Любая идея приветствуется!


РЕДАКТИРОВАТЬ: У меня есть кое-что, что почти работает:

        _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1)))
              .ObserveOnDispatcher()
              .Subscribe(OnCompleteInput);

Но мне нужно сбрасывать задержку каждый раз, когда набирается новый символ ...

Ответы [ 4 ]

7 голосов
/ 03 апреля 2012

Buffer и Throttle будет достаточно, если ваш источник горячий. Чтобы сделать его горячим, вы можете использовать .Publish().RefCount(), чтобы обеспечить только одну подписку на источник.

IObservable<IList<T>> BufferWithInactivity<T>(this IObservable<T> source,
                                              TimeSpan dueTime)
{
    if (source == null) throw new ArgumentNullException("source");
    //defer dueTime checking to Throttle
    var hot = source.Publish().RefCount();
    return hot.Buffer(() => hot.Throttle(dueTime));
}
0 голосов
/ 03 апреля 2012

Я написал расширение некоторое время назад, чтобы сделать то, что вы ищете - BufferWithInactivity.

Вот оно:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source,
    TimeSpan inactivity,
    int maximumBufferSize)
{
    return Observable.Create<IEnumerable<T>>(o =>
    {
        var gate = new object();
        var buffer = new List<T>();
        var mutable = new SerialDisposable();
        var subscription = (IDisposable)null;
        var scheduler = Scheduler.ThreadPool;

        Action dump = () =>
        {
            var bts = buffer.ToArray();
            buffer = new List<T>();
            if (o != null)
            {
                o.OnNext(bts);
            }
        };

        Action dispose = () =>
        {
            if (subscription != null)
            {
                subscription.Dispose();
            }
            mutable.Dispose();
        };

        Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
            onAction =>
            {
                lock (gate)
                {
                    dispose();
                    dump();
                    if (o != null)
                    {
                        onAction(o);
                    }
                }
            };

        Action<Exception> onError = ex =>
            onErrorOrCompleted(x => x.OnError(ex));

        Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());

        Action<T> onNext = t =>
        {
            lock (gate)
            {
                buffer.Add(t);
                if (buffer.Count == maximumBufferSize)
                {
                    dump();
                    mutable.Disposable = Disposable.Empty;
                }
                else
                {
                    mutable.Disposable = scheduler.Schedule(inactivity, () =>
                    {
                        lock (gate)
                        {
                            dump();
                        }
                    });
                }
            }
        };

        subscription =
            source
                .ObserveOn(scheduler)
                .Subscribe(onNext, onError, onCompleted);

        return () =>
        {
            lock (gate)
            {
                o = null;
                dispose();
            }
        };
    });
}
0 голосов
/ 03 апреля 2012

Это должно сработать. Это не так кратко, как ваше решение, так как реализует логику через класс вместо методов расширения, но это может быть лучшим способом сделать это. Короче говоря: каждый раз, когда вы получаете char, добавьте его к List и (пере) запустите таймер, который истекает через одну секунду; когда таймер истекает, сообщите нашим подписчикам List в виде массива и сбросьте состояние, чтобы оно было готово для следующего раза.

    class Breaker : IObservable<char[]>, IObserver<char>
    {
        List<IObserver<char[]>> observers = new List<IObserver<char[]>>();
        List<char> currentChars;
        DispatcherTimer t;
        public Breaker(IObservable<char> source)
        {
            source.Subscribe(this);
            t = new DispatcherTimer { Interval = new TimeSpan(0, 0, 1) };
            t.Tick += TimerOver;
            currentChars = new List<char>();
        }
        public IDisposable Subscribe(IObserver<char[]> observer)
        {
            observers.Add(observer);
            return null; //TODO return a useful IDisposable
        }
        public void OnCompleted()
        {
            //TODO implement completion logic
        }
        public void OnError(Exception e)
        {
            //TODO implement error logic
        }
        public void OnNext(char value)
        {
            currentChars.Add(value);
            t.Start();
        }
        void TimerOver(object sender, EventArgs e)
        {
            char[] chars = currentChars.ToArray();
            foreach (var obs in observers)
                obs.OnNext(chars);
            currentChars.Clear();
            t.Stop();
        }
    }
0 голосов
/ 03 апреля 2012

ОК, я нашел решение:

        Func<IObservable<char>> bufferClosingSelector =
            () =>
            _input.Timeout(TimeSpan.FromSeconds(1))
                  .Catch(Observable.Return('\0'))
                  .Where(i => i == '\0');
        _input.Buffer(bufferClosingSelector)
              .ObserveOnDispatcher()
              .Subscribe(OnCompleteInput);

По сути, bufferClosingSelector что-то выдвигает всякий раз, когда происходит тайм-аут, который закрывает текущий буфер. Вероятно, есть более простой и элегантный способ, но он работает ... Я открыт для лучших предложений;)

...