Оптимизация компилятора ломает многопоточный код - PullRequest
8 голосов
/ 12 ноября 2010

Узнав, что переменные shared в настоящее время не защищены барьерами памяти , я столкнулся с другой проблемой.Либо я делаю что-то не так, либо существующая оптимизация компилятора в dmd может нарушить многопоточный код путем изменения порядка чтения shared переменных.

Например, когда я компилирую исполняемый файл с dmd -O(полная оптимизация), компилятор успешно оптимизирует локальную переменную o в этом коде (где cas - это функция сравнения и замены из core.atomic)

shared uint cnt;
void atomicInc  ( ) { uint o; do { o = cnt; } while ( !cas( &cnt, o, o + 1 ) );}

примерно так (см. Разборка ниже):

shared uint cnt;
void atomicInc  ( ) { while ( !cas( &cnt, cnt, cnt + 1 ) ) { } }

В «оптимизированном» коде cnt считывается дважды из памяти, тем самым рискуя тем, что другой поток изменил cntмежду.Оптимизация в основном разрушает алгоритм сравнения и обмена.

Является ли это ошибкой или есть правильный способ достижения желаемого результата?Единственный обходной путь, который я нашел до сих пор, заключается в реализации кода с использованием ассемблера.

Полный тестовый код и дополнительная информация
Для полноты здесь приведен полный тестовый код, который показываетобе проблемы (без памяти-барьеров, и проблема оптимизации).Он выдает следующие выходные данные на трех разных компьютерах Windows для dmd 2.049 и dmd 2.050 (при условии, что алгоритм Деккера не блокируется, что может произойти):

dmd -O -run optbug.d
CAS   : failed
Dekker: failed

И цикл внутри atomicInc компилируетсяк этому с полной оптимизацией:

; cnt is stored at 447C10h
; while ( !cas( &cnt, o, o + 1 ) ) o = cnt;
; 1) prepare call cas( &cnt, o, o + 1 ): &cnt and o go to stack, o+1 to eax
402027: mov    ecx,447C10h         ; ecx = &cnt
40202C: mov    eax,[447C10h]     ; eax = o1 = cnt
402031: inc    eax                 ; eax = o1 + 1 (third parameter)
402032: push   ecx                 ; push &cnt (first parameter)
    ; next instruction pushes current value of cnt onto stack
    ; as second parameter o instead of re-using o1
402033: push   [447C10h]    
402039: call   4020BC              ; 2) call cas    
40203E: xor    al,1                ; 3) test success
402040: jne    402027              ; no success try again
; end of main loop

Вот код теста:

import core.atomic;
import core.thread;
import std.stdio;

enum loops = 0xFFFF;
shared uint cnt;

/* *****************************************************************************
 Implement atomicOp!("+=")(cnt, 1U); with CAS. The code below doesn't work with
 the "-O" compiler flag because cnt is read twice while calling cas and another
 thread can modify cnt in between.
*/
enum threads = 8;

void atomicInc  ( ) { uint o; do { o = cnt; } while ( !cas( &cnt, o, o + 1 ) );}
void threadFunc ( ) { foreach (i; 0..loops) atomicInc; }

void testCas ( ) {
    cnt = 0;
    auto tgCas = new ThreadGroup;
    foreach (i; 0..threads) tgCas.create(&threadFunc);
    tgCas.joinAll;
    writeln( "CAS   : ", cnt == loops * threads ? "passed" : "failed" );
}

/* *****************************************************************************
 Dekker's algorithm. Fails on ia32 (other than atom) because ia32 can re-order 
 read before write. Most likely fails on many other architectures.
*/
shared bool flag1 = false;
shared bool flag2 = false;
shared bool turn2 = false;   // avoids starvation by executing 1 and 2 in turns

void dekkerInc ( ) {
    flag1 = true;
    while ( flag2 ) if ( turn2 ) {
        flag1 = false; while ( turn2 )  {  /* wait until my turn */ }
        flag1 = true;
    }
    cnt++;                   // shouldn't work without a cast
    turn2 = true; flag1 = false;
}

void dekkerDec ( ) {
    flag2 = true;
    while ( flag1 ) if ( !turn2 ) {
        flag2 = false; while ( !turn2 ) { /* wait until my turn */ }
        flag2 = true;
    }
    cnt--;                   // shouldn't work without a cast
    turn2 = false; flag2 = false;
}

void threadDekkerInc ( ) { foreach (i; 0..loops) dekkerInc; }
void threadDekkerDec ( ) { foreach (i; 0..loops) dekkerDec; }

void testDekker ( ) {
    cnt = 0;
    auto tgDekker = new ThreadGroup;
    tgDekker.create( &threadDekkerInc );
    tgDekker.create( &threadDekkerDec );
    tgDekker.joinAll;
    writeln( "Dekker: ", cnt == 0 ? "passed" : "failed" );
}

/* ************************************************************************** */
void main() {
    testCas;
    testDekker;
}

Ответы [ 2 ]

4 голосов
/ 16 марта 2012

Несмотря на то, что проблемы все еще существуют, core.atomic теперь предоставляет atomicLoad, что позволяет сделать относительно простой обходной путь.Чтобы пример cas работал, достаточно загрузить cnt атомарно:

void atomicInc  ( ) { 
    uint o; 
    do {
         o = atomicLoad(cnt); 
    } while ( !cas( &cnt, o, o + 1 ) );
}

Аналогично, чтобы алгоритм Деккера работал:

// ...
while ( atomicLoad(flag2) ) if ( turn2 ) {
// ...
while ( atomicLoad(flag1) ) if ( !turn2 ) {
// ...

Для архитектур, отличных от ia32 (игнорируя строковые операции и SSE), которые также могут переупорядочивать операции чтения

  • относительно операций чтения
  • или записи относительно операций записи
  • или операции записи и чтения в ту же памятьlocation

потребуются дополнительные барьеры памяти.

3 голосов
/ 12 ноября 2010

Да, код в ассемблере.Если вы пропустите использование функции cas () и просто напишите всю свою функцию atomicInt в сборке, это будет всего лишь несколько строк кода.Пока вы этого не сделаете, вы, вероятно, будете бороться с оптимизацией компилятора.

Кроме того, вы можете использовать инструкцию x86 LOCK INC вместо CAS, и вы сможете сократить функцию до одной или двух строк сборки.

...