Последовательная обработка ConcurrentQueue и ограничение одним обработчиком сообщений.Правильный узор? - PullRequest
3 голосов
/ 23 декабря 2011

Я создаю многопоточное приложение в .net.

У меня есть поток, который прослушивает соединение (абстрактное, последовательное, tcp ...).

Когда он получает новое сообщение, он добавляет его через AddMessage. Который затем вызвать startSpool. startSpool проверяет, запущена ли катушка, и, если она есть, возвращает, в противном случае запускает ее в новом потоке. Причина этого в том, что сообщения должны обрабатываться последовательно, FIFO.

Итак, мои вопросы ... Я иду об этом правильным путем? Есть ли лучшие, более быстрые и дешевые модели там?

Мои извинения, если в моем коде есть опечатка, у меня были проблемы с копированием и вставкой.

    ConcurrentQueue<IMyMessage > messages = new ConcurrentQueue<IMyMessage>();

    const int maxSpoolInstances = 1;

    object lcurrentSpoolInstances;
    int currentSpoolInstances = 0;

    Thread spoolThread;

    public void AddMessage(IMyMessage message)
    {
        this.messages.Add(message);

        this.startSpool();
    }

    private void startSpool()
    {
        bool run = false;

        lock (lcurrentSpoolInstances)
        {
            if (currentSpoolInstances <= maxSpoolInstances)
            {
                this.currentSpoolInstances++;
                run = true;
            }
            else
            {
                return;
            }
        }

        if (run)
        {
            this.spoolThread = new Thread(new ThreadStart(spool));
            this.spoolThread.Start();
        }
    }

    private void spool()
    {
        Message.ITimingMessage message;

        while (this.messages.Count > 0)
        {
            // TODO: Is this below line necessary or does the TryDequeue cover this?
            message = null;

            this.messages.TryDequeue(out message);

            if (message != null)
            {
                // My long running thing that does something with this message.
            }
        }


        lock (lcurrentSpoolInstances)
        {
            this.currentSpoolInstances--;
        }
    }

Ответы [ 3 ]

4 голосов
/ 23 декабря 2011

Было бы проще использовать BlockingCollection<T> вместо ConcurrentQueue<T>.

Примерно так должно работать:

class MessageProcessor : IDisposable
{
    BlockingCollection<IMyMessage> messages = new BlockingCollection<IMyMessage>();

    public MessageProcessor()
    {
       // Move this to constructor to prevent race condition in existing code (you could start multiple threads...
       Task.Factory.StartNew(this.spool, TaskCreationOptions.LongRunning);
    }

    public void AddMessage(IMyMessage message)
    {
        this.messages.Add(message);
    }

    private void Spool()
    {
         foreach(IMyMessage message in this.messages.GetConsumingEnumerable())
         {
               // long running thing that does something with this message.
         }
    }

    public void FinishProcessing()
    {
         // This will tell the spooling you're done adding, so it shuts down
         this.messages.CompleteAdding();
    }

    void IDisposable.Dispose()
    {
        this.FinishProcessing();
    }
}

Редактировать: Если вы хотите поддерживать несколько потребителей, вы можете обработать это с помощью отдельного конструктора.Я бы изменил это на:

    public MessageProcessor(int numberOfConsumers = 1)
    {
        for (int i=0;i<numberOfConsumers;++i)
            StartConsumer();
    }

    private void StartConsumer()
    {
       // Move this to constructor to prevent race condition in existing code (you could start multiple threads...
       Task.Factory.StartNew(this.spool, TaskCreationOptions.LongRunning);
    }

Это позволит вам запустить любое количество потребителей.Обратите внимание, что это нарушает правило, согласно которому это должно быть строго FIFO - обработка может потенциально обработать элементы «numberOfConsumer» в блоках с этим изменением.

Несколько производителей уже поддерживаются.Вышеуказанное является поточно-ориентированным, поэтому любое количество потоков может вызывать Add(message) параллельно, без изменений.

1 голос
/ 24 декабря 2011

Я думаю, что ответ Рида - лучший путь, но ради академиков, вот пример, использующий параллельную очередь - у вас было несколько гонок в коде, который вы опубликовали (в зависимости от того, как вы обрабатываете увеличивающиеся currnetSpoolInstances)

Изменения, которые я сделал (ниже), были:

  • Переключен на задачу вместо потока (использует пул потоков вместо затрат на создание нового потока)
  • добавил код для увеличения / уменьшения количества экземпляров буфера
  • изменил "if currentSpoolInstances <= max ... на просто <, чтобы избежать слишком большого числа рабочих (вероятно, просто опечатка) </li>
  • изменил способ обработки пустых очередей, чтобы избежать гонки: я думаю, что у вас была гонка, в которой ваш цикл while мог проверить значение false (ваш поток начинает выходить), но в этот момент новый элементдобавлено (значит, ваша ветка катушки завершается, но количество катушек> 0, поэтому ваша очередь останавливается).

private ConcurrentQueue<IMyMessage> messages = new ConcurrentQueue<IMyMessage>();

const int maxSpoolInstances = 1;
object lcurrentSpoolInstances = new object();
int currentSpoolInstances = 0;

public void AddMessage(IMyMessage message)
{
    this.messages.Enqueue(message);
    this.startSpool();
}

private void startSpool()
{
    lock (lcurrentSpoolInstances)
    {
        if (currentSpoolInstances < maxSpoolInstances)
        {
            this.currentSpoolInstances++;
            Task.Factory.StartNew(spool, TaskCreationOptions.LongRunning);
        }
    }
}

private void spool()
{
    IMyMessage message;
    while (true)
    {
// you do not need to null message because it is an "out" parameter, had it been a "ref" parameter, you would want to null it.

        if(this.messages.TryDequeue(out message))
        {
            // My long running thing that does something with this message. 
        }
        else
        {
            lock (lcurrentSpoolInstances)
            {
                if (this.messages.IsEmpty)
                {
                    this.currentSpoolInstances--;
                    return;
                }
            }
        }
    }
} 


0 голосов
/ 15 января 2015

Отметьте «Шаблон трубопроводов»: http://msdn.microsoft.com/en-us/library/ff963548.aspx

  • Используйте BlockingCollection для «буферов».
  • Каждый процессор (например, ReadStrings, CorrectCase, ..) должен запускаться в задании.

enter image description here

НТН ..

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...