Как подписаться на IObservable Sequence, принудительно завершить и получить все данные без условий гонки - PullRequest
0 голосов
/ 31 января 2019

Есть шаблон, с которым у меня проблемы при работе с наблюдаемыми.

Я работаю с устройством Bluetooth.

  1. Я отправляю на это устройство сообщение с просьбой что-то сделать и уведомляю меня о результате или результатах.
  2. Устройство начинает отправлять уведомления (может продолжаться 10 мс или 20 с)
  3. Я жду, пока устройство завершит отправку уведомлений.иногда это будет конкретное сообщение от устройства, а иногда я просто не получу больше сообщений в течение периода ожидания.
  4. Я преобразую сообщения в один элемент или IEnumerable и продолжаю в том же духе.

Пример первый:

  1. Я ввожу команду входа в систему с сообщением о входе в систему и паролем
  2. Устройство отправляет обратно сообщение об успехе или сбое (обычно10 мс или около того)
  3. Я жду, когда придет сообщение
  4. Я использую это сообщение, чтобы сообщить, может ли пользователь продолжить или ему нужно повторить свой пароль.

Пример два:

  1. Я посылаю команду устройству Bluetooth, запрашивающему все сети Wi-Fi в диапазоне
  2. Устройство включает радио Wi-Fi и отправляет обратно неизвестное количество сообщений, нов какой-то момент останавливается
  3. Я жду, пока сообщения прекратятся
  4. Я представляю полный список сетей Wi-Fi пользователю

Я думаю, что это должно быть сделано вэто что-то близкоеследующим образом.(Я удалил как можно больше кода, специфичного для Bluetooth, чтобы сосредоточиться на Rx):

//Create a new subject
Subject<int> subject = new Subject<int>();

//Observe the subject until some pre-determined stopping criteria
bool waiting = true;
IObservable<int> sequence = subject.TakeWhile(x => waiting);

//Subscribe to the subject so that I can trigger the stopping criteria
IDisposable subscription = sequence.Subscribe(
                onNext: result =>
                {
                     if (result > 50)
                        waiting = false;
                },
                onCompleted: () =>
                {
                     return;
                });

//fake bluetooth messages
int i = 0;
while (i < 100)
    subject.OnNext(i++);

//Gather them all up once the sequence is complete
//***application hangs here***
List<int> ints = await sequence.ToList() as List<int>;

//This line of code is never run
subscription.Dispose();

Я надеюсь, что парень из Rx поможет мне понять, почему этот вызов ToList () зависает.Я просто написал этот код на месте для этого вопроса, поэтому, если он не имеет смысла, дайте мне знать, и я обновлю его.

Вот фактический код, который использует стороннюю библиотеку Bluetooth и получаетэлементы с устройства Bluetooth.

    private static async Task<byte> WritePasswordToPeripheral<P>(P Peripheral, byte[] command) where P : Peripheral, IStatePeripheral
    {
        IGattService service = await Peripheral.RPHDevice.GetKnownService(BleService.Control);
        IGattCharacteristic characteristic = await service.GetKnownCharacteristics(BleCharacteristic.PasswordResult);

        //I know that this TakeWhile isn't necessary here because I'm using FirstAsync() later on
        //In some similar blocks I receive multiple notifications and so I need to decide when to stop listening in this way. 
        //In those situations I would call .ToList() instead of .FirstAsync()
        bool waiting = true;
        await characteristic.EnableNotifications().TakeWhile(x=>waiting);

        IObservable<CharacteristicGattResult> passwordResultSequence = characteristic
            .WhenNotificationReceived();

        IDisposable passwordResultSubscription = passwordResultSequence 
                                                    .Subscribe(
                                                    onNext: result =>
                                                    {
                                                        waiting = false;
                                                    },
                                                    onCompleted: () =>
                                                    {
                                                        return;
                                                    });

        try
        {
            await Peripheral.RPHDevice
                    .WriteCharacteristic(BleService.Control, BleCharacteristic.Password, command)
                    .Timeout(TimeSpan.FromSeconds(10));
        }
        catch (Exception)
        {
            return 0;
        }

        //In this case only one notification ever comes back and so FirstAsync would be nice
        var passwordResult = await passwordResultSequence.FirstAsync();
        await characteristic.DisableNotifications();
        passwordResultSubscription.Dispose();

        return passwordResult.Data[0];
    }

При получении уведомлений:

    IObservable<CharacteristicGattResult> notifyOb;
    public override IObservable<CharacteristicGattResult> WhenNotificationReceived()
    {
        this.AssertNotify();

        this.notifyOb = this.notifyOb ?? Observable.Create<CharacteristicGattResult>(ob =>
        {
            var handler = new EventHandler<CBCharacteristicEventArgs>((sender, args) =>
            {
                if (!this.Equals(args.Characteristic))
                    return;

                if (args.Error == null)
                    ob.OnNext(new CharacteristicGattResult(this, args.Characteristic.Value?.ToArray()));
                else
                    ob.OnError(new BleException(args.Error.Description));
            });
            this.Peripheral.UpdatedCharacterteristicValue += handler;
            return () => this.Peripheral.UpdatedCharacterteristicValue -= handler;
        })
        .Publish()
        .RefCount();

        return this.notifyOb;
    }

1 Ответ

0 голосов
/ 31 января 2019

У вас куча проблем с вашим кодом.

Во-первых, Rx - это модель асинхронного программирования, которую вы пытаетесь запустить синхронно.Вызов await sequence (и аналогично sequence.Wait()) вызовет у вас горе почти все время.

Далее вы создаете две подписки на наблюдаемую sequence - одну с sequence.Subscribe(...) и сновас await sequence.ToList().Это отдельные подписки на базовый subject, и их нужно рассматривать как отдельные.

И, наконец, вы смешиваете внешнее состояние (bool waiting = true) в свой запрос subject.TakeWhile(x => waiting).Это плохо, поскольку по своей сути не является поточно-ориентированным, и вы должны кодировать так, как будто ваш запрос выполняется в нескольких потоках.

Что происходит с вашим кодом, так это то, что await sequence.ToList() подписывается на ваш запрос ПОСЛЕ вы выкачали свои subject.OnNext(i++) значения, чтобы запрос никогда не заканчивался.Никакое значение никогда не выталкивается из объекта ПОСЛЕ .ToList(), чтобы вызвать .TakeWhile(x => waiting) для прекращения наблюдаемого..ToList() просто сидит в ожидании OnCompleted, которое никогда не приходит.

Вам нужно переместить await sequence.ToList() до того, как вы выкачаете значения - что вы не можете сделать, потому что он все равно застрянетожидание OnCompleted, которое никогда не наступит.

Вот почему вам нужно кодировать асинхронно.

Теперь две подписки также вызывают состояние гонки.sequence.Subscribe может установить waiting на false до того, как sequence.ToList() получит какие-либо значения.Вот почему вы должны кодировать так, как будто ваш запрос выполняется в нескольких потоках.Поэтому, чтобы избежать этого, у вас должна быть только одна подписка.

Вам нужно потерять .TakeWhile(x => waiting) и вставить условие внутрь так: subject.TakeWhile(x => x <= 50);.

Затем вы напишите свой код следующим образом:

//Create a new subject
Subject<int> subject = new Subject<int>();

//Observe the subject until some pre-determined stopping criteria
IObservable<int> sequence = subject.TakeWhile(x => x <= 50);

sequence
    .ToList()
    .Subscribe(list =>
    {
        Console.WriteLine(String.Join(", ", list));
    });

//fake bluetooth messages
int i = 0;
while (i < 100)
    subject.OnNext(i++);

Этот код запускается и выдает на консоль 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50.

Не пишите код Rx синхронно - потеряйте await.Не запускайте несколько подписок, которые могут создать условия гонки.Не вводите внешнее состояние в свои запросы.

Кроме того, с помощью метода WhenNotificationReceived вы не выполняете последовательность должным образом.

Вы используете опасный оператор .Publish().RefCount()пара, которая создает последовательность, на которую нельзя подписаться после ее завершения.

Попробуйте этот пример:

var query =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Take(3)
        .Publish()
        .RefCount();

var s1 = query.Subscribe(Console.WriteLine);

Thread.Sleep(2500);

var s2 = query.Subscribe(Console.WriteLine);

Thread.Sleep(2500);

s1.Dispose();
s2.Dispose();

var s3 = query.Subscribe(Console.WriteLine);

Thread.Sleep(2500);

s3.Dispose();

Это приводит только к:

0
1
2
2

The s3 подписка ничего не дает.Я не думаю, что это то, что вы ищете.

...