Требует ли использование мьютексов или атомиков специальной обработки при запуске потока? - PullRequest
0 голосов
/ 12 апреля 2019

Я пытаюсь запустить несколько потоков, в которых каждый поток работает с элементом, на который ссылается общий индекс, в общий вектор. Каждый поток запускается со ссылкой на общий вектор и индекс, и некоторая форма синхронизации используется для управления обновлением индекса, так что каждый поток работает с уникальным индексом и все индексы в векторе в конечном итоге покрываются. Кажется, что все работает для всех показателей, кроме первых нескольких. Если запущено 4 потока, первые 3 индекса последовательно пропускаются. Либо я делаю что-то не так с синхронизацией, либо есть что-то особенное в запуске потока, которая делает первые попытки синхронизации неудачными.

Я использую Visual Studio 2017, набор инструментов c ++ v141 и SDK версии 10.0.17134.0. Также я использую стандартную библиотеку lib, а не boost (или что-то еще).

Вот код в главном ...

vector<string> dirs;
string ipath;
//dirs and ipath get filled in with some valid path strings...
const int64_t maxthreads = 4;
vector<thread> mythreads;
atomic<int64_t> common_index{-1};
for(int64_t i = 0l i < maxthreads; ++i)
{ 
 mythreads.push_back(thread(runthreads,ref(dirs),cref(ipath),ref(common_index)));
}
for(int64_t i = 0; i < maxthreads; ++i)
{
 mythreads[i].join();
}

а вот код внутри runthreads

void runthreads(vector<string>& dirs, const string& ipath, atomic<int64_t>& cindex)
{
   try
   {
      const int64_t max_index = dirs.size();
      while(true)
      {
        int64_t use_index;
        int64_t tindex = cindex.load();
        do
        {
           use_index = tindex + 1;
        } while (!cindex.compare_exchange_weak(tindex, use_index));

        if(use_index < max_index)
        {
          string use_path = ipath + dirs[use_index];
          //do work on use_path
        }
        else
        {
          break;
        }
      }
   }
   catch (...)
   {
   }
}

Идея состояла в том, чтобы каждый поток мог получить текущее значение индекса, установить желаемое новое значение (с приращением на единицу), попытаться обновить общий индекс атомарным способом, но только в том случае, если текущее значение соответствует ожидаемому значению. В случае успеха сохраненное значение use_index должно быть текущим индексом для работы, и код продолжается. Я ожидаю, что каждый индекс будет покрыт, но первые 3 (когда запущено 4 потока) пропускаются. Я не понимаю, что не так с подходом.

Я попытался использовать глобальный атомарный элемент для индекса вместо передачи ссылки, и я также попытался просто использовать неатомарный индекс и заключить обновление индекса в блок с мьютексом и lock_guard. Я также попытался добавить что-то вроде atomic_thread_fence, хотя мне кажется, что мне это не нужно, поскольку атомарные операции, которые я использую по умолчанию, имеют memory_order_seq_cst. В конечном счете, я всегда веду себя одинаково ... пропуская первые три индекса.

Если я добавлю паузу между запуском новых потоков, например, добавив

thread::sleep_for(50ms);

после строки создания потока эти начальные индексы обрабатываются. Я предполагаю, что наиболее вероятная проблема заключается в том, что я что-то неправильно понимаю в своей реализации, но я не знаю, что это такое. В противном случае мне интересно, требуется ли специальная обработка при запуске потока (что-то кроме вставки задержек с помощью «sleep_for»), чтобы эти подходы к синхронизации работали правильно.

...