У нас работает сервер, который получает данные, помещает их в queueA
. Другой поток получает один элемент за раз из queueA
, обрабатывает его и помещает в queueB
.
Эта настройка фактически зеркально отражена, так что оба зеркальных сервера получают одинаковые данные и обрабатывают их одинаковым образом в настройке активного активного резервирования.
Примерно раз в год сообщения одного из серверов массово накапливаются в queueA
, тогда как другой сервер прекрасно обрабатывает те же данные.
Таким образом, проблема, похоже, заключается в коде, который берет из queueA
, обрабатывает и помещает в queueB
. Там ничего особенного не происходит, кроме (совершенно ненужного) использования библиотеки задач, как показано в урезанной и упрощенной версии ниже.
public IAsyncResult BeginReceive()
{
Task<object> task = new Task<object>(_ =>
{
object message;
if (!queueA.TryDequeue(out message))
{
if (queueA.IsEmpty)
waitQueueA.WaitOne(10); // waitQueueA is properly signaled whenever an item is put into queueA
}
return message;
}, null, TaskCreationOptions.PreferFairness);
task.ContinueWith((t) =>
{
object receivedMessage = t.Result;
if (receivedMessage != null)
{
lock (bLock) // bLock is only used by this piece of code
{
queueB.Enqueue(receivedMessage);
}
}
else
{
Thread.Sleep(1);
}
BeginReceive(OnReceive, channel);
});
task.Start();
return task;
}
public object EndReceive(IAsyncResult result)
{
Task<object> task = (Task<object>) result;
return task.Result;
}
Игнорирование многих идиосинкразий кода (лично я создал бы для этого отдельный выделенный поток и выполнил бы все вышеперечисленное в одном большом цикле while (true) { }
без участия Task
), какие обстоятельства могут сделать он работает так плохо, что цикл вращается со скоростью в основном 15 итераций в секунду, максимально до 50 итераций в секунду? Мы регистрируем ThreadPool.GetAvailableThreads()
каждые 5 секунд, и это указывает на тысячи доступных потоков по всему.
Этот код работает нормально (достаточно) большую часть времени, но когда он терпит неудачу, он, кажется, терпит неудачу с самого начала программы до ее конца, который находится в диапазоне часа или около того, когда память исчерпана на queueA
и его предметы. Похоже, что программа может быть переведена в какое-то странное состояние, из которого она не сможет восстановиться.
Вот график количества элементов, обрабатываемых в секунду, для хорошей машины и плохой машины, горизонтальная ось - это ось времени. (Обратите внимание, что вертикальная ось является логарифмической)

Диаграмма показывает, например, что «плохой» сервер в большинстве случаев имеет верхний предел около 17/18 элементов в секунду, тогда как «хороший» сервер способен выполнять 3700 элементов в секунду в зависимости от скорости получения и ввода новых элементов. queueA
.
Поскольку я не слишком знаком со сложностями библиотеки задач, мне интересно, может ли эта проблема вызвать комбинацию PreferFairness
и асинхронного (и, следовательно, не очень) "рекурсивного" вызова BeginReceive
, при некоторых случайных обстоятельствах. Любая другая идея, как добраться до сути этого?
N.B. Я удалил пару try {} catch { error.log(); }
конструкций, чтобы упростить его. Нет зарегистрированных ошибок, поэтому я уверен, что код не генерирует исключения. И queueA
не монотонно растет, иногда он немного уменьшается, так что цикл кажется живым, хотя и медленным.