Многопоточность приложения с чрезвычайно высокой скоростью передачи сообщений (какой метод мне следует использовать?) - PullRequest
2 голосов
/ 27 октября 2011

Я создаю торговую платформу для хедж-фонда, и мне нужно иметь возможность обрабатывать рыночные данные в отдельном потоке (ах), и я пытаюсь выяснить, какой метод мне следует использовать.

Конечная цель состоит в том, чтобы иметь возможность обрабатывать до 1000 сообщений в секунду как можно быстрее.

Вот что я думаю до сих пор:

Метод I: Поток с ConcurrentQueue

Этот метод создает поток и, в основном, поток непрерывно извлекает сообщения из очереди (поток остается в живых).

public class MessageQueue
{
    private ConcurrentQueue<Message>    MessageQueue;
    private Thread                      QueueThread;

    public MessageQueue()
    {
        MessageQueue = new ConcurrentQueue<Message>();
        QueueThread = new Thread(ProcessQueue);
        QueueThread.Start();
    }

    public void ProcessQueue()
    {
        Message OrderMessage;

        while (IsRunning)
        {
            if (MessageQueue.TryDequeue(out OrderMessage)
            {
                ProcessMessage(OrderMessage);
            }
        }
    }

    public static void OnInboundMessage(Message MarketMessage)
    {
        MessageQueue.Enqueue(DataMessage);
    }

    public static void ProcessMessage(Message MarketMessage)
    {
        // Process Message Here
    }
}

Вопрос: Почему люди используютМетод ThreadStart при создании нового потока?Кажется, это работает так же хорошо, если вы не используете его - для чего он нужен?

Вопрос: В чем преимущество использования ConcurrentQueue?Когда у обычной очереди возникнет проблема параллелизма?Может показаться, что до тех пор, пока вы проверяете, что в очереди уже есть что-то в очереди, не должно быть никаких проблем параллелизма, но я, возможно, чего-то не понимаю.

Вопрос: Должен ли я устанавливать какие-либо свойства потока, такие как «ApartmentState» или «IsBackground»?

Метод II: ThreadPool

Этот метод просто вызывает ThreadPool с каждым новым сообщением.

public class MessagePool
{
    public static void OnInboundMessage(Message MarketMessage)
    {
        ThreadPool.QueueUserWorkItem(obj => ProcessMessage(MarketMessage);
    }

    public static void ProcessMessage(Message MarketMessage)
    {
        // Process Message here
    }
}

Вопрос: Существуют ли какие-либо настройки ThreadPool, которые могут сделать его более подходящим решением?

Вопрос: Я тоже не совсем 'get 'лямбда-выражения, но они, кажется, работают - возможно, есть лучший способ вызвать ThreadPools?Есть ли какие-либо недостатки (производительность и т. Д.) В использовании лямбды?

Метод III: MessageTasks

Этот метод использует Задачу аналогично ThreadPool.

public class MessageTasks
{
    public static void OnInboundMessage(Message MarketMessage)
    {
        Task.Factory.StartNew(() => ProcessMessage(MarketMessage));
    }

    public static void ProcessMessage(Message MarketMessage)
    {
        // Process Message here
    }
}

Вопрос: Повысит ли создание моих собственных TaskFactory и TaskScheduler свою производительность?

Мой тест производительности

Может показаться, что метод I, безусловно, самый быстрый методобработка сообщений.Казалось бы, ThreadPool добавляет около 9 микросекунд для добавления нового элемента Workitem, а метод Task.Factory - около 12 микросекунд.У кого-нибудь были похожие результаты?

Заранее спасибо за вашу помощь - и если вы считаете, что я решаю эту проблему совершенно неправильно, не стесняйтесь, дайте мне знать!

Уильям

Ответы [ 2 ]

4 голосов
/ 27 октября 2011

Публичная торговая платформа с несколькими тысячами запросов в секунду написала http://code.google.com/p/disruptor/ для поддержки этого варианта использования. Он написан на Java, и похожий на работу родственный проект создал http://code.google.com/p/disruptor-net/ для платформы .NET.

Имеется хорошая презентация на http://www.infoq.com/presentations/LMAX, которая объясняет преимущества в производительности (в 10-100 раз быстрее, чем ConcurrentQueue) и надежности и обсуждает необходимые изменения в остальной части архитектуры программы.

1 голос
/ 27 октября 2011

Я бы предложил использовать Concurrent Queue вместе с Конкурирующими потребителями корпоративной моделью интеграции.Этот подход позволяет получать несколько сообщений одновременно и одновременно обрабатывать сообщения несколькими потоками потребителя.Для достижения наилучших результатов производительности вы должны реализовать хорошую многопоточную синхронизацию.

...