Почему IEnumerable.ToObservable такой медленный? - PullRequest
9 голосов
/ 02 апреля 2020

Я пытаюсь один раз перечислить большое IEnumerable и наблюдать за перечислением с различными присоединенными операторами (Count, Sum, Average et c). Очевидный способ - преобразовать его в IObservable с помощью метода ToObservable, а затем подписать на него наблюдателя. Я заметил, что это намного медленнее, чем другие методы, такие как выполнение простого l oop и уведомление наблюдателя о каждой итерации или использование метода Observable.Create вместо ToObservable. Разница существенная: в 20-30 раз медленнее. Это то, что есть, или я что-то не так делаю?

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Вывод:

ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec

. NET Core 3.0, C# 8, System.Reactive 4.3. 2, Windows 10, Консольное приложение, Выпуск построен


Обновление: Вот пример реальной функциональности, которую я хочу достичь:

var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");

Вывод:

Количество: 10 000 000, Сумма: 49 999 995 000 000, Среднее: 4 999 999,5

Важное отличие этого подхода по сравнению с использованием стандартных LINQ операторов , перечисляемый источник перечисляется только один раз.


Еще одно наблюдение: с использованием ToObservable(Scheduler.Immediate) немного быстрее (около 20%), чем ToObservable().

Ответы [ 2 ]

6 голосов
/ 02 апреля 2020

В этом разница между наблюдаемым с хорошим поведением и наблюдаемым «наблюдаем за тем, как думаешь, быстрее, лучше, но это не так».

Когда Вы достаточно глубоко погрузитесь в источник и обнаружите эту прекрасную маленькую строчку:

scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));

Фактически вызывает hasNext = enumerator.MoveNext(); один раз за запланированную рекурсивную итерацию.

Это позволяет вам выбрать планировщик для вашего .ToObservable(schedulerOfYourChoice) вызова.

С помощью других выбранных вами опций вы создали серию простых вызовов .OnNext, которые практически ничего не делают. Method2 даже не имеет вызова .Subscribe.

Оба из Method2 и Method1 выполняются с использованием текущего потока, и оба запускаются до завершения до завершения подписки. Они блокируют звонки. Они могут вызывать условия гонки.

Method1 - единственный, который ведет себя хорошо как наблюдаемый. Он асинхронный и может работать независимо от подписчика.

Имейте в виду, что наблюдаемые - это коллекции, которые выполняются с течением времени. Обычно они имеют асин c источник или таймер или реакцию на внешний раздражитель. Они не часто убегают от простого перечисления. Если вы работаете с перечислимым, то следует ожидать, что синхронная работа будет выполняться быстрее.

Скорость не является целью Rx. Выполнение сложных запросов на основе временных значений - это цель.

0 голосов
/ 02 апреля 2020

Потому что Субъект ничего не делает.

Похоже, что производительность оператора l oop различна для 2 случаев:

for(int i=0;i<1000000;i++)
    total++;

или

for(int i=0;i<1000000;i++)
    DoHeavyJob();

Если используется другой субъект, с Медленная реализация OnNext, результат будет более приемлемым свой собственный IScheduler и решить, когда запускать каждую задачу

Надеюсь, это поможет

С уважением

...