У меня возникла проблема с управлением потоками в .Net 4.0 C #, и моих знаний о потоках недостаточно для ее решения, поэтому я разместил ее здесь, ожидая, что кто-нибудь может дать мне какой-нибудь совет, пожалуйста.
Сценарий следующий:
У нас есть служба Windows на C # framework 4.0, которая (1) подключается через сокет к серверу для получения файла .PCM, (2) затем преобразует его в файл .WAV, (3) отправляет его по электронной почте - SMTP и наконец (4) уведомить исходный сервер об успешной отправке.
Сервер, на котором была установлена служба, имеет 8 процессоров и 8 ГБ или ОЗУ.
Чтобы разрешить многопроцессорность, я построил сервис с 4-мя потоками, каждый из которых выполняет каждую задачу, о которой я упоминал ранее.
В коде у меня есть классы и методы для каждой задачи, поэтому я создаю потоки и вызываю методы следующим образом:
Thread eachThread = new Thread(object.PerformTask);
Внутри каждого метода у меня есть while, которое проверяет, живо ли соединение сокета, и продолжает выборку или обработку данных в зависимости от их типа.
while (_socket.Connected){
//perform task
}
Проблема в том, что по мере установки большего количества служб (одна и та же служба Windows реплицируется и направляется между двумя конечными точками на сервере для получения файлов через сокет), потребление ЦП резко увеличивается, каждая служба продолжает работать и обрабатывать файлы, но момент, когда потребление ресурсов процессора слишком велико, и сервер просто рухнул.
Вопрос: что бы вы предложили мне для обработки этого сценария, я имею в виду в общих чертах, что может быть хорошим подходом для обработки этих чрезвычайно востребованных задач обработки, чтобы избежать коллапса сервера при загрузке процессора?
Спасибо.
PS .: Если кому-то нужна более подробная информация о сценарии, пожалуйста, дайте мне знать.
Редактировать 1
При сбое CPU я имею в виду, что сервер работает слишком медленно, и мы должны его перезапустить.
Редактировать 2
Здесь я публикую некоторую часть кода, чтобы вы могли понять, как он запрограммирован:
while(true){
//starting the service
try
{
IPEndPoint endPoint = conn.SettingConnection();
string id = _objProp.Parametros.IdApp;
using (socket = conn.Connect(endPoint))
{
while (!socket.Connected)
{
_log.SetLog("INFO", "Conectando socket...");
socket = conn.Connect(endPoint);
//if the connection failed, wait 5 seconds for a new try.
if (!socket.Connected)
{
Thread.Sleep(5000);
}
}
proInThread = new Thread(proIn.ThreadRun);
conInThread = new Thread(conIn.ThreadRun);
conOutThread = new Thread(conOut.ThreadRun);
proInThread.Start();
conInThread.Start();
conOutThread.Start();
proInThread.Join();
conInThread.Join();
conOutThread.Join();
}
}
}
Редактировать 3
Тема 1
while (_socket.Connected)
{
пытаться
{
var conn = new AppConection (ref _objPropiedades);
try
{
string message = conn.ReceiveMessage(_socket);
lock (((ICollection)_queue).SyncRoot)
{
_queue.Enqueue(message);
_syncEvents.NewItemEvent.Set();
_syncEvents.NewResetEvent.Set();
}
lock (((ICollection)_total_rec).SyncRoot)
{
_total_rec.Add("1");
}
}
catch (SocketException ex)
{
//log exception
}
catch (IndexOutOfRangeException ex)
{
//log exception
}
catch (Exception ex)
{
//log exception
}
//message received
}
catch (Exception ex)
{
//logging error
}
}
//release ANY instance that could be using memory
_socket.Dispose();
log = null;
Тема 2
while (_socket.Connected)
{
пытаться{
_syncEvents.NewItemEventOut.WaitOne ();
if (_socket.Connected)
{
lock (((ICollection)_queue).SyncRoot)
{
total_queue = _queue.Count();
}
int i = 0;
while (i < total_queue)
{
//EMail Emails;
string mail = "";
lock (((ICollection)_queue).SyncRoot)
{
mail = _queue.Dequeue();
i = i + 1;
}
try
{
conn.SendMessage(_socket, mail);
_syncEvents.NewResetEvent.Set();
}
catch (SocketException ex)
{
//log exception
}
}
}
else
{
//log exception
_syncEvents.NewAbortEvent.Set();
Thread.CurrentThread.Abort();
}
}
catch (InvalidOperationException e)
{
//log exception
}
catch (Exception e)
{
//log exception
}
}
//release ANY instance that could be using memory
_socket.Dispose();
conn = null;
log = null;
Тема 3
while (_socket.Connected)
{
int total_queue = 0;
try
{
_syncEvents.NewItemEvent.WaitOne();
lock (((ICollection) _queue).SyncRoot)
{
total_queue = _queue.Count();
}
int i = 0;
while (i < total_queue)
{
if (mgthreads.GetThreatdAct() <
mgthreads.GetMaxThread ())
{
string message = "";
блокировка (((ICollection) _queue) .SyncRoot)
{
message = _queue.Dequeue();
i = i + 1;
}
count++;
lock (((ICollection) _queueO).SyncRoot)
{
app.SetParameters(_socket, _id,
сообщение, _queueO, _syncEvents,
_total_Env, _total_err);
}
Thread producerThread = new
Тема (app.ThreadJob) {Имя =
"ProducerThread_" +
DateTime.Now.ToString ( "ddMMyyyyhhmmss"),
Priority = ThreadPriority.AboveNormal
};
producerThread.Start ();
producerThread.Join();
mgthreads.IncThreatdAct(producerThread);
}
mgthreads.DecThreatdAct();
}
mgthreads.DecThreatdAct();
}
catch (InvalidOperationException e)
{
}
catch (Exception e)
{
}
Thread.Sleep(500);
}
//release ANY instance that could be using memory
_socket.Dispose();
app = null;
log = null;
mgthreads = null;
Тема 4
MessageVO mesVo =
fac.ParseMessageXml (_message);