Могу ли я основать критически важное приложение на результатах этого теста, чтобы 100 потоков, читающих указатель, установленный основным потоком миллиард раз, никогда не видели разрыв?
Любые другие потенциальные проблемы, связанные с этим, кроме tearing?
Вот отдельная демонстрация, которая компилируется с g++ -g tear.cxx -o tear -pthread
.
#include <atomic>
#include <thread>
#include <vector>
using namespace std;
void* pvTearTest;
atomic<int> iTears( 0 );
void TearTest( void ) {
while (1) {
void* pv = (void*) pvTearTest;
intptr_t i = (intptr_t) pv;
if ( ( i >> 32 ) != ( i & 0xFFFFFFFF ) ) {
printf( "tear: pv = %p\n", pv );
iTears++;
}
if ( ( i >> 32 ) == 999999999 )
break;
}
}
int main( int argc, char** argv ) {
printf( "\n\nTEAR TEST: are normal pointer read/writes atomic?\n" );
vector<thread> athr;
// Create lots of threads and have them do the test simultaneously.
for ( int i = 0; i < 100; i++ )
athr.emplace_back( TearTest );
for ( int i = 0; i < 1000000000; i++ )
pvTearTest = (void*) (intptr_t)
( ( i % (1L<<32) ) * 0x100000001 );
for ( auto& thr: athr )
thr.join();
if ( iTears )
printf( "%d tears\n", iTears.load() );
else
printf( "\n\nTEAR TEST: SUCCESS, no tears\n" );
}
. Фактическое приложение - это malloc()
'ed, а иногда realloc()
' d массив (размер является степенью двойки (reallo c удваивает объем хранилища), которую многие дочерние потоки будут использовать для решения критически важных, но и высокопроизводительных задач.
Время от времени потребуется добавлять поток новая запись в массиве, и будет делать это, устанавливая следующую запись массива, чтобы указать на что-то, а затем увеличить atomic<int> iCount
. Наконец, он добавит данные к некоторым структурам данных, которые заставят другие потоки пытаться разыменовать эту ячейку.
Все это выглядит хорошо (за исключением того, что я не уверен, если увеличение количества гарантировано до того, как последует -atomi c updates) ... за исключением , с одной стороны: realloc()
обычно изменяет адрес массива, а далее освобождает старый, указатель на который все еще виден другим потокам.
ОК, поэтому вместо realloc()
я malloc()
создаю новый массив, вручную копирую содержимое, устанавливаю указатель на массив. Я бы освободил старый массив, но я понимаю, что другие потоки все еще могут обращаться к нему: они читают базу массива; Я освобождаю базу; третий поток размещает там что-то еще; Затем первый поток добавляет индексированное смещение к базе и ожидает действительный указатель. Я счастлив, что утечка тех, хотя. (Учитывая удвоение прироста, все старые массивы объединяются примерно с тем же размером, что и текущий массив, поэтому накладные расходы - это просто дополнительные 16 байт на элемент, и это память, которая вскоре никогда не будет снова использоваться.)
Итак, вот суть вопроса: как только я выделю больший массив, могу ли я написать его базовый адрес с записью без атома c, в полной безопасности? Или, несмотря на мой тест с миллиардным доступом, мне действительно нужно сделать его атомом c <> и таким образом замедлить все рабочие потоки для чтения этого атома c?
(поскольку это, безусловно, зависит от среды, мы Вы говорите об Intel 2012 или более поздней версии, g ++ 4–9 и Red Hat 2012 или более поздней версии.)
РЕДАКТИРОВАТЬ: вот модифицированная тестовая программа, которая намного более точно соответствует моему запланированному сценарию, только с небольшое количество записей. Я также добавил количество чтений. Я вижу при переключении с void * на atomi c I go с 2240 чтений / se c до 660 чтений / se c (с отключенной оптимизацией). Машинный язык для чтения отображается после источника.
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
using namespace std;
chrono::time_point<chrono::high_resolution_clock> tp1, tp2;
// void*: 1169.093u 0.027s 2:26.75 796.6% 0+0k 0+0io 0pf+0w
// atomic<void*>: 6656.864u 0.348s 13:56.18 796.1% 0+0k 0+0io 0pf+0w
// Different definitions of the target variable.
atomic<void*> pvTearTest;
//void* pvTearTest;
// Children sum the tears they find, and at end, total checks performed.
atomic<int> iTears( 0 );
atomic<uint64_t> iReads( 0 );
bool bEnd = false; // main thr sets true; children all finish.
void TearTest( void ) {
uint64_t i;
for ( i = 0; ! bEnd; i++ ) {
intptr_t iTearTest = (intptr_t) (void*) pvTearTest;
// Make sure top 4 and bottom 4 bytes are the same. If not it's a tear.
if ( ( iTearTest >> 32 ) != ( iTearTest & 0xFFFFFFFF ) ) {
printf( "tear: pv = %ux\n", iTearTest );
iTears++;
}
// Output periodically to prove we're seeing changing values.
if ( ( (i+1) % 50000000 ) == 0 )
printf( "got: pv = %lx\n", iTearTest );
}
iReads += i;
}
int main( int argc, char** argv ) {
printf( "\n\nTEAR TEST: are normal pointer read/writes atomic?\n" );
vector<thread> athr;
// Create lots of threads and have them do the test simultaneously.
for ( int i = 0; i < 100; i++ )
athr.emplace_back( TearTest );
tp1 = chrono::high_resolution_clock::now();
#if 0
// Change target as fast as possible for fixed number of updates.
for ( int i = 0; i < 1000000000; i++ )
pvTearTest = (void*) (intptr_t)
( ( i % (1L<<32) ) * 0x100000001 );
#else
// More like our actual app: change target only periodically, for fixed time.
for ( int i = 0; i < 100; i++ ) {
pvTearTest.store( (void*) (intptr_t) ( ( i % (1L<<32) ) * 0x100000001 ),
std::memory_order_release );
this_thread::sleep_for(10ms);
}
#endif
bEnd = true;
for ( auto& thr: athr )
thr.join();
tp2 = chrono::high_resolution_clock::now();
chrono::duration<double> dur = tp2 - tp1;
printf( "%ld reads in %.4f secs: %.2f reads/usec\n",
iReads.load(), dur.count(), iReads.load() / dur.count() / 1000000 );
if ( iTears )
printf( "%d tears\n", iTears.load() );
else
printf( "\n\nTEAR TEST: SUCCESS, no tears\n" );
}
Dump of assembler code for function TearTest():
0x0000000000401256 <+0>: push %rbp
0x0000000000401257 <+1>: mov %rsp,%rbp
0x000000000040125a <+4>: sub $0x10,%rsp
0x000000000040125e <+8>: movq $0x0,-0x8(%rbp)
0x0000000000401266 <+16>: movzbl 0x6e83(%rip),%eax # 0x4080f0 <bEnd>
0x000000000040126d <+23>: test %al,%al
0x000000000040126f <+25>: jne 0x40130c <TearTest()+182>
=> 0x0000000000401275 <+31>: mov $0x4080d8,%edi
0x000000000040127a <+36>: callq 0x40193a <std::atomic<void*>::operator void*() const>
0x000000000040127f <+41>: mov %rax,-0x10(%rbp)
0x0000000000401283 <+45>: mov -0x10(%rbp),%rax
0x0000000000401287 <+49>: sar $0x20,%rax
0x000000000040128b <+53>: mov -0x10(%rbp),%rdx
0x000000000040128f <+57>: mov %edx,%edx
0x0000000000401291 <+59>: cmp %rdx,%rax
0x0000000000401294 <+62>: je 0x4012bb <TearTest()+101>
0x0000000000401296 <+64>: mov -0x10(%rbp),%rax
0x000000000040129a <+68>: mov %rax,%rsi
0x000000000040129d <+71>: mov $0x40401a,%edi
0x00000000004012a2 <+76>: mov $0x0,%eax
0x00000000004012a7 <+81>: callq 0x401040 <printf@plt>
0x00000000004012ac <+86>: mov $0x0,%esi
0x00000000004012b1 <+91>: mov $0x4080e0,%edi
0x00000000004012b6 <+96>: callq 0x401954 <std::__atomic_base<int>::operator++(int)>
0x00000000004012bb <+101>: mov -0x8(%rbp),%rax
0x00000000004012bf <+105>: lea 0x1(%rax),%rcx
0x00000000004012c3 <+109>: movabs $0xabcc77118461cefd,%rdx
0x00000000004012cd <+119>: mov %rcx,%rax
0x00000000004012d0 <+122>: mul %rdx
0x00000000004012d3 <+125>: mov %rdx,%rax
0x00000000004012d6 <+128>: shr $0x19,%rax
0x00000000004012da <+132>: imul $0x2faf080,%rax,%rax
0x00000000004012e1 <+139>: sub %rax,%rcx
0x00000000004012e4 <+142>: mov %rcx,%rax
0x00000000004012e7 <+145>: test %rax,%rax
0x00000000004012ea <+148>: jne 0x401302 <TearTest()+172>
0x00000000004012ec <+150>: mov -0x10(%rbp),%rax
0x00000000004012f0 <+154>: mov %rax,%rsi
0x00000000004012f3 <+157>: mov $0x40402a,%edi
0x00000000004012f8 <+162>: mov $0x0,%eax
0x00000000004012fd <+167>: callq 0x401040 <printf@plt>
0x0000000000401302 <+172>: addq $0x1,-0x8(%rbp)
0x0000000000401307 <+177>: jmpq 0x401266 <TearTest()+16>
0x000000000040130c <+182>: mov -0x8(%rbp),%rax
0x0000000000401310 <+186>: mov %rax,%rsi
0x0000000000401313 <+189>: mov $0x4080e8,%edi
0x0000000000401318 <+194>: callq 0x401984 <std::__atomic_base<unsigned long>::operator+=(unsigned long)>
0x000000000040131d <+199>: nop
0x000000000040131e <+200>: leaveq
0x000000000040131f <+201>: retq