Я боролся с тем, что должно быть фундаментальным неправильным пониманием того, как работает атомика в C ++.Я написал код ниже для реализации быстрого кольцевого буфера с использованием атомарных переменных для индексов, чтобы несколько потоков могли записывать и читать из буфера.Я сократил код до этого простого случая (который, как я понимаю, все еще немного длинный. Извините.).Если я запускаю это на Linux или Mac OS X, это будет работать некоторое время, но оно также будет генерировать исключения как минимум в 10% случаев.Похоже, что он работает очень быстро, а затем замедляется, и, возможно, даже ускоряется снова, также предполагая, что что-то не так.Я не могу понять недостаток в моей логике.Нужно ли где-нибудь забор?
Вот простое описание того, что он пытается сделать: переменные атомного индекса увеличиваются с помощью метода compare_exchange_weak.Это должно гарантировать эксклюзивный доступ к слоту, из которого был получен индекс.На самом деле нужны два индекса, поэтому, когда мы обертываем кольцевой буфер, значения не перезаписываются.Более подробная информация включена в комментарии.
#include <mutex>
#include <atomic>
#include <iostream>
#include <cstdint>
#include <vector>
#include <thread>
using namespace std;
const uint64_t Nevents = 1000000;
std::atomic<uint64_t> Nwritten(0);
std::atomic<uint64_t> Nread(0);
#define MAX_EVENTS 10
mutex MTX;
std::atomic<uint32_t> iread{0}; // The slot that the next thread will try to read from
std::atomic<uint32_t> iwrite{0}; // The slot that the next thread will try to write to
std::atomic<uint32_t> ibegin{0}; // The slot indicating the beginning of the read region
std::atomic<uint32_t> iend{0}; // The slot indicating one-past-the-end of the read region
std::atomic<uint64_t> EVENT_QUEUE[MAX_EVENTS];
//-------------------------------
// WriteThreadATOMIC
//-------------------------------
void WriteThreadATOMIC(void)
{
MTX.lock();
MTX.unlock();
while( Nwritten < Nevents ){
// Copy (atomic) iwrite index to local variable and calculate index
// of next slot after it
uint32_t idx = iwrite;
uint32_t inext = (idx + 1) % MAX_EVENTS;
if(inext == ibegin){
// Queue is full
continue;
}
// At this point it looks like slot "idx" is available to write to.
// The next call ensures only one thread actually does write to it
// since the compare_exchange_weak will succeed for only one.
if(iwrite.compare_exchange_weak(idx, inext))
{
// OK, we've claimed exclusive access to the slot. We've also
// bumped the iwrite index so another writer thread can try
// writing to the next slot. Now we write to the slot.
if(EVENT_QUEUE[idx] != 0) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -1;} // Dummy check. This should NEVER happen!
EVENT_QUEUE[idx] = 1;
Nwritten++;
if(Nread>Nwritten) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -3;} // Dummy check. This should NEVER happen!
// The idx slot now contains valid data so bump the iend index to
// let reader threads know. Note: if multiple writer threads are
// in play, this may spin waiting for another to bump iend to us
// before we can bump it to the next slot.
uint32_t save_idx = idx;
while(!iend.compare_exchange_weak(idx, inext)) idx = save_idx;
}
}
lock_guard<mutex> lck(MTX);
cout << "WriteThreadATOMIC done" << endl;
}
//-------------------------------
// ReadThreadATOMIC
//-------------------------------
void ReadThreadATOMIC(void)
{
MTX.lock();
MTX.unlock();
while( Nread < Nevents ){
uint32_t idx = iread;
if(idx == iend) {
// Queue is empty
continue;
}
uint32_t inext = (idx + 1) % MAX_EVENTS;
// At this point it looks like slot "idx" is available to read from.
// The next call ensures only one thread actually does read from it
// since the compare_exchange_weak will succeed for only one.
if( iread.compare_exchange_weak(idx, inext) )
{
// Similar to above, we now have exclusive access to this slot
// for reading.
if(EVENT_QUEUE[idx] != 1) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -2;} // Dummy check. This should NEVER happen!
EVENT_QUEUE[idx] = 0;
Nread++;
if(Nread>Nwritten) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -4;} // Dummy check. This should NEVER happen!
// Bump ibegin freeing idx up for writing
uint32_t save_idx = idx;
while(!ibegin.compare_exchange_weak(idx, inext)) idx = save_idx;
}
}
lock_guard<mutex> lck(MTX);
cout << "ReadThreadATOMIC done" << endl;
}
//-------------------------------
// main
//-------------------------------
int main(int narg, char *argv[])
{
int Nwrite_threads = 4;
int Nread_threads = 4;
for(int i=0; i<MAX_EVENTS; i++) EVENT_QUEUE[i] = 0;
MTX.lock(); // Hold off threads until all are created
// Launch writer and reader threads
vector<std::thread *> atomic_threads;
for(int i=0; i<Nwrite_threads; i++){
atomic_threads.push_back( new std::thread(WriteThreadATOMIC) );
}
for(int i=0; i<Nread_threads; i++){
atomic_threads.push_back( new std::thread(ReadThreadATOMIC) );
}
// Release all threads and wait for them to finish
MTX.unlock();
while( Nread < Nevents) {
std::this_thread::sleep_for(std::chrono::microseconds(1000000));
cout << "Nwritten: " << Nwritten << " Nread: " << Nread << endl;
}
// Join threads
for(auto t : atomic_threads) t->join();
}
Когда я обнаружил это в отладчике, это обычно происходит из-за неправильного значения в слоте EVENT_QUEUE.Иногда, хотя счет Nread превышает Nwritten, кажется, что это невозможно.Я не думаю, что мне нужен забор, потому что все атомное, но я не могу сказать в данный момент, так как я должен подвергнуть сомнению все, что, я думаю, я знаю.
Будем благодарны за любые предложения или идеи.