Многопоточный доступ и редактирование одного и того же двойного массива - PullRequest
1 голос
/ 12 апреля 2020

Мне нужно перебирать каждую двойку в массиве, чтобы выполнить "сглаживание Лапласа", "смешивание значений" с соседними двойниками.

Я сохраню сохраненные значения в массиве временных клонов, обновив оригинал в конец.

Псевдокод:

double[] A = new double[1000];
// Filling A with values...

double[] B = A.Clone as double[];

for(int loops=0;loops<10;loops++){ // start of the loop

    for(int i=0;i<1000;i++){ // iterating through all doubles in the array
    // Parallel.For(0, 1000, (i) => {

       double v= A[i];
       B[i]-=v;
       B[i+1]+=v/2;
       B[i-1]+=v/2;
       // here i'm going out of array bounds, i know. Pseudo code, not relevant.

    }
    // });
}
A = B.Clone as double[];

С for работает правильно. «Сглаживание» значений в массиве.

С Parallel.For() У меня есть некоторые проблемы с доступом syn c: потоки сталкиваются, а некоторые значения на самом деле не сохраняются правильно. Потоки обращаются к массиву и редактируют его по одному и тому же индексу много раз.

(я не проверял это в линейном массиве, на самом деле я работаю с многомерным массивом [x, y, z] ..)

Как я могу решить эту проблему?

Я думал создать отдельный массив для каждого потока и сделать сумму позже ... но мне нужно знать индекс потока, и я не знаю ' не нашел где-нибудь в сети. (Мне все еще интересно, существует ли «индекс потока» даже с совершенно другим решением ...).

Я приму любое решение.

Ответы [ 3 ]

1 голос
/ 13 апреля 2020

Возможно, вам нужна одна из более сложных перегрузок метода Parallel.For:

public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive,
    ParallelOptions parallelOptions, Func<TLocal> localInit,
    Func<int, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally);

Выполняет a для l oop с локальными данными потока в какие итерации могут выполняться параллельно, можно настроить параметры l oop, а состояние l oop можно отслеживать и манипулировать.

Это выглядит довольно пугающе со всеми различными лямбдами надеется. Идея состоит в том, чтобы каждый поток работал с локальными данными и, наконец, объединял данные в конце. Вот как вы можете использовать этот метод для решения вашей проблемы:

double[] A = new double[1000];
double[] B = (double[])A.Clone();
object locker = new object();
var parallelOptions = new ParallelOptions()
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.For(0, A.Length, parallelOptions,
    localInit: () => new double[A.Length], // create temp array per thread
    body: (i, state, temp) =>
    {
        double v = A[i];
        temp[i] -= v;
        temp[i + 1] += v / 2;
        temp[i - 1] += v / 2;
        return temp; // return a reference to the same temp array
    }, localFinally: (localB) =>
    {
        // Can be called in parallel with other threads, so we need to lock
        lock (locker)
        {
            for (int i = 0; i < localB.Length; i++)
            {
                B[i] += localB[i];
            }
        }
    });

Я должен упомянуть, что рабочая нагрузка в приведенном выше примере слишком гранулированная, поэтому я не ожидаю значительного улучшения производительности от распараллеливания. Надеюсь, ваша фактическая нагрузка будет более короткой. Например, если у вас есть два вложенных цикла, распараллеливание только внешнего l oop будет работать очень хорошо, потому что внутреннее l oop обеспечит столь необходимую краткость.


Альтернативное решение: Вместо создания вспомогательных массивов для каждого потока, вы можете просто обновить непосредственно массив B и использовать блокировки только при обработке индекса в опасной зоне вблизи границ разделов :

Parallel.ForEach(Partitioner.Create(0, A.Length), parallelOptions, range =>
{
    bool lockTaken = false;
    try
    {
        for (int i = range.Item1; i < range.Item2; i++)
        {
            bool shouldLock = i < range.Item1 + 1 || i >= range.Item2 - 1;
            if (shouldLock) Monitor.Enter(locker, ref lockTaken);
            double v = A[i];
            B[i] -= v;
            B[i + 1] += v / 2;
            B[i - 1] += v / 2;
            if (shouldLock) { Monitor.Exit(locker); lockTaken = false; }
        }
    }
    finally
    {
        if (lockTaken) Monitor.Exit(locker);
    }
});
0 голосов
/ 13 апреля 2020

Хорошо, похоже, что модуль может решить почти все мои проблемы. Вот действительно упрощенная версия рабочего кода: (большой скрипт - 3d и незаконченный ...)

private void RunScript(bool Go, ref object Results)
  {
    if(Go){
      LaplacianSmooth(100);
      // Needed to restart "RunScript" over and over
      this.Component.ExpireSolution(true);
    }
    else{
      A = new double[count];
      A[100] = 10000;
      A[500] = 10000;
    }
    Results = A;
  }

  // <Custom additional code> 
  public static int T = Environment.ProcessorCount;
  public static int count = 1000;
  public double[] A = new double[count];
  public double[,] B = new double[count, T];

  public void LaplacianSmooth(int loops){
    for(int loop = 0;loop < loops;loop++){

      B = new double[count, T];

      // Copying values to first column of temp multidimensional-array
      Parallel.For(0, count, new ParallelOptions { MaxDegreeOfParallelism = T }, i => {
        B[i, 0] = A[i];
        });

      // Applying Laplacian smoothing
      Parallel.For(0, count, new ParallelOptions { MaxDegreeOfParallelism = T }, i => {
        int t = i % 16;
        // Wrapped next and previous element indexes
        int n = (i + 1) % count;
        int p = (i + count - 1) % count;
        double v = A[i] * 0.5;
        B[i, t] -= v;
        B[p, t] += v / 2;
        B[n, t] += v / 2;
        });

      // Copying values back to main array
      Parallel.For(0, count, new ParallelOptions { MaxDegreeOfParallelism = T }, i => {
        double val = 0;
        for(int t = 0;t < T;t++){
          val += B[i, t];
        }
        A[i] = val;
        });
    }
  }

Multithreaded linear laplacian smooth on grasshopper

Нет " столкновения »с потоками, что подтверждается результатом« Массового сложения »(сумма), который постоянен на уровне 20000.

Спасибо всем за советы!

0 голосов
/ 12 апреля 2020

У меня есть некоторые проблемы с доступом c.

Использовать блокировку.

double v = A[i];
lock (B)
{
   B[i] -= v;
   B[i+1] += v/2;
   B[i-1] += v/2;
}

В этом случае только один Thread может получить доступ к экземпляру целевого массива в в то же время.

Или вот популярный шаблон syn c, использующий один объект только для блокировки. Полезно, когда объект данных должен быть переназначен внутри блокировки.

1) Определить некоторый экземпляр объекта вне l oop

object syncRoot = new object();

2) Использовать блокировку

// some code
lock (syncRoot)
{
    // this part of code will be executed only by one Thread at the same time
}
// more code

Параллельный пример

Queue<Job> jobs = new Queue<Job>();
// Job is some data you want to process

//...

object syncRoot = new object();
List<Task> workers = new List<Task>();

// set max count of concurrent tasks here
int maxThreads = Environment.ProcessorCount * 2;

using (SemaphoreSlim semaphore = new SemaphoreSlim(maxThreads))
{
    while (jobs.Count > 0)
    {
        semaphore.Wait();
        Job currentJob = jobs.Dequeue();
        workers.Add(Task.Run(() =>
        {
            JobResult jobResult = DoJob(currentJob); // data processor method
            lock (syncRoot)
            {
                commonStorage.Add(jobResult);
                // write jobResult to common storage here
            }
            semaphore.Release();
        }));
    }
    if (Task.WaitAll(workers.ToArray(), 120000))
    {
        // all workers done their jobs, results may be processed here
    }
    else
    {
        // timeout occured
    }
}

Вы можете использовать любой массив l oop вместо Queue.

Task - это не просто Thread создание, но с использованием ThreadPool.

...