Совместим ли CorrelationManager.LogicalOperationStack с Parallel.For, задачами, потоками и т. Д. - PullRequest
9 голосов
/ 19 января 2011

См. Этот вопрос для справочной информации:

Как задачи в параллельной библиотеке задач влияют на ActivityID?

Этот вопрос задает вопрос, как задачи влияют на Trace.CorrelationManager.ActivityId .@Greg Samson ответил на свой вопрос тестовой программой, показывающей, что ActivityId надежен в контексте задач.Тестовая программа устанавливает ActivityId в начале делегата Task, спит для имитации работы, затем проверяет ActivityId в конце, чтобы убедиться, что это то же самое значение (то есть, что оно не было изменено другим потоком).Программа работает успешно.

При исследовании других «контекстных» опций для потоков, задач и параллельных операций (в конечном счете, для обеспечения лучшего контекста для ведения журнала) я столкнулся со странной проблемой с Trace.CorrelationManager.LogicalOperationStack (мне все равно было странно).Я скопировал свой «ответ» на его вопрос ниже.

Я думаю, что он адекватно описывает проблему, с которой я столкнулся (Trace.CorrelationManager.LogicalOperationStack, по-видимому, поврежден - или что-то - при использовании в контексте Parallel.For, но только если сам Parallel.For заключен в логическую операцию).

Вот мои вопросы:

  1. Должно ли Trace.CorrelationManager.LogicalOperationStack быть пригодным для использованияс Parallel.For?Если да, то должно ли это иметь значение, если логическая операция уже работает с Parallel.For?

  2. Есть ли "правильный" способ использовать LogicalOperationStack с Parallel.For?Могу ли я по-разному кодировать этот пример программы, чтобы она «работала»?Под «работами» я подразумеваю, что LogicalOperationStack всегда имеет ожидаемое количество записей, а сами записи являются ожидаемыми записями.

Я провел дополнительное тестирование с использованием потоков Threads и ThreadPool,но я должен был бы вернуться и повторить эти тесты, чтобы увидеть, сталкивался ли я с подобными проблемами.

Я скажу, что кажется, что потоки Task / Parallel и ThreadPool «наследуют» Trace.CorrelationManager.Значения ActivityId и Trace.CorrelationManager.LogicalOperationStack из родительского потока.Это ожидается, поскольку эти значения хранятся в CorrelationManager с использованием метода LogicalSetData CallContext (в отличие от SetData).

Снова, пожалуйста, вернитесь к этому вопросу, чтобы получить исходный контекст для"ответ", который я разместил ниже:

Как задачи в параллельной библиотеке задач влияют на ActivityID?

См. также этот похожий вопрос (который до сих пор не былответил) на форуме Microsoft Parallel Extensions:

http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

[НАЧАЛО ПАСТЫ]

Пожалуйста, прости меня за публикацию этого ответана самом деле это не ответ на ваш вопрос, однако, он связан с вашим вопросом, так как он касается поведения CorrelationManager и потоков / задач / и т. д.Я пытался использовать LogicalOperationStackStartLogicalOperation/StopLogicalOperation методы) CorrelationManager для обеспечения дополнительного контекста в многопоточных сценариях.

Я взял ваш пример и слегка его изменил, чтобы добавить возможность выполнять работу параллельно, используяParallel.For.Кроме того, я использую StartLogicalOperation/StopLogicalOperation для скобки (внутри) DoLongRunningWork.Концептуально, DoLongRunningWork делает что-то подобное каждый раз, когда выполняется:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

Я обнаружил, что если я добавлю эти логические операции в ваш код (более или менее как есть), все логические операциисинхронизироваться (всегда ожидаемое количество операций в стеке, а значения операций в стеке всегда соответствуют ожидаемым).

В некоторых моих собственных испытаниях я обнаружил, что это не всегда так. Стек логической операции становился «поврежденным». Лучшее объяснение, которое я мог бы придумать, заключается в том, что «слияние» задней части информации CallContext с «родительским» потоком при выходе из «дочернего» потока приводило к тому, что «старая» информация о контексте дочернего потока (логическая операция) была « наследуется другим дочерним потоком одного брата.

Проблема также может быть связана с тем фактом, что Parallel.For, по-видимому, использует основной поток (по крайней мере, в коде примера, как написано) в качестве одного из «рабочих потоков» (или как там их следует вызывать в параллельном режиме). домен). Всякий раз, когда выполняется DoLongRunningWork, новая логическая операция запускается (в начале) и останавливается (в конце) (то есть помещается в LogicalOperationStack и откатывается назад от него). Если у основного потока уже есть действующая логическая операция, и если DoLongRunningWork выполняется В ОСНОВНОЙ РЕЗЬБЕ, то запускается новая логическая операция, поэтому в LogicalOperationStack основного потока теперь выполняется ДВА операции. Любые последующие исполнения DoLongRunningWork (пока эта «итерация» DoLongRunningWork выполняется в главном потоке) (по-видимому) будут наследовать LogicalOperationStack основного потока (который теперь имеет две операции, а не только одну ожидаемую операцию).

Мне потребовалось много времени, чтобы понять, почему поведение LogicalOperationStack в моем примере отличается от моего модифицированного варианта вашего примера. Наконец, я увидел, что в своем коде я заключил в скобки всю программу в виде логической операции, тогда как в моей модифицированной версии тестовой программы я этого не сделал. Подразумевается, что в моей тестовой программе каждый раз, когда выполнялась моя «работа» (аналог DoLongRunningWork), уже действовала логическая операция. В моей модифицированной версии вашей тестовой программы я не заключил в скобки всю программу в логической операции.

Итак, когда я изменил вашу тестовую программу, чтобы заключить в нее всю программу в логической операции И если я использую Parallel.For, я столкнулся с точно такой же проблемой.

Используя приведенную выше концептуальную модель, она будет успешно работать:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

Хотя это в конечном итоге будет подтверждено из-за явно не синхронизированной LogicalOperationStack:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

Вот мой пример программы. Он похож на ваш в том, что у него есть метод DoLongRunningWork, который управляет ActivityId, а также LogicalOperationStack. У меня также есть два варианта пинания DoLongRunningWork. В одном аромате используются задачи, в другом - Parallel.For. Каждый вариант также может быть выполнен так, что вся параллельная операция заключена в логическую операцию или нет. Таким образом, существует всего 4 способа выполнения параллельной операции. Чтобы попробовать каждый из них, просто раскомментируйте нужный метод «Использовать ...», перекомпилируйте и запустите. UseTasks, UseTasks(true) и UseParallelFor должны выполняться до конца. UseParallelFor(true) будет утверждаться в какой-то момент, потому что LogicalOperationStack не имеет ожидаемого количества записей.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

Весь этот вопрос о том, можно ли использовать LogicalOperationStack с Parallel.For (и / или другими конструкциями потоков / задач) или как его можно использовать, вероятно, заслуживает отдельного вопроса. Может быть, я отправлю вопрос. В то же время мне интересно, есть ли у вас какие-либо мысли по этому поводу (или, если вам интересно, вы рассматривали возможность использования LogicalOperationStack, поскольку ActivityId представляется безопасным).

[КОНЕЦ ПАСТЫ]

У кого-нибудь есть мысли по этому поводу?

Ответы [ 2 ]

5 голосов
/ 07 февраля 2011

[Начать обновление]

Я также задавал этот вопрос на Параллельные расширения Microsoft для форума поддержки .Net и в итоге получил ответ от Стивена Туба.Оказывается, в LogicalCallContext есть ошибка , которая приводит к повреждению LogicalOperationStack.Есть также хорошее описание (в ответе Стивена на ответ, который я сделал на его ответ), в котором дается краткое описание того, как работает Parallel.For, в отношении раздачи заданий и почему это делает Parallel.For восприимчивым к ошибке.

В своем ответе ниже я предполагаю, что LogicalOperationStack не совместим с Parallel.For, поскольку Parallel.For использует основной поток в качестве одного из «рабочих» потоков.Исходя из объяснений Стивена, мои предположения были неверными.Parallel.For использует основной поток как один из «рабочих» потоков, но он не просто используется «как есть».Первая задача запускается в основном потоке, но выполняется так, как если бы она запускалась в новом потоке.Прочитайте описание Стивена для получения дополнительной информации.

[Конец обновления]

Из того, что я могу сказать, ответ следующий:

Оба ActivityId иLogicalOperationStack хранится через CallContext.LogicalSetData .Это означает, что эти значения будут «перетекать» в любые «дочерние» потоки.Это довольно круто, например, вы можете установить ActivityId в точке входа на многопоточный сервер (скажем, вызов службы), и все потоки, которые в конечном итоге запускаются из этой точки входа, могут быть частью одной и той же «активности».Аналогично, логические операции (через LogicalOperationStack) также передаются в дочерние потоки.

Что касается Trace.CorrelationManager.ActivityId:

ActivityId, похоже, совместим со всеми моделями потоков, которые я тестировалэто с помощью: непосредственного использования потоков, использования ThreadPool, использования задач, использования Parallel. *.Во всех случаях ActivityId имеет ожидаемое значение.

Что касается Trace.CorrelationManager.LogicalOperationStack:

Кажется, что LogicalOperationStack совместим с большинством моделей потоков, но НЕ с Parallel. *.Используя потоки напрямую, ThreadPool и Tasks, LogicalOperationStack (как манипулируется в примере кода, приведенного в моем вопросе) сохраняет свою целостность.Во всех случаях содержимое LogicalOperationStack соответствует ожидаемому.

LogicalOperationStack НЕ совместимо с Parallel.For.Если логическая операция «в силе», то есть если вы вызвали CorrelationManager.StartLogicalOperation до начала операции Parallel. *, А затем вы запускаете новую логическую операцию в контексте Parallels. * (То есть в делегате), тогда LogicalOperationStack будет поврежден.(Я должен сказать, что он, вероятно, будет поврежден. Parallel. * Может не создавать никаких дополнительных потоков, что означает, что LogicalOperationStack будет безопасным).

Проблема связана с тем, что Parallel. * Использует основнойпоток (или, возможно, более правильно, поток, который запускает параллельную операцию) как один из его «рабочих» потоков.Это означает, что, поскольку «логические операции» запускаются и останавливаются в «рабочем» потоке, который совпадает с «основным» потоком, LogicalOperationStack «основного» потока изменяется.Даже если вызывающий код (т. Е. Делегат) правильно поддерживает стек (гарантируя, что каждая операция StartLogicalOperation «останавливается» с помощью соответствующей операции StopLogicalOperation), стек «основных» потоков изменяется.В конечном счете, мне кажется (во всяком случае, мне), что LogicalOperationStack «основного» потока по существу изменяется двумя разными «логическими» потоками: «основным» потоком и «рабочим» потоком, оба из которых оказываются ЖЕнить.

Я не знаю глубинных особенностей того, почему это не работает (по крайней мере, как я ожидал бы, что это сработает).Мое лучшее предположение состоит в том, что каждый раз, когда делегат выполняется в потоке (который не совпадает с основным потоком), поток «наследует» текущее состояние LogicalOperationStack основного потока.Если делегат в настоящее время выполняется в главном потоке (используется повторно в качестве рабочего потока) и запустил логическую операцию, то один (или более чем один) из других распараллеленных делегатов «унаследует» LogicalOperationStack основного потока, который теперь имеетодна (или несколько) новых логических операций в действии!

FWIW, я реализовал (главным образом для тестирования, я на самом деле не использую его в настоящее время), следующий «логический стек» для имитации LogicalOperationStack, ноэто так, что он будет работать с Parallel. * Не стесняйтесь попробовать и / или использовать его.Чтобы проверить, замените вызовы

Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation

в примере кода из моего исходного вопроса на вызовы

LogicalOperation.OperationStack.Push()/Pop().


//OperationStack.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using System.Runtime.Remoting.Messaging;

namespace LogicalOperation
{
  public static class OperationStack
  {
    private const string OperationStackSlot = "OperationStackSlot";

    public static IDisposable Push(string operation)
    {
      OperationStackItem parent = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      OperationStackItem op = new OperationStackItem(parent, operation);
      CallContext.LogicalSetData(OperationStackSlot, op);
      return op;
    }

    public static object Pop()
    {
      OperationStackItem current = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;

      if (current != null)
      {
        CallContext.LogicalSetData(OperationStackSlot, current.Parent);
        return current.Operation;
      }
      else
      {
        CallContext.FreeNamedDataSlot(OperationStackSlot);
      }
      return null;
    }

    public static object Peek()
    {
      OperationStackItem top = Top();
      return top != null ? top.Operation : null;
    }

    internal static OperationStackItem Top()
    {
      OperationStackItem top = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      return top;
    }

    public static IEnumerable<object> Operations()
    {
      OperationStackItem current = Top();
      while (current != null)
      {
        yield return current.Operation;
        current = current.Parent;
      }
    }

    public static int Count
    {
      get
      {
        OperationStackItem top = Top();
        return top == null ? 0 : top.Depth;
      }
    }

    public static IEnumerable<string> OperationStrings()
    {
      foreach (object o in Operations())
      {
        yield return o.ToString();
      }
    }
  }
}


//OperationStackItem.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace LogicalOperation
{
  public class OperationStackItem : IDisposable
  {
    private OperationStackItem parent = null;
    private object operation;
    private int depth;
    private bool disposed = false;

    internal OperationStackItem(OperationStackItem parentOperation, object operation)
    {
      parent = parentOperation;
      this.operation = operation;
      depth = parent == null ? 1 : parent.Depth + 1;
    }

    internal object Operation { get { return operation; } }
    internal int Depth { get { return depth; } }

    internal OperationStackItem Parent { get { return parent; } }

    public override string ToString()
    {
      return operation != null ? operation.ToString() : "";
    }

    #region IDisposable Members

    public void Dispose()
    {
      if (disposed) return;

      OperationStack.Pop();

      disposed = true;
    }

    #endregion
  }
}

Это было вдохновлено объектами области действия, описанными Брентом ВандерМейдом здесь: http://www.dnrtv.com/default.aspx?showNum=114

Вы можете использовать этот класс следующим образом:

public void MyFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFunc"))
  {
    MyOtherFunc();
  }
}

public void MyOtherFunc()
{
  using (LogicalOperation.OperationStack.Push("MyOtherFunc"))
  {
    MyFinalFunc();
  }
}

public void MyFinalFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFinalFunc"))
  {
    Console.WriteLine("Hello");
  }
}
2 голосов
/ 20 февраля 2012

Я искал способ получить логический стек, который должен легко работать в приложении, интенсивно использующем TPL.Я решил использовать LogicalOperationStack, потому что он делал все необходимое, не меняя существующий код.Но потом я прочитал об ошибке в LogicalCallContext:

https://connect.microsoft.com/VisualStudio/feedback/details/609929/logicalcallcontext-clone-bug-when-correlationmanager-slot-is-present

Так что я попытался найти обходной путь для этой ошибки, и я думаю, что он работает для TPL (спасибо ILSpy):

public static class FixLogicalOperationStackBug
{
    private static bool _fixed = false;

    public static void Fix()
    {
        if (!_fixed)
        {
            _fixed = true;

            Type taskType = typeof(Task);
            var s_ecCallbackField = taskType.GetFields(BindingFlags.Static | BindingFlags.NonPublic).First(f => f.Name == "s_ecCallback");
            ContextCallback s_ecCallback = (ContextCallback)s_ecCallbackField.GetValue(null);

            ContextCallback injectedCallback = new ContextCallback(obj =>
            {
                // Next line will set the private field m_IsCorrelationMgr of LogicalCallContext which isn't cloned
                CallContext.LogicalSetData("System.Diagnostics.Trace.CorrelationManagerSlot", Trace.CorrelationManager.LogicalOperationStack);
                s_ecCallback(obj);
            });

            s_ecCallbackField.SetValue(null, injectedCallback);
        }
    }
}
...