Я согласен с Ричардом.
Реализация для .ToObservable()
выглядит следующим образом:
public static IObservable<TSource> ToObservable<TSource>(
this IEnumerable<TSource> source)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
return source.ToObservable<TSource>(Scheduler.CurrentThread);
}
Это вызывает перегрузку .ToObservable(IScheduler)
с Scheduler.CurrentThread
, и так как вы используете.Sleep(...)
, чтобы вызвать задержки, которые должно завершить наблюдаемое, прежде чем код сможет выйти за пределы метода .Subscribe(...)
.Подумайте только о том, как бы этот код вел себя, если бы он все выполнялся в одном потоке (что это такое.)
Чтобы обойти это, вы можете использовать пул задач или планировщики пула потоков, как предлагает Ричард, ноЯ думаю, у вас есть более фундаментальная проблема с вашим кодом.И это то, что вы используете поток «старой школы», спящий и не полагающийся на методы Rx.
Попробуйте создать свою наблюдаемую вещь:
var observable =
Observable
.GenerateWithTime(0, i => i <= 1000, i => i + 1,
i => i, i => TimeSpan.FromMilliseconds(i % 10 < 5 ? 500 : 1000))
.Timestamp();
GenerateWithTime(...)
делает все, чтоваш метод GenerateAlternatingFastAndSlowEvents
сделал, но он создает наблюдаемое напрямую и делает это, используя Scheduler.ThreadPool
под капотом, так что вам не нужно указывать какие-либо планировщики.