Могут ли задачи страдать от медленного планирования, даже если ThreadPool.GetAvailableThreads () указывает на изобилие? - PullRequest
0 голосов
/ 03 апреля 2019

У нас работает сервер, который получает данные, помещает их в queueA. Другой поток получает один элемент за раз из queueA, обрабатывает его и помещает в queueB.

Эта настройка фактически зеркально отражена, так что оба зеркальных сервера получают одинаковые данные и обрабатывают их одинаковым образом в настройке активного активного резервирования.

Примерно раз в год сообщения одного из серверов массово накапливаются в queueA, тогда как другой сервер прекрасно обрабатывает те же данные.

Таким образом, проблема, похоже, заключается в коде, который берет из queueA, обрабатывает и помещает в queueB. Там ничего особенного не происходит, кроме (совершенно ненужного) использования библиотеки задач, как показано в урезанной и упрощенной версии ниже.

public IAsyncResult BeginReceive()
{
  Task<object> task = new Task<object>(_ =>
  {
    object message;
    if (!queueA.TryDequeue(out message))
    {
      if (queueA.IsEmpty)
        waitQueueA.WaitOne(10); // waitQueueA is properly signaled whenever an item is put into queueA
    }
    return message;
  }, null, TaskCreationOptions.PreferFairness);

  task.ContinueWith((t) =>
  {
    object receivedMessage = t.Result;
    if (receivedMessage != null)
    {
      lock (bLock) // bLock is only used by this piece of code
      {
        queueB.Enqueue(receivedMessage);
      }
    }
    else
    {
      Thread.Sleep(1);
    }
    BeginReceive(OnReceive, channel);
  });

  task.Start();
  return task;
}

public object EndReceive(IAsyncResult result)
{
  Task<object> task = (Task<object>) result;
  return task.Result;
}

Игнорирование многих идиосинкразий кода (лично я создал бы для этого отдельный выделенный поток и выполнил бы все вышеперечисленное в одном большом цикле while (true) { } без участия Task), какие обстоятельства могут сделать он работает так плохо, что цикл вращается со скоростью в основном 15 итераций в секунду, максимально до 50 итераций в секунду? Мы регистрируем ThreadPool.GetAvailableThreads() каждые 5 секунд, и это указывает на тысячи доступных потоков по всему.

Этот код работает нормально (достаточно) большую часть времени, но когда он терпит неудачу, он, кажется, терпит неудачу с самого начала программы до ее конца, который находится в диапазоне часа или около того, когда память исчерпана на queueA и его предметы. Похоже, что программа может быть переведена в какое-то странное состояние, из которого она не сможет восстановиться.

Вот график количества элементов, обрабатываемых в секунду, для хорошей машины и плохой машины, горизонтальная ось - это ось времени. (Обратите внимание, что вертикальная ось является логарифмической)

enter image description here

Диаграмма показывает, например, что «плохой» сервер в большинстве случаев имеет верхний предел около 17/18 элементов в секунду, тогда как «хороший» сервер способен выполнять 3700 элементов в секунду в зависимости от скорости получения и ввода новых элементов. queueA.

Поскольку я не слишком знаком со сложностями библиотеки задач, мне интересно, может ли эта проблема вызвать комбинацию PreferFairness и асинхронного (и, следовательно, не очень) "рекурсивного" вызова BeginReceive, при некоторых случайных обстоятельствах. Любая другая идея, как добраться до сути этого?

N.B. Я удалил пару try {} catch { error.log(); } конструкций, чтобы упростить его. Нет зарегистрированных ошибок, поэтому я уверен, что код не генерирует исключения. И queueA не монотонно растет, иногда он немного уменьшается, так что цикл кажется живым, хотя и медленным.

1 Ответ

0 голосов
/ 04 апреля 2019

Здесь - очень хорошее чтение по теме.

Когда задание запланировано для планировщика по умолчанию, планировщик проверяет, является ли текущий поток, из которогоЗадача в очереди - это поток ThreadPool, имеющий собственную локальную очередь.Если это не так, рабочий элемент будет поставлен в очередь в глобальную очередь.Если это так, планировщик также проверит, содержит ли значение TaskCreationOptions для задачи флаг PreferFairness, который по умолчанию не включен.Если флаг установлен, даже если у потока есть собственная локальная очередь, планировщик все равно будет ставить задачу в очередь, а не в локальную очередь.Таким образом, эта задача будет считаться справедливой наряду со всеми другими рабочими элементами, поставленными в очередь в глобальном масштабе.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...