Реактивные расширения кажутся очень медленными - я что-то не так делаю? - PullRequest
13 голосов
/ 25 ноября 2010

Я оцениваю Rx для проекта торговой платформы, который должен будет обрабатывать тысячи сообщений в секунду.Существующая платформа имеет сложную систему маршрутизации событий (многоадресные делегаты), которая отвечает на эти сообщения и выполняет много последующей обработки.

Я посмотрел на Reactive Extensions для очевидных преимуществ, но заметил, что он несколько медленнее, обычноВ 100 раз медленнее.

Я создал модульный тест для демонстрации этого, который выполняется с простым приращением в 1 миллион раз, с использованием различных разновидностей Rx и прямого "контрольного" теста делегата.

Вот результаты:

Delegate                                 - (1000000) - 00:00:00.0410000
Observable.Range()                       - (1000000) - 00:00:04.8760000
Subject.Subscribe() - NewThread          - (1000000) - 00:00:02.7630000
Subject.Subscribe() - CurrentThread      - (1000000) - 00:00:03.0280000
Subject.Subscribe() - Immediate          - (1000000) - 00:00:03.0030000
Subject.Subscribe() - ThreadPool         - (1000000) - 00:00:02.9800000
Subject.Subscribe() - Dispatcher         - (1000000) - 00:00:03.0360000

Как видите, все методы Rx примерно в 100 раз медленнее, чем эквивалент делегата.Очевидно, что Rx много делает под прикрытием, которое будет полезно в более сложном примере, но это кажется невероятно медленным.

Это нормально или мои предположения о тестировании неверны?Код единицы измерения для приведенного ниже -

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using NUnit.Framework;
using System.Concurrency;

namespace RxTests
{
    [TestFixture]
    class ReactiveExtensionsBenchmark_Tests
    {
        private int counter = 0;

        [Test]
        public void ReactiveExtensionsPerformanceComparisons()
        {
            int iterations = 1000000;

            Action<int> a = (i) => { counter++; };

            DelegateSmokeTest(iterations, a);
            ObservableRangeTest(iterations, a);
            SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread");
            SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread");
            SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate");
            SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool");
            SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher");
        }

        public void ObservableRangeTest(int iterations, Action<int> action)
        {
            counter = 0;

            long start = DateTime.Now.Ticks;

            Observable.Range(0, iterations).Subscribe(action);

            OutputTestDuration("Observable.Range()", start);
        }


        public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
        {
            counter = 0;

            var eventSubject = new Subject<int>();
            var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb
            events.Subscribe(action);

            long start = DateTime.Now.Ticks;

            Enumerable.Range(0, iterations).ToList().ForEach
                (
                    a => eventSubject.OnNext(1)
                );

            OutputTestDuration("Subject.Subscribe() - " + mode, start);
        }

        public void DelegateSmokeTest(int iterations, Action<int> action)
        {
            counter = 0;
            long start = DateTime.Now.Ticks;

            Enumerable.Range(0, iterations).ToList().ForEach
                (
                    a => action(1)
                );

            OutputTestDuration("Delegate", start);
        }


        /// <summary>
        /// Output helper
        /// </summary>
        /// <param name="test"></param>
        /// <param name="duration"></param>
        public void OutputTestDuration(string test, long duration)
        {
            Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration)));
        }

        /// <summary>
        /// Test timing helper
        /// </summary>
        /// <param name="elapsedTicks"></param>
        /// <returns></returns>
        public string ElapsedDuration(long elapsedTicks)
        {
            return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString();
        }

    }
}

Ответы [ 3 ]

17 голосов
/ 25 ноября 2010

Я предполагаю, что команда Rx в первую очередь фокусируется на создании функциональности и пока не заботится об оптимизации производительности.

Используйте профилировщик для определения узких мест и замените классы медленного Rx на свои собственные оптимизированные версии.

Ниже приведены два примера.

Результаты:

Delegate                                 - (1000000) - 00:00:00.0368748

Simple - NewThread                       - (1000000) - 00:00:00.0207676
Simple - CurrentThread                   - (1000000) - 00:00:00.0214599
Simple - Immediate                       - (1000000) - 00:00:00.0162026
Simple - ThreadPool                      - (1000000) - 00:00:00.0169848

FastSubject.Subscribe() - NewThread      - (1000000) - 00:00:00.0588149
FastSubject.Subscribe() - CurrentThread  - (1000000) - 00:00:00.0508842
FastSubject.Subscribe() - Immediate      - (1000000) - 00:00:00.0513911
FastSubject.Subscribe() - ThreadPool     - (1000000) - 00:00:00.0529137

Прежде всего, кажется, очень важно, как реализуется наблюдаемая.Вот заметка, от которой нельзя отписаться, но она быстрая:

private IObservable<int> CreateFastObservable(int iterations)
{
    return Observable.Create<int>(observer =>
    {
        new Thread(_ =>
        {
            for (int i = 0; i < iterations; i++)
            {
                observer.OnNext(i);
            }
            observer.OnCompleted();
        }).Start();
        return () => { };
    });
}

Тест:

public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
    counter = 0;

    var start = Stopwatch.StartNew();

    var observable = CreateFastObservable(iterations);

    observable.SubscribeOn(scheduler).Run(action);

    OutputTestDuration("Simple - " + mode, start);
}

Субъекты добавляют много накладных расходов.Вот предмет, который лишен большей части функциональности, ожидаемой от предмета, но это быстро:

class FastSubject<T> : ISubject<T>
{
    private event Action onCompleted;
    private event Action<Exception> onError;
    private event Action<T> onNext;

    public FastSubject()
    {
        onCompleted += () => { };
        onError += error => { };
        onNext += value => { };
    }

    public void OnCompleted()
    {
        this.onCompleted();
    }

    public void OnError(Exception error)
    {
        this.onError(error);
    }

    public void OnNext(T value)
    {
        this.onNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        this.onCompleted += observer.OnCompleted;
        this.onError += observer.OnError;
        this.onNext += observer.OnNext;

        return Disposable.Create(() =>
        {
            this.onCompleted -= observer.OnCompleted;
            this.onError -= observer.OnError;
            this.onNext -= observer.OnNext;
        });
    }
}

Тест:

public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
    counter = 0;

    var start = Stopwatch.StartNew();

    var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount();

    observable.SubscribeOn(scheduler).Run(action);

    OutputTestDuration("FastSubject.Subscribe() - " + mode, start);
}
12 голосов
/ 25 августа 2012

Обновление для Rx 2.0: я взял код из исходного поста с (почти) последней бета-версией Linqpad 4.42.04 (ну, есть 06, но все равно): Rx Main assemblies

... инемного скорректировал его, чтобы использовать новый синтаксис планировщика Rx v2:

        public void ReactiveExtensionsPerformanceComparisons()
    {
        int iterations = 1000000;

        Action<int> a = (i) => { counter++; };

        DelegateSmokeTest(iterations, a);
        ObservableRangeTest(iterations, a);
        SubjectSubscribeTest(iterations, a, NewThreadScheduler.Default, "NewThread");
        SubjectSubscribeTest(iterations, a, CurrentThreadScheduler.Instance, "CurrentThread");
        SubjectSubscribeTest(iterations, a, ImmediateScheduler.Instance, "Immediate");
        SubjectSubscribeTest(iterations, a, ThreadPoolScheduler.Instance, "ThreadPool");
        // I *think* this is the same as the ThreadPool scheduler in my case
        SubjectSubscribeTest(iterations, a, DefaultScheduler.Instance, "Default");                
        // doesn't work, as LinqPad has no Dispatcher attched to the Gui thread, maybe there's a workaround; the Instance property on it is obsolete
        //SubjectSubscribeTest(iterations, a, DispatcherScheduler.Current, "ThreadPool");
    }

Примечание: результаты сильно различаются, в редких случаях Threadpool превосходит newThread, но в большинстве случаев NewThread имеет небольшое преимущество вышепланировщики под ним в списке:

Delegate                                 - (1000000) - 00:00:00.0440025
Observable.Range()                       - (1000000) - 00:00:01.9251101
Subject.Subscribe() - NewThread          - (1000000) - 00:00:00.0400023
Subject.Subscribe() - CurrentThread      - (1000000) - 00:00:00.0530030
Subject.Subscribe() - Immediate          - (1000000) - 00:00:00.0490028
Subject.Subscribe() - ThreadPool         - (1000000) - 00:00:00.0490028
Subject.Subscribe() - Default            - (1000000) - 00:00:00.0480028

Так что, похоже, они очень много работали над производительностью ..

10 голосов
/ 30 ноября 2010

Помните, что ваш Delegate не гарантирует никакой безопасности потока - он буквально вызывает делегат из любого потока, из которого он вызван, тогда как когда вы вызываете Observable.ObserveOn для маршалинга уведомлений другим потокам, Rx.NET должен сделать блокировку, чтобы сделатьуверен, что он делает то, что, как вы думаете, делает.

Итак, делегаты могут двигаться очень быстро, но если вы хотите создать что-то практичное, используя его, вы в конечном итоге создадите синхронизацию вручную, что замедлит вас.вниз.При этом Rx, как и LINQ, является абстракцией - если вам нужно, чтобы это было невероятно быстро, вы должны начать писать некрасивый код.

...