Я оцениваю Rx для проекта торговой платформы, который должен будет обрабатывать тысячи сообщений в секунду.Существующая платформа имеет сложную систему маршрутизации событий (многоадресные делегаты), которая отвечает на эти сообщения и выполняет много последующей обработки.
Я посмотрел на Reactive Extensions для очевидных преимуществ, но заметил, что он несколько медленнее, обычноВ 100 раз медленнее.
Я создал модульный тест для демонстрации этого, который выполняется с простым приращением в 1 миллион раз, с использованием различных разновидностей Rx и прямого "контрольного" теста делегата.
Вот результаты:
Delegate - (1000000) - 00:00:00.0410000
Observable.Range() - (1000000) - 00:00:04.8760000
Subject.Subscribe() - NewThread - (1000000) - 00:00:02.7630000
Subject.Subscribe() - CurrentThread - (1000000) - 00:00:03.0280000
Subject.Subscribe() - Immediate - (1000000) - 00:00:03.0030000
Subject.Subscribe() - ThreadPool - (1000000) - 00:00:02.9800000
Subject.Subscribe() - Dispatcher - (1000000) - 00:00:03.0360000
Как видите, все методы Rx примерно в 100 раз медленнее, чем эквивалент делегата.Очевидно, что Rx много делает под прикрытием, которое будет полезно в более сложном примере, но это кажется невероятно медленным.
Это нормально или мои предположения о тестировании неверны?Код единицы измерения для приведенного ниже -
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using NUnit.Framework;
using System.Concurrency;
namespace RxTests
{
[TestFixture]
class ReactiveExtensionsBenchmark_Tests
{
private int counter = 0;
[Test]
public void ReactiveExtensionsPerformanceComparisons()
{
int iterations = 1000000;
Action<int> a = (i) => { counter++; };
DelegateSmokeTest(iterations, a);
ObservableRangeTest(iterations, a);
SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread");
SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread");
SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate");
SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool");
SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher");
}
public void ObservableRangeTest(int iterations, Action<int> action)
{
counter = 0;
long start = DateTime.Now.Ticks;
Observable.Range(0, iterations).Subscribe(action);
OutputTestDuration("Observable.Range()", start);
}
public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;
var eventSubject = new Subject<int>();
var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb
events.Subscribe(action);
long start = DateTime.Now.Ticks;
Enumerable.Range(0, iterations).ToList().ForEach
(
a => eventSubject.OnNext(1)
);
OutputTestDuration("Subject.Subscribe() - " + mode, start);
}
public void DelegateSmokeTest(int iterations, Action<int> action)
{
counter = 0;
long start = DateTime.Now.Ticks;
Enumerable.Range(0, iterations).ToList().ForEach
(
a => action(1)
);
OutputTestDuration("Delegate", start);
}
/// <summary>
/// Output helper
/// </summary>
/// <param name="test"></param>
/// <param name="duration"></param>
public void OutputTestDuration(string test, long duration)
{
Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration)));
}
/// <summary>
/// Test timing helper
/// </summary>
/// <param name="elapsedTicks"></param>
/// <returns></returns>
public string ElapsedDuration(long elapsedTicks)
{
return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString();
}
}
}