Обработка потоков для многопроцессорной обработки - PullRequest
0 голосов
/ 23 мая 2011

У меня возникла проблема с управлением потоками в .Net 4.0 C #, и моих знаний о потоках недостаточно для ее решения, поэтому я разместил ее здесь, ожидая, что кто-нибудь может дать мне какой-нибудь совет, пожалуйста.

Сценарий следующий:

У нас есть служба Windows на C # framework 4.0, которая (1) подключается через сокет к серверу для получения файла .PCM, (2) затем преобразует его в файл .WAV, (3) отправляет его по электронной почте - SMTP и наконец (4) уведомить исходный сервер об успешной отправке.

Сервер, на котором была установлена ​​служба, имеет 8 процессоров и 8 ГБ или ОЗУ.

Чтобы разрешить многопроцессорность, я построил сервис с 4-мя потоками, каждый из которых выполняет каждую задачу, о которой я упоминал ранее.

В коде у меня есть классы и методы для каждой задачи, поэтому я создаю потоки и вызываю методы следующим образом:

 Thread eachThread = new Thread(object.PerformTask);

Внутри каждого метода у меня есть while, которое проверяет, живо ли соединение сокета, и продолжает выборку или обработку данных в зависимости от их типа.

while (_socket.Connected){ 
//perform task
}

Проблема в том, что по мере установки большего количества служб (одна и та же служба Windows реплицируется и направляется между двумя конечными точками на сервере для получения файлов через сокет), потребление ЦП резко увеличивается, каждая служба продолжает работать и обрабатывать файлы, но момент, когда потребление ресурсов процессора слишком велико, и сервер просто рухнул.

Вопрос: что бы вы предложили мне для обработки этого сценария, я имею в виду в общих чертах, что может быть хорошим подходом для обработки этих чрезвычайно востребованных задач обработки, чтобы избежать коллапса сервера при загрузке процессора?

Спасибо.

PS .: Если кому-то нужна более подробная информация о сценарии, пожалуйста, дайте мне знать.

Редактировать 1

При сбое CPU я имею в виду, что сервер работает слишком медленно, и мы должны его перезапустить.

Редактировать 2

Здесь я публикую некоторую часть кода, чтобы вы могли понять, как он запрограммирован:

while(true){
            //starting the service    
            try
            {
                IPEndPoint endPoint = conn.SettingConnection();
                string id = _objProp.Parametros.IdApp;

                using (socket = conn.Connect(endPoint))
                {
                    while (!socket.Connected)
                    {
                        _log.SetLog("INFO", "Conectando socket...");
                        socket = conn.Connect(endPoint);

                        //if the connection failed, wait 5 seconds for a new try.
                        if (!socket.Connected)
                        {
                            Thread.Sleep(5000);
                        }
                    }

                    proInThread = new Thread(proIn.ThreadRun);
                    conInThread = new Thread(conIn.ThreadRun);
                    conOutThread = new Thread(conOut.ThreadRun);

                    proInThread.Start();
                    conInThread.Start();
                    conOutThread.Start();

                    proInThread.Join();
                    conInThread.Join();
                    conOutThread.Join();
                }
          }
     }

Редактировать 3

  • Тема 1

    while (_socket.Connected) { пытаться { var conn = new AppConection (ref _objPropiedades);

                    try
                    {
                        string message = conn.ReceiveMessage(_socket);
                        lock (((ICollection)_queue).SyncRoot)
                        {
                            _queue.Enqueue(message);
                            _syncEvents.NewItemEvent.Set();
                            _syncEvents.NewResetEvent.Set();
                        }
                        lock (((ICollection)_total_rec).SyncRoot)
                        {
    
                            _total_rec.Add("1");
                        }
                    }
                    catch (SocketException ex)
                    {
                        //log exception
    
                    }
                    catch (IndexOutOfRangeException ex)
                    {
                        //log exception
                    }
                    catch (Exception ex)
                    {
                       //log exception
    
                    }
                    //message received
    
                }
                catch (Exception ex)
                {
                   //logging error
                }
            }
    
            //release ANY instance that could be using memory
            _socket.Dispose();
            log = null;
    
  • Тема 2

    while (_socket.Connected) { пытаться{ _syncEvents.NewItemEventOut.WaitOne ();

                        if (_socket.Connected)
                        {
                            lock (((ICollection)_queue).SyncRoot)
                            {
    
                                total_queue = _queue.Count();
    
                            }
    
                            int i = 0;
                            while (i < total_queue)
                            {
                                //EMail Emails;
                                string mail = "";
                                lock (((ICollection)_queue).SyncRoot)
                                {
    
                                    mail = _queue.Dequeue();
    
                                    i = i + 1;
                                }
                                try
                                {
                                    conn.SendMessage(_socket, mail);
                                    _syncEvents.NewResetEvent.Set();
                                }
                                catch (SocketException ex)
                                {
                                    //log exception
                                }
                            }
                        }
                        else
                        {
                            //log exception
    
                            _syncEvents.NewAbortEvent.Set();
                            Thread.CurrentThread.Abort();
                        }
                    }
                    catch (InvalidOperationException e)
                    {
                        //log exception
                    }
                    catch (Exception e)
                    {
                        //log exception
                    }
            }
    
            //release ANY instance that could be using memory
            _socket.Dispose();
            conn = null;
            log = null;
    
  • Тема 3

    while (_socket.Connected) {

                    int total_queue = 0;
                    try
                {
                    _syncEvents.NewItemEvent.WaitOne();
                    lock (((ICollection) _queue).SyncRoot)
                    {
                        total_queue = _queue.Count();
                    }
                    int i = 0;
                    while (i < total_queue)
                    {
                        if (mgthreads.GetThreatdAct() <
    

    mgthreads.GetMaxThread ()) { string message = ""; блокировка (((ICollection) _queue) .SyncRoot) {

                                message = _queue.Dequeue();
                                i = i + 1;
    
                            }
                            count++;
                            lock (((ICollection) _queueO).SyncRoot)
                            {
                                app.SetParameters(_socket, _id,
    

    сообщение, _queueO, _syncEvents, _total_Env, _total_err); }

                            Thread producerThread = new
    

    Тема (app.ThreadJob) {Имя = "ProducerThread_" + DateTime.Now.ToString ( "ddMMyyyyhhmmss"), Priority = ThreadPriority.AboveNormal }; producerThread.Start ();

                            producerThread.Join();
    
                            mgthreads.IncThreatdAct(producerThread);
                        }
                        mgthreads.DecThreatdAct();
                    }
                    mgthreads.DecThreatdAct();
                }
                catch (InvalidOperationException e)
                {
    
                }
                catch (Exception e)
                {
    
                }
                Thread.Sleep(500);
            }
    
            //release ANY instance that could be using memory
            _socket.Dispose();
            app = null;
            log = null;
            mgthreads = null;
    
  • Тема 4

    MessageVO mesVo = fac.ParseMessageXml (_message);

Ответы [ 2 ]

0 голосов
/ 23 мая 2011

Обычно не должно быть сбоев из-за высокой загрузки процессора. В то время как любой из потоков ожидает что-то удаленное (например, чтобы удаленный сервер ответил на запрос), этот поток не использует ресурс ЦП. Но пока он что-то делает, он соответственно использует процессор. В упомянутой выше задаче отсутствует высокая загрузка ЦП (поскольку сохранение файла PCM в формате WAV не требует сложного алгоритма), поэтому высокая загрузка ЦП является признаком ошибки в программировании.

0 голосов
/ 23 мая 2011

Я бы понизил приоритет потока и сделал бы, чтобы все потоки проходили через семафор, который ограничивает параллелизм для Environment.ProcessorCount. Это не идеальное решение, но, похоже, в данном случае этого достаточно и это легко исправить.

Редактировать: думая об этом, вы должны сложить 10 сервисов в один процесс, потому что в противном случае у вас не будет централизованного контроля за запущенными потоками. Если у вас есть 10 независимых процессов, они не могут координировать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...