Утечка / сбой ресурса Threadpool / WaitHandle - PullRequest
0 голосов
/ 17 июля 2010

Я думаю, что мне может понадобиться переосмыслить свой дизайн.Я с трудом сужаю ошибку, из-за которой мой компьютер полностью зависает, иногда выбрасывая HRESULT 0x8007000E от VS 2010.

У меня есть консольное приложение (которое я позже преобразую в службу)это обрабатывает передачу файлов на основе очереди базы данных.

Я регулирую потоки, разрешенные для передачи.Это связано с тем, что некоторые системы, к которым мы подключаемся, могут содержать только определенное количество подключений от определенных учетных записей.

Например, система A может принимать только 3 одновременных подключения (что означает 3 отдельных потока).Каждый из этих потоков имеет свой уникальный объект соединения, поэтому нам не следует сталкиваться с какими-либо проблемами синхронизации, поскольку они не разделяют соединение.

Мы хотим обрабатывать файлы из этих систем циклично.Так, например, мы допустим 3 соединения, которые могут передавать до 100 файлов за соединение.Это означает, что для перемещения 1000 файлов из системы A мы можем обрабатывать только 300 файлов за цикл, поскольку разрешено 3 потока по 100 файлов в каждом.Таким образом, за время существования этого перевода у нас будет 10 потоков.Мы можем запустить только 3 одновременно.Таким образом, будет 3 цикла, и последний цикл будет использовать только 1 поток для передачи последних 100 файлов.(3 потока x 100 файлов = 300 файлов за цикл)

Текущая примерная архитектура:

  1. System.Threading.Timer проверяет очередь каждые 5 секунд, чтобы что-то сделатьвызывая GetScheduledTask ()
  2. Если нет ничего, GetScheduledTask () просто ничего не делает
  3. Если есть работа, создайте поток ThreadPool для обработки работы [Рабочий поток A]
  4. Рабочая нить A видит, что для передачи имеется 1000 файлов
  5. Рабочая нить A видит, что в систему может быть запущено только 3 потока, из которых она получает файлы
  6. Рабочая нить A запускает триновые рабочие потоки [B, C, D] и передачи
  7. рабочий поток A ожидает B, C, D [WaitHandle.WaitAll(transfersArray)]
  8. рабочий поток A видит, что в очереди еще больше файлов(сейчас должно быть 700)
  9. Рабочий поток A создает новый массив для ожидания на [transfersArray = new TransferArray[3], что является максимумом для системы A, но может варьироваться в системе
  10. Рабочий поток A запускает три новыхрабочие потоки [B, C, D] и их ожидание [WaitHandle.WaitAll(transfersArray)]
  11. Процесс повторяется до тех пор, пока не останется файлов для перемещения.
  12. Рабочая нить A сигнализирует о том, что это сделано

Я использую ManualResetEvent для обработкисигнализация.

Мои вопросы:

  1. Существуют ли какие-либо явные обстоятельства, которые могут вызвать утечку ресурса или проблему, с которой я столкнулся?
  2. Должен ли я выполнить цикл черезмассив после каждого WaitHandle.WaitAll(array) и вызова array[index].Dispose()?
  3. Количество дескрипторов в диспетчере задач для этого процесса медленно увеличивается
  4. Я вызываю первоначальное создание рабочего потока A из системы.Threading.Timer.Будут ли какие-то проблемы с этим?Код для этого таймера:

(некоторый код класса для планирования)

private ManualResetEvent _ResetEvent;

private void Start()
{
    _IsAlive = true;
    ManualResetEvent transferResetEvent = new ManualResetEvent(false);
    //Set the scheduler timer to 5 second intervals
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}

private void ScheduledTasks_Tick(object state)
{
    ManualResetEvent resetEvent = null;
    try
    {
        resetEvent = (ManualResetEvent)state;
        //Block timer until GetScheduledTasks() finishes
        _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
        GetScheduledTasks();
    }
    finally
    {
        _ScheduledTasks.Change(5000, 5000);
        Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
        resetEvent.Set();
    }
}


private void GetScheduledTask()
{
    try 
    { 
        //Check to see if the database connection is still up
        if (!_IsAlive)
        {
            //Handle
            _ConnectionLostNotification = true;
            return;
        }

        //Get scheduled records from the database
        ISchedulerTask task = null;

        using (DataTable dt = FastSql.ExecuteDataTable(
                _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
                new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
        {
            if (dt != null)
            {
                if (dt.Rows.Count == 1)
                {  //Only 1 row is allowed
                    DataRow dr = dt.Rows[0];

                    //Get task information
                    TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
                    task = ScheduledTaskFactory.CreateScheduledTask(taskType);

                    task.Description = dr["Description"].ToString();
                    task.IsEnabled = (bool)dr["IsEnabled"];
                    task.IsProcessing = (bool)dr["IsProcessing"];
                    task.IsManualLaunch = (bool)dr["IsManualLaunch"];
                    task.ProcessMachineName = dr["ProcessMachineName"].ToString();
                    task.NextRun = (DateTime)dr["NextRun"];
                    task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
                    task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
                    task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
                    task.SleepMinutes = (int)dr["SleepMinutes"];
                    task.ScheduleId = (int)dr["ScheduleId"];
                    task.CurrentRuns = (int)dr["CurrentRuns"];
                    task.TotalRuns = (int)dr["TotalRuns"];

                    SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
                    //Queue up task to worker thread and start
                    ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);     
                }
            }
        }

    }
    catch (Exception ex)
    {
        //Handle
    }
}

private void ThreadProc(object taskObject)
{
    SchedulerTask task = (SchedulerTask)taskObject;
    ScheduledTaskEngine engine = null;
    try
    {
        engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
        engine.StartTask(task.Task);    
    }
    catch (Exception ex)
    {
        //Handle
    }
    finally
    {
        task.TaskResetEvent.Set();
        task.TaskResetEvent.Dispose();
    }
}

Ответы [ 4 ]

2 голосов
/ 17 июля 2010

0x8007000E - ошибка нехватки памяти. Это и количество дескрипторов указывают на утечку ресурсов. Убедитесь, что вы избавляетесь от каждого объекта, который реализует IDisposable. Это включает в себя массивы ManualResetEvent s, которые вы используете.

Если у вас есть время, вы можете также использовать класс .NET 4.0 Task; он был разработан для более сложных сценариев, таких как этот. Определив дочерние Task объекты, вы можете уменьшить общее количество потоков (потоки довольно дороги не только из-за планирования, но и из-за их стекового пространства).

1 голос
/ 26 января 2011

Я ищу ответы на аналогичную проблему (количество обращений увеличивается со временем).

Я взглянул на архитектуру вашего приложения и хотел бы предложить вам кое-что, что может вам помочь:

Слышали ли вы о IOCP (входных и выходных портах завершения).

Я не уверен в сложности реализации этого с использованием C #, но в C / C ++ это просто.Используя это, вы создаете уникальный пул потоков (количество потоков в этом пуле обычно определяется как 2 x количество процессоров или ядер процессоров на ПК или сервере). Этот пул связывается с дескриптором IOCP, и пул выполняетРабота.См. Справку по следующим функциям: CreateIoCompletionPort ();PostQueuedCompletionStatus ();GetQueuedCompletionStatus ();

В общем случае создание и выход из потоков на лету могут занимать много времени и приводить к снижению производительности и фрагментации памяти.Есть тысячи литературы о IOCP в MSDN и в Google.

0 голосов
/ 13 сентября 2010

Оказывается, источник этой странной проблемы не был связан с архитектурой, а скорее из-за преобразования решения с 3,5 на 4,0.Я заново создал решение, не выполняя никаких изменений кода, и проблема больше не возникала.

0 голосов
/ 17 июля 2010

Я думаю, вы должны пересмотреть свою архитектуру в целом.Тот факт, что вы можете иметь только 3 одновременных соединения, почти умоляет вас использовать 1 поток для генерации списка файлов и 3 потока для их обработки.Поток вашего производителя вставит все файлы в очередь, а 3 потока потребителя выведут из очереди и продолжат обработку по мере поступления элементов в очередь.Очередь блокировки может значительно упростить код.Если вы используете .NET 4.0, вы можете воспользоваться классом BlockingCollection .

public class Example
{
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>();

    public void Start()
    {
        var threads = new Thread[] 
            { 
                new Thread(Producer), 
                new Thread(Consumer), 
                new Thread(Consumer), 
                new Thread(Consumer) 
            };
        foreach (Thread thread in threads)
        {
            thread.Start();
        }
    }

    private void Producer()
    {
        while (true)
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
            ScheduledTask task = GetScheduledTask();
            if (task != null)
            {
                foreach (string file in task.Files)
                {
                    m_Queue.Add(task);
                }
            }
        }
    }

    private void Consumer()
    {
        // Make a connection to the resource that is assigned to this thread only.
        while (true)
        {
            string file = m_Queue.Take();
            // Process the file.
        }
    }
}

В приведенном выше примере я определенно упростил некоторые вещи, но я надеюсь, что вы поймете общую идею,Обратите внимание, что это намного проще, так как не так много способов синхронизации потоков (большинство из них будет встроено в очередь блокировки) и, конечно, нет использования WaitHandle объектов.Очевидно, что вам нужно было бы добавить правильные механизмы, чтобы корректно завершать потоки, но это должно быть довольно просто.

...