Я пытаюсь запустить несколько потоков, в которых каждый поток работает с элементом, на который ссылается общий индекс, в общий вектор. Каждый поток запускается со ссылкой на общий вектор и индекс, и некоторая форма синхронизации используется для управления обновлением индекса, так что каждый поток работает с уникальным индексом и все индексы в векторе в конечном итоге покрываются. Кажется, что все работает для всех показателей, кроме первых нескольких. Если запущено 4 потока, первые 3 индекса последовательно пропускаются. Либо я делаю что-то не так с синхронизацией, либо есть что-то особенное в запуске потока, которая делает первые попытки синхронизации неудачными.
Я использую Visual Studio 2017, набор инструментов c ++ v141 и SDK версии 10.0.17134.0.
Также я использую стандартную библиотеку lib, а не boost (или что-то еще).
Вот код в главном ...
vector<string> dirs;
string ipath;
//dirs and ipath get filled in with some valid path strings...
const int64_t maxthreads = 4;
vector<thread> mythreads;
atomic<int64_t> common_index{-1};
for(int64_t i = 0l i < maxthreads; ++i)
{
mythreads.push_back(thread(runthreads,ref(dirs),cref(ipath),ref(common_index)));
}
for(int64_t i = 0; i < maxthreads; ++i)
{
mythreads[i].join();
}
а вот код внутри runthreads
void runthreads(vector<string>& dirs, const string& ipath, atomic<int64_t>& cindex)
{
try
{
const int64_t max_index = dirs.size();
while(true)
{
int64_t use_index;
int64_t tindex = cindex.load();
do
{
use_index = tindex + 1;
} while (!cindex.compare_exchange_weak(tindex, use_index));
if(use_index < max_index)
{
string use_path = ipath + dirs[use_index];
//do work on use_path
}
else
{
break;
}
}
}
catch (...)
{
}
}
Идея состояла в том, чтобы каждый поток мог получить текущее значение индекса, установить желаемое новое значение (с приращением на единицу), попытаться обновить общий индекс атомарным способом, но только в том случае, если текущее значение соответствует ожидаемому значению. В случае успеха сохраненное значение use_index должно быть текущим индексом для работы, и код продолжается. Я ожидаю, что каждый индекс будет покрыт, но первые 3 (когда запущено 4 потока) пропускаются. Я не понимаю, что не так с подходом.
Я попытался использовать глобальный атомарный элемент для индекса вместо передачи ссылки, и я также попытался просто использовать неатомарный индекс и заключить обновление индекса в блок с мьютексом и lock_guard. Я также попытался добавить что-то вроде atomic_thread_fence, хотя мне кажется, что мне это не нужно, поскольку атомарные операции, которые я использую по умолчанию, имеют memory_order_seq_cst. В конечном счете, я всегда веду себя одинаково ... пропуская первые три индекса.
Если я добавлю паузу между запуском новых потоков, например, добавив
thread::sleep_for(50ms);
после строки создания потока эти начальные индексы обрабатываются. Я предполагаю, что наиболее вероятная проблема заключается в том, что я что-то неправильно понимаю в своей реализации, но я не знаю, что это такое. В противном случае мне интересно, требуется ли специальная обработка при запуске потока (что-то кроме вставки задержек с помощью «sleep_for»), чтобы эти подходы к синхронизации работали правильно.