Как задачи в параллельной библиотеке задач влияют на ActivityID? - PullRequest
11 голосов
/ 03 декабря 2010

Перед использованием библиотеки параллельных задач я часто использовал CorrelationManager.ActivityId для отслеживания отчетов об отслеживании / ошибках с несколькими потоками.

ActivityId хранится в локальном хранилище потоков, поэтому каждый поток получает свою собственную копию,Идея состоит в том, что когда вы запускаете поток (действие), вы назначаете новый ActivityId.ActivityId будет записан в журналы с любой другой информацией о трассировке, что позволит выделить информацию трассировки для одного «Activity».Это действительно полезно с WCF, так как ActivityId может быть перенесен в компонент службы.

Вот пример того, о чем я говорю:

static void Main(string[] args)
{
    ThreadPool.QueueUserWorkItem(new WaitCallback((o) =>
    {
        DoWork();
    }));
}

static void DoWork()
{
    try
    {
        Trace.CorrelationManager.ActivityId = Guid.NewGuid();
        //The functions below contain tracing which logs the ActivityID.
        CallFunction1();
        CallFunction2();
        CallFunction3();
    }
    catch (Exception ex)
    {
        Trace.Write(Trace.CorrelationManager.ActivityId + " " + ex.ToString());
    }
}

Теперь, с TPLНасколько я понимаю, несколько задач разделяют потоки.Означает ли это, что ActivityId склонен к повторной инициализации промежуточной задачи (другой задачей)?Существует ли новый механизм для отслеживания активности?

Ответы [ 2 ]

6 голосов
/ 15 декабря 2010

Я провел несколько экспериментов, и оказалось, что предположение в моем вопросе неверно - несколько задач, созданных с помощью TPL, не запускаются в одном потоке одновременно.

ThreadLocalStorage безопасен дляиспользовать с TPL в .NET 4.0, поскольку поток может использоваться только одной задачей за раз.

Предположение о том, что задачи могут совместно использовать потоки, было основано на интервью, которое я слышал о c # 5.0 on DotNetRocks (извините, я не могу вспомнить, какое шоу было) - поэтому мой вопрос может (или не может) стать актуальным в ближайшее время.

Мой эксперимент начинаетсяколичество заданий и записи, сколько заданий было выполнено, сколько времени они заняли и сколько потоков было использовано.Код ниже, если кто-то хотел бы повторить его.

class Program
{
    static void Main(string[] args)
    {
        int totalThreads = 100;
        TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
        Task task = null;
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();
        Task[] allTasks = new Task[totalThreads];
        for (int i = 0; i < totalThreads; i++)
        {
            task = Task.Factory.StartNew(() =>
           {
               DoLongRunningWork();
           }, taskCreationOpt);

            allTasks[i] = task;
        }

        Task.WaitAll(allTasks);
        stopwatch.Stop();

        Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
        Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
        Console.ReadKey();
    }


    private static List<int> threadIds = new List<int>();
    private static object locker = new object();
    private static void DoLongRunningWork()
    {
        lock (locker)
        {
            //Keep a record of the managed thread used.
            if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
                threadIds.Add(Thread.CurrentThread.ManagedThreadId);
        }
        Guid g1 = Guid.NewGuid();
        Trace.CorrelationManager.ActivityId = g1;
        Thread.Sleep(3000);
        Guid g2 = Trace.CorrelationManager.ActivityId;
        Debug.Assert(g1.Equals(g2));
    }
}

Вывод (конечно, это будет зависеть от компьютера) был:

Completed 100 tasks in 23097 milliseconds
Used 23 threads

Изменение taskCreationOpt на TaskCreationOptions.LongRunningдал разные результаты:

Completed 100 tasks in 3458 milliseconds 
Used 100 threads
3 голосов
/ 19 января 2011

Пожалуйста, прости, что я опубликовал это как ответ, так как это не совсем ответ на твой вопрос, однако, он связан с твоим вопросом, так как он касается поведения CorrelationManager и потоков / задач / и т. Д. Я рассматривал использование LogicalOperationStackStartLogicalOperation/StopLogicalOperation) метода CorrelationManager для обеспечения дополнительного контекста в многопоточных сценариях.

Я взял ваш пример и немного изменил его, чтобы добавить возможность выполнять работу параллельно, используя Parallel.For. Также я использую StartLogicalOperation/StopLogicalOperation для скобок (внутренне) DoLongRunningWork. Концептуально, DoLongRunningWork делает что-то подобное каждый раз, когда выполняется:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

Я обнаружил, что если я добавлю эти логические операции в ваш код (более или менее как есть), все логические операции останутся синхронизированными (всегда ожидаемое количество операций в стеке и значения операций в стеке всегда, как и ожидалось).

В некоторых моих собственных испытаниях я обнаружил, что это не всегда так. Стек логической операции становился «поврежденным». Лучшее объяснение, которое я мог бы придумать, заключается в том, что «слияние» задней части информации CallContext с «родительским» потоком при выходе из «дочернего» потока приводило к тому, что «старая» информация о контексте дочернего потока (логическая операция) была « наследуется "другим дочерним потоком одного брата.

Проблема также может быть связана с тем фактом, что Parallel.For, по-видимому, использует основной поток (по крайней мере, в коде примера, как написано) в качестве одного из «рабочих потоков» (или как там их следует вызывать в параллельном режиме). домен). Всякий раз, когда выполняется DoLongRunningWork, новая логическая операция запускается (в начале) и останавливается (в конце) (то есть помещается в LogicalOperationStack и откатывается назад от него). Если у основного потока уже есть действующая логическая операция, и если DoLongRunningWork выполняется В ОСНОВНОЙ РЕЗЬБЕ, то запускается новая логическая операция, поэтому в LogicalOperationStack основного потока теперь выполняется ДВА операции. Любые последующие исполнения DoLongRunningWork (пока эта «итерация» DoLongRunningWork выполняется в главном потоке) (по-видимому) будут наследовать LogicalOperationStack основного потока (который теперь имеет две операции, а не только одну ожидаемую операцию).

Мне потребовалось много времени, чтобы понять, почему поведение LogicalOperationStack в моем примере отличается от моего модифицированного варианта вашего примера. Наконец, я увидел, что в своем коде я заключил в скобки всю программу в виде логической операции, тогда как в моей модифицированной версии тестовой программы я этого не сделал. Подразумевается, что в моей тестовой программе каждый раз, когда выполнялась моя «работа» (аналог DoLongRunningWork), уже действовала логическая операция. В моей модифицированной версии вашей тестовой программы я не заключил в скобки всю программу в логической операции.

Итак, когда я изменил вашу тестовую программу, чтобы заключить в нее всю программу в логической операции И если я использую Parallel.For, я столкнулся с точно такой же проблемой.

Используя приведенную выше концептуальную модель, она будет успешно работать:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

Хотя это в конечном итоге будет подтверждено из-за несинхронизации LogicalOperationStack:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

Вот мой пример программы. Он похож на ваш в том, что у него есть метод DoLongRunningWork, который управляет ActivityId, а также LogicalOperationStack. У меня также есть два варианта пинания DoLongRunningWork. В одном аромате используются задачи, в другом - Parallel.For. Каждый вариант также может быть выполнен так, что вся параллельная операция заключена в логическую операцию или нет. Таким образом, существует всего 4 способа выполнения параллельной операции. Чтобы попробовать каждый из них, просто раскомментируйте нужный метод «Использовать ...», перекомпилируйте и запустите. UseTasks, UseTasks(true) и UseParallelFor должны выполняться до конца. UseParallelFor(true) будет утверждаться в какой-то момент, потому что LogicalOperationStack не имеет ожидаемого количества записей.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

Весь этот вопрос о том, можно ли использовать LogicalOperationStack с Parallel.For (и / или другими потоками / конструкциями задач) или как его можно использовать, вероятно, заслуживает отдельного вопроса.Может быть, я отправлю вопрос.В то же время мне интересно, есть ли у вас какие-либо мысли по этому поводу (или, если вам интересно, вы рассматривали возможность использования LogicalOperationStack, поскольку ActivityId представляется безопасным).

[EDIT]

См. Мой ответ этот вопрос для получения дополнительной информации об использовании LogicalOperationStack и / или CallContext.LogicalSetData с некоторыми из различных consttructs Thread / ThreadPool / Task / Parallel.

См. также мой вопрос здесь о SO для LogicalOperationStackи параллельные расширения: Совместим ли CorrelationManager.LogicalOperationStack с Parallel.For, задачами, потоками и т. д.

Наконец, см. также мой вопрос здесь, на форуме Microsoft по параллельным расширениям: http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

В моем тестировании похоже, что Trace.CorrelationManager.LogicalOperationStack может быть поврежден при использовании Parallel.For или Parallel.Invoke, если вы запускаете логическую операцию в главном потоке, а затем запускаете / останавливаете логические операции в делегате.В моих тестах (см. Одну из двух ссылок выше) LogicalOperationStack всегда должен содержать ровно 2 записи при выполнении DoLongRunningWork (если я запускаю логическую операцию в главном потоке перед тем, как запустить DoLongRunningWork с использованием различных методов).Таким образом, под "поврежденным" я подразумеваю, что в конечном итоге LogicalOperationStack будет иметь более двух записей.

Из того, что я могу сказать, это, вероятно, связано с тем, что Parallel.For и Parallel.Invoke используют основной поток в качестве одного из «рабочих» потоков для выполнения действия DoLongRunningWork.

Использование стека, хранящегося в CallContext.LogicalSetData, чтобы имитировать поведение LogicalOperationStack (аналогично LogicalThreadContext.Stacks log4net, который сохраняется через CallContext.SetData), дает еще худшие результаты.Если я использую такой стек для поддержания контекста, он становится поврежденным (то есть не имеет ожидаемого количества записей) почти во всех сценариях, где у меня есть «логическая операция» в главном потоке и логическая операция в каждой итерации/ выполнение делегата DoLongRunningWork.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...