Я думаю, что мне может понадобиться переосмыслить свой дизайн.Я с трудом сужаю ошибку, из-за которой мой компьютер полностью зависает, иногда выбрасывая HRESULT 0x8007000E от VS 2010.
У меня есть консольное приложение (которое я позже преобразую в службу)это обрабатывает передачу файлов на основе очереди базы данных.
Я регулирую потоки, разрешенные для передачи.Это связано с тем, что некоторые системы, к которым мы подключаемся, могут содержать только определенное количество подключений от определенных учетных записей.
Например, система A может принимать только 3 одновременных подключения (что означает 3 отдельных потока).Каждый из этих потоков имеет свой уникальный объект соединения, поэтому нам не следует сталкиваться с какими-либо проблемами синхронизации, поскольку они не разделяют соединение.
Мы хотим обрабатывать файлы из этих систем циклично.Так, например, мы допустим 3 соединения, которые могут передавать до 100 файлов за соединение.Это означает, что для перемещения 1000 файлов из системы A мы можем обрабатывать только 300 файлов за цикл, поскольку разрешено 3 потока по 100 файлов в каждом.Таким образом, за время существования этого перевода у нас будет 10 потоков.Мы можем запустить только 3 одновременно.Таким образом, будет 3 цикла, и последний цикл будет использовать только 1 поток для передачи последних 100 файлов.(3 потока x 100 файлов = 300 файлов за цикл)
Текущая примерная архитектура:
- System.Threading.Timer проверяет очередь каждые 5 секунд, чтобы что-то сделатьвызывая GetScheduledTask ()
- Если нет ничего, GetScheduledTask () просто ничего не делает
- Если есть работа, создайте поток ThreadPool для обработки работы [Рабочий поток A]
- Рабочая нить A видит, что для передачи имеется 1000 файлов
- Рабочая нить A видит, что в систему может быть запущено только 3 потока, из которых она получает файлы
- Рабочая нить A запускает триновые рабочие потоки [B, C, D] и передачи
- рабочий поток A ожидает B, C, D
[WaitHandle.WaitAll(transfersArray)]
- рабочий поток A видит, что в очереди еще больше файлов(сейчас должно быть 700)
- Рабочий поток A создает новый массив для ожидания на
[transfersArray = new TransferArray[3]
, что является максимумом для системы A, но может варьироваться в системе - Рабочий поток A запускает три новыхрабочие потоки [B, C, D] и их ожидание
[WaitHandle.WaitAll(transfersArray)]
- Процесс повторяется до тех пор, пока не останется файлов для перемещения.
- Рабочая нить A сигнализирует о том, что это сделано
Я использую ManualResetEvent для обработкисигнализация.
Мои вопросы:
- Существуют ли какие-либо явные обстоятельства, которые могут вызвать утечку ресурса или проблему, с которой я столкнулся?
- Должен ли я выполнить цикл черезмассив после каждого
WaitHandle.WaitAll(array)
и вызова array[index].Dispose()?
- Количество дескрипторов в диспетчере задач для этого процесса медленно увеличивается
- Я вызываю первоначальное создание рабочего потока A из системы.Threading.Timer.Будут ли какие-то проблемы с этим?Код для этого таймера:
(некоторый код класса для планирования)
private ManualResetEvent _ResetEvent;
private void Start()
{
_IsAlive = true;
ManualResetEvent transferResetEvent = new ManualResetEvent(false);
//Set the scheduler timer to 5 second intervals
_ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}
private void ScheduledTasks_Tick(object state)
{
ManualResetEvent resetEvent = null;
try
{
resetEvent = (ManualResetEvent)state;
//Block timer until GetScheduledTasks() finishes
_ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
GetScheduledTasks();
}
finally
{
_ScheduledTasks.Change(5000, 5000);
Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
resetEvent.Set();
}
}
private void GetScheduledTask()
{
try
{
//Check to see if the database connection is still up
if (!_IsAlive)
{
//Handle
_ConnectionLostNotification = true;
return;
}
//Get scheduled records from the database
ISchedulerTask task = null;
using (DataTable dt = FastSql.ExecuteDataTable(
_ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
{
if (dt != null)
{
if (dt.Rows.Count == 1)
{ //Only 1 row is allowed
DataRow dr = dt.Rows[0];
//Get task information
TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
task = ScheduledTaskFactory.CreateScheduledTask(taskType);
task.Description = dr["Description"].ToString();
task.IsEnabled = (bool)dr["IsEnabled"];
task.IsProcessing = (bool)dr["IsProcessing"];
task.IsManualLaunch = (bool)dr["IsManualLaunch"];
task.ProcessMachineName = dr["ProcessMachineName"].ToString();
task.NextRun = (DateTime)dr["NextRun"];
task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
task.SleepMinutes = (int)dr["SleepMinutes"];
task.ScheduleId = (int)dr["ScheduleId"];
task.CurrentRuns = (int)dr["CurrentRuns"];
task.TotalRuns = (int)dr["TotalRuns"];
SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
//Queue up task to worker thread and start
ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);
}
}
}
}
catch (Exception ex)
{
//Handle
}
}
private void ThreadProc(object taskObject)
{
SchedulerTask task = (SchedulerTask)taskObject;
ScheduledTaskEngine engine = null;
try
{
engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
engine.StartTask(task.Task);
}
catch (Exception ex)
{
//Handle
}
finally
{
task.TaskResetEvent.Set();
task.TaskResetEvent.Dispose();
}
}