Как надежно связать поток с экземпляром объекта в параллелизме :: parallel_for? - PullRequest
0 голосов
/ 05 декабря 2018

У меня есть вектор из M изображений, который должен обрабатываться параллельно до N потоков, где N - это параметр, установленный пользователем.

У меня также есть вектор из N Detector экземпляров, которые занимаются обработкой, но каждый экземпляр должен выполняться в своем собственном потоке (то есть, если два потока вызывают detect() в одном и том же экземпляре перед предыдущимзвонок окончен, плохие вещи будут происходить).

Detector - это автономный класс (который я могу изменить при необходимости) с помощью одного метода void Detector::detect(cv::Mat image), который я вызываю и который изменяет внутреннее состояние детектора на время (длительного) процесса обнаружения (отсюда необходимость предотвращения параллельных вызовов detect() из разных потоков).

Первоначально я реализовал это с помощью OpenMP как:

#pragma omp parallel for num_threads(N)
for(int i=0; i<M; i++)
{
    detectors[omp_get_thread_num()].detect(images[i]);
}

Однако, поскольку обнаружение может генерировать исключения, я подумалвместо этого использовать parallel_for PPL, который идет с перехватом исключения потока в основном потоке.

Проблема в том, что я не могу найти эквивалент omp_get_thread_num, который я могу использовать для сопоставленияDetector для определенного потока:

concurrency::CurrentScheduler::Create( concurrency::SchedulerPolicy( 2, 
concurrency::MinConcurrency, 1, concurrency::MaxConcurrency, N ) );
concurrency::parallel_for(0, M, [&](int i)
{
    detectors[?????].detect(images[i]);
});
concurrency::CurrentScheduler::Detach(); // clear scheduler

Как я могу гарантировать, что один поток всегда использует один и тот же экземпляр из пула детекторов?Или, если это неправильный подход, как я могу отобразить выполнение detect() по пулу детекторов, которые у меня уже есть?

1 Ответ

0 голосов
/ 05 декабря 2018

По предложению @NathanOliver я решил использовать concurrent_queue для решения проблемы:

using namespace concurrency;
CurrentScheduler::Create( SchedulerPolicy( 2, 
MinConcurrency, 1, MaxConcurrency, N ) );
concurrent_queue<std::shared_ptr<Detector>> detectors_queue;
for(auto& det : obj->instances)
{
    detectors_queue.push(det);
}
parallel_for(0, M, [&](int i)
{
    std::shared_ptr<Detector> thread_det = nullptr;
    while(!detectors_queue.try_pop(thread_det))
    {
        wait(100);
    }
    thread_det->detect(images[i]);
    detectors_queue.push(thread_det);
});
CurrentScheduler::Detach(); // clear scheduler
...