У вас куча проблем с вашим кодом.
Во-первых, Rx - это модель асинхронного программирования, которую вы пытаетесь запустить синхронно.Вызов await sequence
(и аналогично sequence.Wait()
) вызовет у вас горе почти все время.
Далее вы создаете две подписки на наблюдаемую sequence
- одну с sequence.Subscribe(...)
и сновас await sequence.ToList()
.Это отдельные подписки на базовый subject
, и их нужно рассматривать как отдельные.
И, наконец, вы смешиваете внешнее состояние (bool waiting = true
) в свой запрос subject.TakeWhile(x => waiting)
.Это плохо, поскольку по своей сути не является поточно-ориентированным, и вы должны кодировать так, как будто ваш запрос выполняется в нескольких потоках.
Что происходит с вашим кодом, так это то, что await sequence.ToList()
подписывается на ваш запрос ПОСЛЕ вы выкачали свои subject.OnNext(i++)
значения, чтобы запрос никогда не заканчивался.Никакое значение никогда не выталкивается из объекта ПОСЛЕ .ToList()
, чтобы вызвать .TakeWhile(x => waiting)
для прекращения наблюдаемого..ToList()
просто сидит в ожидании OnCompleted
, которое никогда не приходит.
Вам нужно переместить await sequence.ToList()
до того, как вы выкачаете значения - что вы не можете сделать, потому что он все равно застрянетожидание OnCompleted
, которое никогда не наступит.
Вот почему вам нужно кодировать асинхронно.
Теперь две подписки также вызывают состояние гонки.sequence.Subscribe
может установить waiting
на false
до того, как sequence.ToList()
получит какие-либо значения.Вот почему вы должны кодировать так, как будто ваш запрос выполняется в нескольких потоках.Поэтому, чтобы избежать этого, у вас должна быть только одна подписка.
Вам нужно потерять .TakeWhile(x => waiting)
и вставить условие внутрь так: subject.TakeWhile(x => x <= 50);
.
Затем вы напишите свой код следующим образом:
//Create a new subject
Subject<int> subject = new Subject<int>();
//Observe the subject until some pre-determined stopping criteria
IObservable<int> sequence = subject.TakeWhile(x => x <= 50);
sequence
.ToList()
.Subscribe(list =>
{
Console.WriteLine(String.Join(", ", list));
});
//fake bluetooth messages
int i = 0;
while (i < 100)
subject.OnNext(i++);
Этот код запускается и выдает на консоль 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50
.
Не пишите код Rx синхронно - потеряйте await
.Не запускайте несколько подписок, которые могут создать условия гонки.Не вводите внешнее состояние в свои запросы.
Кроме того, с помощью метода WhenNotificationReceived
вы не выполняете последовательность должным образом.
Вы используете опасный оператор .Publish().RefCount()
пара, которая создает последовательность, на которую нельзя подписаться после ее завершения.
Попробуйте этот пример:
var query =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Take(3)
.Publish()
.RefCount();
var s1 = query.Subscribe(Console.WriteLine);
Thread.Sleep(2500);
var s2 = query.Subscribe(Console.WriteLine);
Thread.Sleep(2500);
s1.Dispose();
s2.Dispose();
var s3 = query.Subscribe(Console.WriteLine);
Thread.Sleep(2500);
s3.Dispose();
Это приводит только к:
0
1
2
2
The s3
подписка ничего не дает.Я не думаю, что это то, что вы ищете.