Rx .NET взять первым и пропустить через временной интервал или условие - PullRequest
0 голосов
/ 28 августа 2018

Я изучаю rx в .NET, и у меня есть следующие требования:

  • Последовательность строк поступает из API. Они приходят через разные промежутки времени, которые я не знаю. Иногда в течение секунды поступают 5 строк, а в течение 5 секунд - только 1 строка.
  • Строки в основном состоят из пяти команд: «Пуск», «Стоп», «Влево», «Вправо», «Назад». Поступают другие команды, но они могут быть отфильтрованы.
  • Теперь программа должна выполняться всякий раз, когда поступает команда.
  • Если та же самая команда поступает в течение заданного периода времени (скажем, 2 секунды), команда должна выполняться только один раз . Если другая команда поступает в течение этого периода, она должна быть выполнена немедленно. Если та же команда, что и предыдущая, поступила через заданный промежуток времени, она должна быть выполнена.
  • Для входящих данных не назначена временная метка (но это может быть сделано при необходимости).

Итак, с учетом данных примера:

using System;
using System.Reactive.Linq;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var results = new[]
                {
                 "right", //1
                 "left", //2
                 "right", //3
                 "right", //4
                 "left", //5
                 "right", //6
                 "right", //7
                 "right", //8
                 "left" //9
                };

            var observable =  Observable.Generate(0, 
                x => x < results.Length, 
                x => x + 1,
                x => results[x], 
                x => 
                TimeSpan.FromSeconds(1)).Timestamp();

            observable.Subscribe(...);

            Console.ReadKey();
        }        
    }
}

Результат должен быть:

right //1
left //2
right //3
left //5
right //6
right //8
left //9

Строка 4 была пропущена, потому что ее только 1 секунда до последнего «правого», как и строка 7. Однако строка 8 не была пропущена, потому что есть строка 2 секунды для строки 6.

Возможные решения:

Я попытался использовать оконную функцию для пропуска записей, но это пропустит все строки, даже если они не имеют одно и то же значение:

observable.Publish(ps =>
           ps.Window(() => ps.Delay(TimeSpan.FromSeconds(2)))
             .SelectMany(x => x.Take(1))).Subscribe(f => Console.WriteLine(f.Value));

Я также попытался добавить временные метки к каждому значению и проверить их в DistinctUntilChanged () EqualityComparer, но, похоже, это также не работает должным образом.

Ответы [ 3 ]

0 голосов
/ 28 августа 2018

Я не проверял этот код, но вы получили общее представление.

        source
            .Select(x => (str: x, timestamp: DateTime.Now))
            .Scan((last: (str: "", timestamp: DateTime.MinValue), next: (str: "", timestamp: DateTime.MinValue)),
                (state, val) =>
                    (last: (str: state.next.str, timestamp: state.next.timestamp), next: (str: val.str, timestamp: val.timestamp))
                )
            .Where(x => (x.next.str != x.last.str) || (x.next.timestamp - x.last.timestamp) > TimeSpan.FromSeconds(2))
            .Select(x=>x.next.str);
0 голосов
/ 28 августа 2018

Это сложнее, чем я думал, из-за тройного случая (справа, справа, справа всего на одну секунду друг от друга). Использование прямой .Zip здесь не подойдет.

Это похоже на ответ Сентинеля и правильно обрабатывает тройной регистр:

source
    .Timestamp()        
    .Scan((state, item) => state == null || item.Timestamp - state.Timestamp > TimeSpan.FromSeconds(2) || state.Value != item.Value
        ? item
        : state
    )
    .DistinctUntilChanged()
    .Select(t => t.Value);

Пояснение:

  • .Timestamp() переносит каждое сообщение с отметкой времени, в которое оно приходит в
  • .Scan(1 arg) если появляется дубликат в течение 2 секунд, то он повторяет предыдущее сообщение, в противном случае выдает новое сообщение
  • .DistinctUntilChanged() удаляет дубликаты сообщений, что произойдет, потому что .Scan дважды отправляет старое сообщение
  • .Select удаляет отметку времени.
0 голосов
/ 28 августа 2018

Хм .. звучит как observable.DistinctUntilChanged для обнаружения различных событий, но объединяется через CombineLatest с observable.Debounce, чтобы также получить повторение через некоторое время ....

Это охватило бы основы, но я не уверен, что произойдет, если элемент, отличный от предыдущего, появится после "время дольше, чем отладка". Оба источника, операторы DistinctUntilChanged и Debounce, пропустят элемент "на в то же время ", и я не уверен, что будет делать CombineLatest на этом этапе. Есть изменение: вы получите такое событие дважды (одно и то же за очень короткий промежуток времени), поэтому вам потребуется еще раз его дедуплицировать.

Если вы хотите добавить какие-то временные метки, то есть и довольно явный способ сделать это, хотя я не уверен, действительно ли это проще ..

  • принимать исходные события - поток {command}
  • GroupBy тип элемента - поток подпотока {command}, каждый подпоток содержит ТОЛЬКО один тип команды
  • применить TimeInterval к каждому из этих подпотоков, вы получите поток подпотока {команда, время с момента последнего посещения}, каждый подпоток содержит ТОЛЬКО один тип команды
  • ОбъединениеПоследнее их всех, вы получаете поток {команда, время, с тех пор как ЭТОТ ТИП был последним увиденным}
  • преобразуйте его в {command, null-or-ObjectThatIsAlwaysDifferent} в зависимости от «времени», если время меньше периода удержания, то NULL, если время больше удержания, используйте некоторое магическое значение, которое IsAlwaysDifferent
  • DistinctUntilCanged, что

Вы должны иметь возможность реализовать магический ObjectThatIsAlwaysDifferent, просто сделав класс gethashcode равным 0 и равным false.

Это должно возвращать события, которые имеют команду, отличную от предыдущей, или те же, что и раньше, но произошли спустя время, превышающее удержание.

Теперь подумав об этом, можно сделать это очень просто, запечатав текущее и предыдущее значение:

  • взять исходный поток {command}
  • добавить отметки времени - {команда, отметка времени}
  • задержка на 1, помните источник и задержку
  • сжать их вместе, теперь вы получаете поток [{command, timestamp}, {prevcommand, prevtimestamp}]
  • отфильтруйте его по коду:
    • пройти когда команда! = Prevcommand
    • пройти, когда команда == prevcommand && timestamp-prevtimestamp> holdoff

и это должно быть. Как обычно, несколько способов сделать одно и то же.

...