Как использовать новый BufferWithTimeOrCount в Rx, который возвращает IObservable <IObservable <T>> вместо IObservable <IList <T>> - PullRequest
2 голосов
/ 30 января 2011

В Windows Phone 7 существует новая версия метода расширения BufferWithTimeOrCount для IObservable, который возвращает «поток потоков» вместо предыдущего «потока списков».У меня возникают трудности при попытке использовать новые или старые методы, поэтому, может быть, я просто не понимаю, как это работает, но моя цель - создать поток, который запускается только тогда, когда существующий поток соответствует указанному шаблону на основе времени во времяпредыдущие 2 сенсорных события.До сих пор я создал потоки для TouchUp и TouchDown (см. связанный вопрос ) и в псевдокоде я хочу что-то вроде:

//BufferLast2 should contain the last 1 or 2 touch events that occurred in the last 500ms. If no touches occurred this should return an empty set
var BufferLast2 = TouchDown.Merge(TouchUp).BufferWithTimeOrCount(TimeSpan.FromSeconds(0.5), 2);
//Use BufferLast2 to detect tap (TouchDown then TouchUp occuring in less than 0.5s)
var TouchTap = from touch2 in BufferLast2
               where touch2.Count == 2 && touch2.First().Action == TouchAction.Down && touch2.Last().Action == TouchAction.Up
               select touch2.First(); //returns initial TouchDown event
//Use BufferLast2 to detect Hold (TouchDown with no TouchUp occuring in 0.5s)
var TouchHold = from touch2 in BufferLast2
                where touch2.Count == 1 && touch2.First().Action == TouchAction.Down
                select touch2.First(); //returns initial TouchDown event

При использовании «Стабильной» Microsoft.Phone.Reactive версииRx, встроенный в ПЗУ, вызывающий IObservable<Class>.BufferWithTimeOrCount(...), возвращает IObservable<IList<Class>>, с которым довольно легко работать с использованием стандартных операторов списка (как описано выше), но по какой-то причине BufferLast2 всегда возвращал два события down вместо Down-> Последовательность, которую я ожидал.

Я подумал, что это может быть ошибка в коде, поэтому я попытался добавить ссылку на последнюю версию Rx и использовал Observable Extensions from C: \ Program Files (x86) \ Программируемость Microsoft Cloud \ Reactive Extensions \ v1.0.2838.0 \ WP7 \ System.Reactive.dll , в которой BufferWithTimeOrCount(...) возвращает IObservable<IObservable<Class>>.Это затрудняет написание простых фильтров, таких как Where x.Count == 2 или Where x.First().P == ....На самом деле я не понял, как сделать простой фильтр, такой как x.Count() == 2, для этого возвращаемого значения без создания совершенно отдельной подписки или объекта Subject, который выглядит слишком сложным.Вероятно, это простая ошибка, как и мой последний вопрос (все, что мне было нужно, это предложение Where :-P), но это действительно сводит меня с ума.Любая помощь?

Ответы [ 2 ]

3 голосов
/ 12 февраля 2011

Последний выпуск Rx, v1.0.2856.0 , предоставляет как буферы, так и окна.Для буферов мы восстановили оригинальные подписи на основе IList.Соответствующие оконные операторы будут возвращать вложенные наблюдаемые последовательности.

Способ, которым реализованы операторы Buffer *, состоит в составлении соответствующего оператора Window * с новым методом расширения ToList, который переводит IObservable в IObservable>.Все, что делает оператор Buffer *, это вызывает этот новый оператор ToList в селекторе SelectMany.

3 голосов
/ 30 января 2011

Изменение API делает буферизацию более Rx-y и вписывается в их реализацию оператора Window (не удивлюсь, если с помощью отражателя вы сможете увидеть операторы Buffer, используя Window). Я думаю, что есть множество причин, по которым они изменили это. Я не собираюсь их угадывать, потому что они намного умнее меня!

Итак, вот мое решение. Возможно, есть более чистый способ получить то, что вам нужно, но я бы, вероятно, реализовал свой собственный метод расширения для буферизации в списке. Может быть что-то вроде:

public static class BufferToList
{
   public static IObservable<IEnumerable<TSource>> BufferToList<TSource>(this IObservable<TSource> source)
    {
       return Observable.CreateWithDisposable<IEnumerable<TSource>>(observer => 
          {
             var list = new List<TSource>();

             return source.Subscribe(list.Add,
               observer.OnError,
               () =>
               {
                  observer.OnNext(list);
                  observer.OnCompleted();
               });
          });
    }
}

Тогда что-то вроде:

TouchDown.Merge(TouchUp)
   .BufferWithTimeOrCount(TimeSpan.FromSeconds(0.5), 2)
   .Select(bufferedValues => bufferedValues.BufferToList())
   .Subscribe(OnBufferOpen)

private void OnBufferOpen(IObservable<IEnumerable<IEvent<IEventArgs>>> bufferedListAsync)
{
   bufferedListAsync.Where(list => list.Count() == 2);
}

Я предлагаю, если вы хотите получить полное объяснение того, почему они изменили API, зайдите и задайте вопрос на форумах rx на msdn

...