Масштабирование соединений с помощью BlockingCollection <T>() - PullRequest
0 голосов
/ 20 ноября 2018

У меня есть сервер, который связывается с 50 или более устройствами через TCP LAN. Для каждого цикла чтения сообщения сокета существует Task.Run.

Я буферизую каждое сообщение, попадающее в очередь блокировки, где каждая очередь блокировки имеет Task.Run с использованием BlockingCollection.Take ().

Так что-то вроде (полупсевдокод):

Задание чтения сокета

Task.Run(() =>
{
    while (notCancelled)
    {
        element = ReadXml();
        switch (element)
        {
            case messageheader:
                MessageBlockingQueue.Add(deserialze<messageType>());
            ...
        }
    }
});

Задача буфера сообщений

Task.Run(() =>
{
    while (notCancelled)
    {
        Process(MessageQueue.Take());
    }
});

Таким образом, 50+ задач чтения и 50+ задач блокируются в своих собственных буферах.

Я сделал это таким образом, чтобы избежать блокировки цикла чтения и позволить программе более справедливо распределять время обработки сообщений, или, как я полагаю.

Это неэффективный способ справиться с этим? что будет лучше?

Ответы [ 3 ]

0 голосов
/ 20 ноября 2018

Да, это немного неэффективно, потому что вы блокируете потоки ThreadPool.Я уже обсуждал эту проблему Использование Task.Yield для преодоления истощения ThreadPool при реализации шаблона производитель / потребитель

Вы также можете посмотреть примеры с тестированием шаблона производитель-потребитель: https://github.com/BBGONE/TestThreadAffinity

Вы можете использовать await Task.Yield в цикле, чтобы предоставить другим задачам доступ к этой теме.

Вы можете решить эту проблему также с помощью выделенных потоков или, лучше, пользовательского ThreadScheduler, который использует собственный пул потоков.Но неэффективно создавать более 50 простых потоков.Лучше настроить задачу, чтобы она была более эффективной.

Если вы используете BlockingCollection (потому что он может долго блокировать поток в ожидании записи (, если ограничен ) или чтенияили нет элементов для чтения) тогда лучше использовать System.Threading.Tasks.Channels https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md

Они не блокируют поток во время ожидания, когда коллекция будет доступна длянаписать или прочитать.Вот пример как это используется https://github.com/BBGONE/TestThreadAffinity/tree/master/ThreadingChannelsCoreFX/ChannelsTest

0 голосов
/ 20 ноября 2018

Вас может заинтересовать работа «каналов», в частности: System.Threading.Channels .Цель этого - предоставить асинхронные очереди поставщика / потребителя, охватывающие как одного, так и нескольких сценариев производителя и потребителя, верхние пределы и т. Д. Используя асинхронный API, вы не просто связываете множество потоковв ожидании чего-либо.

Ваш цикл чтения станет:

while (notCancelled) {
    var next = await queue.Reader.ReadAsync(optionalCancellationToken);
    Process(next);
}

, а производитель:

switch (element)
{
    case messageheader:
        queue.Writer.TryWrite(deserialze<messageType>());
        ...
}

так: минимальные изменения


В качестве альтернативы - или в комбинации - вы можете посмотреть на такие вещи, как «конвейеры» (https://www.nuget.org/packages/System.IO.Pipelines/)) - поскольку вы имеете дело с данными TCP, это было бы идеально подходит, и я обратил на это внимание.для пользовательского сервера веб-сокетов здесь, в Stack Overflow (который имеет дело с огромным числом соединений). Поскольку API является асинхронным во всем, он выполняет хорошую работу по балансировке работы - и API конвейеров спроектирован сИмеются в виду типичные сценарии TCP, например, частичное использование входящих потоков данных при определении границ фрейма. Я много писал об этом, с примерами кодав основном здесь .Обратите внимание, что "конвейеры" не включают в себя прямой уровень TCP, но сервер "kestrel" включает один или стороннюю библиотеку https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ (раскрытие: я ее написал).

0 голосов
/ 20 ноября 2018

На самом деле я делаю нечто подобное в другом проекте.Что я узнал или сделал бы по-другому:

  1. Прежде всего, лучше использовать выделенные потоки для цикла чтения / записи (с new Thread(ParameterizedThreadStart)), потому что Task.Run используетпоток пула, и когда вы используете его в (почти) бесконечном цикле, поток практически никогда не возвращается в пул.

    var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
    thread.Start(cancellationToken);
    
  2. Ваш Process может быть событием, которое выможет вызываться асинхронно, поэтому ваш цикл чтения может быть немедленно возвращен для обработки новых входящих пакетов как можно быстрее:

    private void ReaderLoop(object state)
    {
        var token = (CancellationToken)state;
        while (!token.IsCancellationRequested)
        {
            try
            {
                var message = MessageQueue.Take(token);
                OnMessageReceived(new MessageReceivedEventArgs(message));
            }
            catch (OperationCanceledException)
            {
                if (!disposed && IsRunning)
                    Stop();
                break;
            }
        }
    }
    

Обратите внимание, что если у делегата несколько целей, то это асинхронный вызовне тривиально.Я создал этот метод расширения для вызова делегата в потоках пула:

public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
    void Callback(IAsyncResult ar)
    {
        var method = (EventHandler<TEventArgs>)ar.AsyncState;
        try
        {
            method.EndInvoke(ar);
        }
        catch (Exception e)
        {
            HandleError(e, method);
        }
    }

    foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
        handler.BeginInvoke(sender, args, Callback, handler);
}

Таким образом, реализация OnMessageReceived может быть:

protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
    => messageReceivedHandler.InvokeAsync(this, e);

Наконец это был большой урок, что у BlockingCollection<T> есть некоторые проблемы с производительностью.Он использует SpinWait внутри, чей метод SpinOnce ждет все дольше и дольше, если в течение длительного времени нет входящих данных.Это сложная проблема, потому что даже если вы зарегистрируете каждый отдельный шаг обработки, вы не заметите, что все запускается с задержкой, если только вы не можете посмеяться над серверной стороной. Здесь вы можете найти быструю реализацию BlockingCollection, использующую AutoResetEvent для запуска входящих данных.Я добавил к нему перегрузку Take(CancellationToken) следующим образом:

/// <summary>
/// Takes an item from the <see cref="FastBlockingCollection{T}"/>
/// </summary>
public T Take(CancellationToken token)
{
    T item;
    while (!queue.TryDequeue(out item))
    {
        waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
        token.ThrowIfCancellationRequested();
    }

    return item;
}

По сути, все.Может быть, не все применимо в вашем случае, например.если почти немедленный ответ не имеет решающего значения, обычный BlockingCollection также сделает это.

...