Пожалуйста, прости, что я опубликовал это как ответ, так как это не совсем ответ на твой вопрос, однако, он связан с твоим вопросом, так как он касается поведения CorrelationManager и потоков / задач / и т. Д. Я рассматривал использование LogicalOperationStack
(и StartLogicalOperation/StopLogicalOperation
) метода CorrelationManager для обеспечения дополнительного контекста в многопоточных сценариях.
Я взял ваш пример и немного изменил его, чтобы добавить возможность выполнять работу параллельно, используя Parallel.For. Также я использую StartLogicalOperation/StopLogicalOperation
для скобок (внутренне) DoLongRunningWork
. Концептуально, DoLongRunningWork
делает что-то подобное каждый раз, когда выполняется:
DoLongRunningWork
StartLogicalOperation
Thread.Sleep(3000)
StopLogicalOperation
Я обнаружил, что если я добавлю эти логические операции в ваш код (более или менее как есть), все логические операции останутся синхронизированными (всегда ожидаемое количество операций в стеке и значения операций в стеке всегда, как и ожидалось).
В некоторых моих собственных испытаниях я обнаружил, что это не всегда так. Стек логической операции становился «поврежденным». Лучшее объяснение, которое я мог бы придумать, заключается в том, что «слияние» задней части информации CallContext с «родительским» потоком при выходе из «дочернего» потока приводило к тому, что «старая» информация о контексте дочернего потока (логическая операция) была « наследуется "другим дочерним потоком одного брата.
Проблема также может быть связана с тем фактом, что Parallel.For, по-видимому, использует основной поток (по крайней мере, в коде примера, как написано) в качестве одного из «рабочих потоков» (или как там их следует вызывать в параллельном режиме). домен). Всякий раз, когда выполняется DoLongRunningWork, новая логическая операция запускается (в начале) и останавливается (в конце) (то есть помещается в LogicalOperationStack и откатывается назад от него). Если у основного потока уже есть действующая логическая операция, и если DoLongRunningWork выполняется В ОСНОВНОЙ РЕЗЬБЕ, то запускается новая логическая операция, поэтому в LogicalOperationStack основного потока теперь выполняется ДВА операции. Любые последующие исполнения DoLongRunningWork (пока эта «итерация» DoLongRunningWork выполняется в главном потоке) (по-видимому) будут наследовать LogicalOperationStack основного потока (который теперь имеет две операции, а не только одну ожидаемую операцию).
Мне потребовалось много времени, чтобы понять, почему поведение LogicalOperationStack в моем примере отличается от моего модифицированного варианта вашего примера. Наконец, я увидел, что в своем коде я заключил в скобки всю программу в виде логической операции, тогда как в моей модифицированной версии тестовой программы я этого не сделал. Подразумевается, что в моей тестовой программе каждый раз, когда выполнялась моя «работа» (аналог DoLongRunningWork), уже действовала логическая операция. В моей модифицированной версии вашей тестовой программы я не заключил в скобки всю программу в логической операции.
Итак, когда я изменил вашу тестовую программу, чтобы заключить в нее всю программу в логической операции И если я использую Parallel.For, я столкнулся с точно такой же проблемой.
Используя приведенную выше концептуальную модель, она будет успешно работать:
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
Хотя это в конечном итоге будет подтверждено из-за несинхронизации LogicalOperationStack:
StartLogicalOperation
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
StopLogicalOperation
Вот мой пример программы. Он похож на ваш в том, что у него есть метод DoLongRunningWork, который управляет ActivityId, а также LogicalOperationStack. У меня также есть два варианта пинания DoLongRunningWork. В одном аромате используются задачи, в другом - Parallel.For. Каждый вариант также может быть выполнен так, что вся параллельная операция заключена в логическую операцию или нет. Таким образом, существует всего 4 способа выполнения параллельной операции. Чтобы попробовать каждый из них, просто раскомментируйте нужный метод «Использовать ...», перекомпилируйте и запустите. UseTasks
, UseTasks(true)
и UseParallelFor
должны выполняться до конца. UseParallelFor(true)
будет утверждаться в какой-то момент, потому что LogicalOperationStack не имеет ожидаемого количества записей.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace CorrelationManagerParallelTest
{
class Program
{
static void Main(string[] args)
{
//UseParallelFor(true) will assert because LogicalOperationStack will not have expected
//number of entries, all others will run to completion.
UseTasks(); //Equivalent to original test program with only the parallelized
//operation bracketed in logical operation.
////UseTasks(true); //Bracket entire UseTasks method in logical operation
////UseParallelFor(); //Equivalent to original test program, but use Parallel.For
//rather than Tasks. Bracket only the parallelized
//operation in logical operation.
////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
}
private static List<int> threadIds = new List<int>();
private static object locker = new object();
private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;
private static int mainThreadUsedInDelegate = 0;
// baseCount is the expected number of entries in the LogicalOperationStack
// at the time that DoLongRunningWork starts. If the entire operation is bracketed
// externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise,
// it will be 0.
private static void DoLongRunningWork(int baseCount)
{
lock (locker)
{
//Keep a record of the managed thread used.
if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
threadIds.Add(Thread.CurrentThread.ManagedThreadId);
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
mainThreadUsedInDelegate++;
}
}
Guid lo1 = Guid.NewGuid();
Trace.CorrelationManager.StartLogicalOperation(lo1);
Guid g1 = Guid.NewGuid();
Trace.CorrelationManager.ActivityId = g1;
Thread.Sleep(3000);
Guid g2 = Trace.CorrelationManager.ActivityId;
Debug.Assert(g1.Equals(g2));
//This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
//in effect when the Parallel.For operation was started.
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));
Trace.CorrelationManager.StopLogicalOperation();
}
private static void UseTasks(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
Task task = null;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Task[] allTasks = new Task[totalThreads];
for (int i = 0; i < totalThreads; i++)
{
task = Task.Factory.StartNew(() =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
}, taskCreationOpt);
allTasks[i] = task;
}
Task.WaitAll(allTasks);
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
private static void UseParallelFor(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Parallel.For(0, totalThreads, i =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
});
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
}
}
Весь этот вопрос о том, можно ли использовать LogicalOperationStack с Parallel.For (и / или другими потоками / конструкциями задач) или как его можно использовать, вероятно, заслуживает отдельного вопроса.Может быть, я отправлю вопрос.В то же время мне интересно, есть ли у вас какие-либо мысли по этому поводу (или, если вам интересно, вы рассматривали возможность использования LogicalOperationStack, поскольку ActivityId представляется безопасным).
[EDIT]
См. Мой ответ этот вопрос для получения дополнительной информации об использовании LogicalOperationStack и / или CallContext.LogicalSetData с некоторыми из различных consttructs Thread / ThreadPool / Task / Parallel.
См. также мой вопрос здесь о SO для LogicalOperationStackи параллельные расширения: Совместим ли CorrelationManager.LogicalOperationStack с Parallel.For, задачами, потоками и т. д.
Наконец, см. также мой вопрос здесь, на форуме Microsoft по параллельным расширениям: http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9
В моем тестировании похоже, что Trace.CorrelationManager.LogicalOperationStack может быть поврежден при использовании Parallel.For или Parallel.Invoke, если вы запускаете логическую операцию в главном потоке, а затем запускаете / останавливаете логические операции в делегате.В моих тестах (см. Одну из двух ссылок выше) LogicalOperationStack всегда должен содержать ровно 2 записи при выполнении DoLongRunningWork (если я запускаю логическую операцию в главном потоке перед тем, как запустить DoLongRunningWork с использованием различных методов).Таким образом, под "поврежденным" я подразумеваю, что в конечном итоге LogicalOperationStack будет иметь более двух записей.
Из того, что я могу сказать, это, вероятно, связано с тем, что Parallel.For и Parallel.Invoke используют основной поток в качестве одного из «рабочих» потоков для выполнения действия DoLongRunningWork.
Использование стека, хранящегося в CallContext.LogicalSetData, чтобы имитировать поведение LogicalOperationStack (аналогично LogicalThreadContext.Stacks log4net, который сохраняется через CallContext.SetData), дает еще худшие результаты.Если я использую такой стек для поддержания контекста, он становится поврежденным (то есть не имеет ожидаемого количества записей) почти во всех сценариях, где у меня есть «логическая операция» в главном потоке и логическая операция в каждой итерации/ выполнение делегата DoLongRunningWork.