Почему Rx Observable.Subscribe блокирует мою ветку? - PullRequest
4 голосов
/ 25 февраля 2011

Привет! Я попробовал один из 101 примеров приема:

    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;

        while (true)
        {
            if (i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep(i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }

Я не понимаю, почему строка "Нажмите любую клавишу, чтобы отписаться" никогда не отображается.Я понимаю, что подписка является асинхронной, вы подписываетесь, и она сразу же возвращается.Чего мне не хватает, что приводит к блокировке основного потока?

Ответы [ 2 ]

7 голосов
/ 25 февраля 2011

Блокировка вызвана сочетанием ваших перечислимых циклов для while (true) и IEnumerable<T>.ToObservable() методов расширения со значением по умолчанию CurrentThreadScheduler.

Если вы поставите Scheduler.TaskPool (или Scheduler.ThreadPool в до .NET 4) с перегрузкой ToObservable, вы должны увидеть ожидаемое поведение (хотя оно не вызовет вашего абонента на главной нить, к вашему сведению).

Сказав это, я думаю, вы обнаружите, что ваша комбинация Thread.Sleep и Throttle будет работать так, как вы ожидаете. Возможно, вам лучше создать собственную наблюдаемую программу, которая использует планировщик для планирования ваших задержек.

2 голосов
/ 25 февраля 2011

Я согласен с Ричардом.

Реализация для .ToObservable() выглядит следующим образом:

public static IObservable<TSource> ToObservable<TSource>(
    this IEnumerable<TSource> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    return source.ToObservable<TSource>(Scheduler.CurrentThread);
}

Это вызывает перегрузку .ToObservable(IScheduler) с Scheduler.CurrentThread, и так как вы используете.Sleep(...), чтобы вызвать задержки, которые должно завершить наблюдаемое, прежде чем код сможет выйти за пределы метода .Subscribe(...).Подумайте только о том, как бы этот код вел себя, если бы он все выполнялся в одном потоке (что это такое.)

Чтобы обойти это, вы можете использовать пул задач или планировщики пула потоков, как предлагает Ричард, ноЯ думаю, у вас есть более фундаментальная проблема с вашим кодом.И это то, что вы используете поток «старой школы», спящий и не полагающийся на методы Rx.

Попробуйте создать свою наблюдаемую вещь:

var observable =
    Observable
        .GenerateWithTime(0, i => i <= 1000, i => i + 1,
            i => i, i => TimeSpan.FromMilliseconds(i % 10 < 5 ? 500 : 1000))
        .Timestamp();

GenerateWithTime(...) делает все, чтоваш метод GenerateAlternatingFastAndSlowEvents сделал, но он создает наблюдаемое напрямую и делает это, используя Scheduler.ThreadPool под капотом, так что вам не нужно указывать какие-либо планировщики.

...