Согласованность данных в многопоточной среде без одновременного доступа к данным - PullRequest
0 голосов
/ 17 июня 2020

Обеспечивает ли барьер памяти согласованность данных между потоками, когда нет блокировки и нет одновременного доступа к данным, кроме как из родительского потока?

Вот мой сценарий:

  • Основной поток запускает несколько дочерних потоков
  • дочерние потоки ждут, пока состояние будет готово
  • Главный поток начинает работать с глобальным набором данных и уведомляет дочерние потоки о готовности состояния. Затем основной поток ожидает обработки дочернего элемента.
  • каждый дочерний поток работает с другим подмножеством глобального набора данных (либо экземпляры объектов полностью разные, либо потоки работают с другим конечным членом / частью память объекта) ==> каждый адрес памяти читается / записывается не более чем ОДНИМ дочерним потоком
  • , когда дочерний поток fini sh является основной работой, он уведомляет основной поток. Дочерний поток продолжает работать с определенными c собственными данными.
  • когда все дочерние потоки завершают sh свою основную работу, основной поток возобновляет свою работу с глобальным набором данных, измененным дочерними потоками

Я хочу убедиться, что:

  • i. каждый дочерний поток видит свежее / чистое состояние при выполнении своей основной работы (то есть все записи (из основного потока), выполненные перед запуском дочернего потока, видны дочернему потоку)
  • ii. после возобновления основной поток видит свежее / чистое состояние (то есть все записи (из дочерних потоков), выполненные дочерним потоком во время их основной работы, видны основному потоку)

Вот как выглядит код:

    public class State
    {
        public string Data1 { get; set; }
        public string Data2 { get; set; }

        public int[] SomeArray { get; set; }
    }

    static void Main(string[] args)
    {
        var finishedCounter = 0L;
        var ready = 0L;
        var state = new State();

        var thread1 = new Thread(o =>
        {
            while (Interlocked.Read(ref ready) != 1)
            {
                //waiting for initialization
            }
            var input = (State)o;
            //Assert.AreEqual("Initial Data1 From MainThread", input.Data1);
            Thread.MemoryBarrier();//TMB 1.1 Force read value pushed from mainThread

            //Core work
            Assert.AreEqual("Initial Data1 From MainThread", input.Data1);
            input.Data1 = "Modified by Thread 1";
            input.SomeArray[1] = 11;

            Thread.MemoryBarrier();//TMB 1.2 Force storing all written value in order to be visible for the Main threads

            Interlocked.Increment(ref finishedCounter);
            while (true)
            {
                //Non-core work (using data specific to this thread)
            }
        });
        var thread2 = new Thread(o =>
        {
            while (Interlocked.Read(ref ready) != 1)
            {
                //waiting for initialization
            }
            var input = (State)o;
            //Assert.AreEqual("Initial Data2 From MainThread", input.Data2);
            Thread.MemoryBarrier();//TMB 2.1 Force read value pushed from mainThread

            //Core work
            Assert.AreEqual("Initial Data2 From MainThread", input.Data2);
            input.Data2 = "Modified by Thread 2";
            input.SomeArray[2] = 22;

            Thread.MemoryBarrier();//TMB 2.2 Force storing all written value in order to be visible for the Main threads
            Interlocked.Increment(ref finishedCounter);
            while (true)
            {
                //Non-core work (using data specific to this thread)
            }
        });

        thread1.Start(state);
        thread2.Start(state);

        state.Data1 = "Initial Data1 From MainThread";
        state.Data2 = "Initial Data2 From MainThread";
        state.SomeArray = new[] { 0, -1, -2 };
        Thread.MemoryBarrier();//TMB 0.1 Force storing all written value in order to be visible for the child threads

        Interlocked.Increment(ref ready);//child thread will process

        while (Interlocked.Read(ref finishedCounter) != 2)//let's wait for the childs threads to finish their core work
        {
        }

        //Assert.AreEqual("Modified by Thread 1", state.Data1);
        //Assert.AreEqual("Modified by Thread 2", state.Data2);

        Thread.MemoryBarrier();//TMB 0.1 Force retrieving all written value from the child threads

        Assert.AreEqual("Modified by Thread 1", state.Data1);
        Assert.AreEqual("Modified by Thread 2", state.Data2);
        Assert.AreEqual(0, state.SomeArray[0]);
        Assert.AreEqual(11, state.SomeArray[1]);
        Assert.AreEqual(22, state.SomeArray[2]);

        Console.WriteLine("Done");
        Console.ReadLine();
    }

Вопросы:

  1. Всегда ли утверждения в приведенном выше коде верны?
  2. Могут ли закомментированные утверждения в приведенном выше коде быть ложными?
  3. Обеспечивает ли 6 Thread.MemoryBarrier точку i. и ii. выше?
  4. Актуальны ли комментарии к TMB?
  5. Зависит ли он от архитектуры? (Я работаю как над процессором x86 / x64)

Ответы [ 2 ]

2 голосов
/ 17 июня 2020

(Это ответ на обновленную версию вопроса с рабочими потоками и неприятными спин-ожиданиями вместо выхода / thread.Join)


Могут ли закомментированные утверждения в приведенном выше коде быть ложными?

Нет, все ваши Thread.MemoryBarrier() кажутся избыточными и просто замедляют вас. ( Я не очень внимательно читал ваш код, но я думаю, что у вас есть потоки, ожидающие увидеть результат операции Interlocked из другого потока, прежде чем они прочитают / запишут другие данные.)

Interlocked.Increment(ref ready); - это релиз / операция получения (фактически полный барьер памяти, например Thread.MemoryBarrier()). Новое значение ref не будет видно, пока не будут видны все предыдущие хранилища.

Interlocked.Read - это операция получения: более поздние загрузки могут быть загружены только после значения вы получаете от этого. https://preshing.com/20120913/acquire-and-release-semantics/

Это дает вам освобождение / получение синхронизации без необходимости каких-либо явных / автономных барьеров памяти. ( Использует ли Interlocked.CompareExchange барьер памяти? ссылается на стандарт ECMA-335, поэтому мы знаем, что это переносимая гарантия, а не деталь реализации x86.)


Все оборудование модели памяти, через которые проходят потоки C ++ / C#, согласованы с кешем; вам не нужна явная очистка, чтобы видеть данные по потокам. Вам просто нужно убедиться, что компилятор не хранит значение в регистре и не хранит и не загружает его вообще.

В гипотетическом случае, когда требуется явная очистка, Interlocked сделает это за вас, чтобы сохранить модель памяти языка.

2 голосов
/ 17 июня 2020

(примечание редактора: это ответ на исходную версию вопроса, где выход из потока / thread.Join обеспечивал синхронизацию вместо операций с блокировкой. Я считаю, что ответ в целом такой же для новой версии)

From - это потокобезопасные массивы :

Я считаю, что если каждый поток работает только с отдельной частью массива, все будет хорошо. Если вы собираетесь обмениваться данными (то есть передавать их между потоками), вам понадобится какой-то барьер памяти, чтобы избежать проблем с моделью памяти.

...

Jon Skeet

Исходя из этого, я полагаю, что ответы будут:

  1. Да
  2. Вероятно, нет. Я считаю, что thread.Join должен будет создать как минимум барьер памяти. Но я не могу найти, что это задокументировано.
  3. Я не думаю, что барьеры памяти необходимы для обеспечения согласованного поведения в этом случае, поскольку никакие параллельные потоки не читают или не записывают одни и те же данные.
  4. Нет , а не если барьеры памяти не нужны.
  5. Я вполне уверен, что x86 и x64 ведут себя одинаково. Если бы модель памяти была изменена, это вызвало бы всевозможные проблемы при переносе старого кода. ARM использует более слабую модель памяти, но пока вы придерживаетесь . Net модели памяти , все будет в порядке.

В целом я бы поддержал слегка параноидальный подход , код может измениться, и проблемы с параллелизмом может быть трудно обнаружить. Некоторые практические правила:

  • По возможности предпочитать неизменяемость
  • По возможности использовать существующие конструкции (parallel.For, ConcurrentCollections, task.Run)
  • Использовать блокировку при чтении или записи общих полей.
  • используйте блокировку, если другие методы не применимы, но сохраняйте критическую секцию как можно меньше, предпочтительно выполняя ограниченный объем кода.
...