Полная наблюдаемая последовательность после «длинного» интервала - PullRequest
0 голосов
/ 01 февраля 2019

Следующая наблюдаемая последовательность добавляет каждый элемент в ReplaySubject, так что я могу получить доступ к любым элементам позже и даже ожидать завершения ReplaySubject.Он завершает ReaplySubject после достижения временного промежутка.

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });

Я бы хотел, чтобы последовательность выполнялась до тех пор, пока не прошло определенное время с момента последнего вызова OnNext, а затем завершена.Это будет очень полезно в различных сценариях связи Bluetooth, когда устройство Bluetooth выдаст мне последовательность данных, а затем просто остановится без какого-либо сообщения о завершении или события.В этих сценариях мне нужно эвристически определить, когда будет выполнена последовательность, а затем завершить ее самостоятельно.Итак, если это было «слишком долго» с момента последнего уведомления Bluetooth, я хотел бы завершить ReplaySubject.

Я мог бы сделать это, создав таймер, сбросив его при получении каждого элемента, и затем завершив ReplaySubject, когда таймер достиг «слишком длинного», но я слышал, что создание объекта и манипулирование им изв рамках наблюдаемой подписки не является потокобезопасным.

Любое предложение о том, как я могу завершить последовательность после "слишком длинного" интервала?

Вот версия, которая не является поточно-ориентированной по сравнению с тем, что я слышал, но должна работать как задумано:

bool reading = true;
System.Timers.Timer timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
    reading = false;
};

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeWhile(x => reading)
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            timer.Stop();
            timer.Start();

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });

это кажется удовлетворительным, исходя из первого ответа Симонаре:

                characteristic.WhenNotificationReceived()
                .Timeout(TimeSpan.FromSeconds(1))
                .Subscribe(
                    onNext: result =>
                    {
                        string nextTag = BitConverter.ToString(result.Data);
                        nextTag = nextTag.Replace("-", "");

                        rfidPlayer.OnNext(nextTag);
                    },
                    onError: error =>
                    {
                        rfidPlayer.OnCompleted();
                    });

Ответы [ 2 ]

0 голосов
/ 05 февраля 2019

Мне кажется неприятным использовать Timeout из-за исключений.

Я предпочитаю вводить значение в последовательность, которую я могу использовать для завершения последовательности.Если моя последовательность, например, выдает неотрицательные числа, то, если я введу -1, я знаю, чтобы завершить последовательность.

Вот пример:

Начните с этой наблюдаемой, которая генерирует силы2, начиная с 1, и также задерживает создание каждого значения на количество миллисекунд ожидающего значения.

Observable
    .Generate(1, x => true, x => 2 * x, x => x, x => TimeSpan.FromMilliseconds(x))

То есть 1, 2, 4, 8 и т. д., становится все медленнее и медленнее.

Теперь я хочу остановить эту последовательность, если нет значений для 3.0 секунд, тогда я могу сделать это:

    .Select(x => Observable.Timer(TimeSpan.FromSeconds(3.0)).Select(y => -1).StartWith(x))
    .Switch()
    .TakeWhile(x => x >= 0)

Если я запусту эту последовательность, я получу такой вывод:

1 
2 
4 
8 
16 
32 
64 
128 
256 
512 
1024 
2048 

Последовательность вот-вот выдаст 4096, но сначала она ждет 4096 миллисекунд, чтобы получить это значение - тем временем Observable.Timer(TimeSpan.FromSeconds(3.0)) запускает и выводит -1, таким образом останавливая последовательность.

Ключевой частью этого запроса является использование Switch.Он принимает IObservable<IObservable<T>> и создает IObservable<T>, только подписываясь на последнюю внешнюю наблюдаемую и отписываясь от предыдущей.

Итак, в моем запросе каждое новое значение, созданное последовательностью, останавливается и перезапускается.Timer.

В вашем случае ваша наблюдаемая будет выглядеть так:

characteristic
    .WhenNotificationReceived()
    .Select(result => BitConverter.ToString(result.Data).Replace("-", ""))
    .Select(x => Observable.Timer(TimeSpan.FromSeconds(1.0)).Select(y => (string)null).StartWith(x))
    .Switch()
    .TakeWhile(x => x != null)
    .Subscribe(rfidPlayer);
0 голосов
/ 01 февраля 2019

вы можете использовать Timeout Operator .Единственным недостатком этого является то, что он заканчивается сигналом ошибки.Вам может потребоваться обработать ложную ошибку

Оператор Timeout позволяет вам прервать Observable с завершением onError, если этот Observable не может излучать какие-либо элементы в течение указанного промежутка времени.

Если вы используете подход, описанный ниже, вы можете превзойти ошибку

 .Timeout(200, Promise.resolve(42));

Другой вариант позволяет указать тайм-аут переключиться на указанную вами резервную копию Observable, а не завершаться с ошибкой, еслиусловие тайм-аута срабатывает.


characteristic.WhenNotificationReceived()
    .Timeout(TimeSpan.FromSeconds(1))
    .Subscribe(
        onNext: result =>
        {
            ....
            rfidPlayer.OnNext(....);
        },
        onError: error =>
        {
            rfidPlayer.OnCompleted();
        });
...