std :: atomic vs статическая переменная для синхронизации потоков - PullRequest
0 голосов
/ 24 апреля 2018

У меня есть несколько потоков, которые работают с несколькими элементами данных.Потоки должны выводить результаты в том же порядке, в котором я передаю данные потокам.То есть:

Thread #1: give data - start processing
Thread #2: give data - start processing
Thread #3: give data - start processing
...
Thread #n: give data - start processing

Результаты должны быть получены в том же порядке, в котором данные были переданы потокам, независимо от того, какой поток завершил обработку первым.А именно:

Thread #1: put data 
Thread #2: put data
...

Чтобы различать потоки и управлять ими, я дал каждому из них идентификатор (0,1,2,...,n).Я использую идентификаторы для назначения данных каждому потоку, чтобы он мог их обработать.

for(int i=0; i<thread_count; i++)
    give_data(i); // i is id and the function knows where to get data from

Я хочу, чтобы потоки использовали общий токен, который определяет, какой поток должен получить результат.Все тела потоков идентичны, тело выглядит так:

while(true){
    auto data = get_data();
    result = process_data(data);
    while(token != this_id) spin;
    put_data(result); // this is a synchronized call 
    update_token(token);
}

Моя проблема связана с token.Сначала я попробовал нормальную ссылку (int & token), и она, очевидно, не может работать (и я этого не ожидал).Во всяком случае, я использовал статическую переменную, и потоки не всегда получают самую последнюю.Я был удивлен, увидев, что одна нить доминирует во всем.Всякий раз, когда поток обновляет токен, он теряет свой ход, позволяя другому потоку выставить свой результат и так далее.Однако у меня доминировал один поток, как если бы токен всегда устанавливался на свой собственный идентификатор и не обновлялся.

Если бы мне пришлось угадывать, я бы сказал, что это проблема с кэшированием.Однако я не уверен.

В любом случае, я думаю об использовании std::atomic<int> в качестве своего токена.Будет ли это работать?Если нет, что еще я должен рассмотреть делать?Что может быть лучше для синхронизации этих потоков?

Дополнительно: это похоже на плохой дизайн, и я не уверен, как это сделать лучше.Любые предложения будут очень признательны.

Ответы [ 2 ]

0 голосов
/ 24 апреля 2018

Макс отвечает отлично.Если бы у меня была возможность использовать OpenMP, учитывая время, я бы это сделал.Тем не менее, я не поэтому пишу этот ответ на мой вопрос.

В моем предыдущем проекте это зависело от потоков для синхронизации друг с другом, и это не кажется лучшей идеей, так какможет пойти не так.Вместо этого я решил позволить менеджеру синхронизировать их результаты (идея пришла из последнего фрагмента кода Макса).

void give_threads_data(){
    vector<pair<data, promise<result>*> promises(threads.size());
    vector<future<result>> futures(threads.size());
    for(int i=0; i<threads.size(); i++){
        data d = get_data();
        threads[i].put_data(d, promises[i]);
        futures[i] = promises[i].get_future();
    }

    for(int i=0; i<futures.size(); i++){
        result = futures[i].get();
        // handle result
    }
}

Таким образом, я смог получить результаты так же, как я отправил их в цепочку.Тело нити стало намного чище:

void thread_body(){
    while(true){
        pair<data, promise<result>*> item = queue.get(); // blocking call
        data d = item.first;
        promise<result>* promise = item.second;

        result r = process_data(d);
        promise->set_value(r);
    }
}

Нет игры, и результаты превосходны.В следующий раз, когда я буду заниматься многопоточностью, я рассмотрю OpenMP.

0 голосов
/ 24 апреля 2018

В любом случае, я использовал статическую переменную, и потоки не всегда получают последнюю.Я был удивлен, увидев, что один поток доминирует над всем

Да, несколько потоков, обращающихся к одному и тому же несинхронизированному значению, по крайней мере, один из них записывает в него, это гонка данных , которая не определенаповедение в соответствии со стандартом C ++.Все может произойти.

Я думаю об использовании std :: atomic в качестве моего токена.Будет ли это работать?

Да.Это предотвратит любую гонку данных на токене.Я не вижу никакой другой прямой проблемы в вашем псевдокоде, поэтому с этой точки зрения это выглядит хорошо.

это похоже на плохой дизайн, и я не уверен, как это сделать лучше.Любые предложения будут очень признательны.

Весь дизайн выглядит несколько странно, но это зависит от вашей библиотеки потоков, если есть более простой способ выразить это.Например, с OpenMP вы могли бы сделать это за один проход (логика give_data и get_data слишком неясна, чтобы сделать это полным):

#pragma omp parallel
{
    int threadCount = omp_get_num_threads();
#pragma omp single
    for (int i = 0; i < threadCount; ++i)
        give_data(i);

#pragma omp ordered for ordered schedule(static)
    for (int i = 0; i < threadCount; ++i)
    {
        auto data = get_data();
        result = process_data(data);
#pragma omp ordered
        put_data(result); // this is a synchronized call 
    }
}

Директива ordered заставляет put_data вызовы должны выполняться точно в том же порядке (один за другим), как если бы цикл был последовательным, в то время как потоки все еще могут выполнять предварительную обработку данных параллельно.

В действительности все может быть еще прощеOpenMP, если все, что вы действительно хотели сделать, это сделать один большой цикл обработки данных параллельно с упорядоченными записями:

#pragma omp parallel for ordered schedule(static)
for (int i = 0; i < dataItemCount; ++i)
{
    auto data = get_data(i); // whatever this would entail
    auto result = process_data(data);
#pragma omp ordered
    put_data(result); // this is a synchronized call    
}

Не похоже, что вам требуется распределение элементов данных, чтобыпо порядку, но если вы действительно это сделаете, то этот подход не будет работать так же легко, потому что вы можете иметь только один упорядоченный раздел на упорядоченный цикл.

...