Как поместить существующую функцию в собственный непрерывный (циклический) поток и передать ей обновленные параметры? - PullRequest
1 голос
/ 14 мая 2019

У меня есть консольное приложение OpenCV, которое принимает кадры камеры из порта и отображает их на экране, с дополнительными процедурами обработки изображений, которые сначала выполняются для них. Цикл main () является непрерывным, то есть использует while (true), и на каждом проходе он получает новое изображение, которое ожидает его размещения в Mat. Мне нужно поддерживать как минимум 30 кадров в секунду в main (), чтобы входящие кадры не отбрасывались.

Обычно это не проблема, если у меня нет интенсивной обработки, но когда я это сделаю, я бы хотел хотя бы разгрузить некоторые из более простых подпрограмм в свои собственные потоки, чтобы они не загружали линейное время ЦП. Они могут работать независимо, «захватывая» любой кадр и воздействуя на него, и отображать свои результаты асинхронно с main (). Например, подпрограмма гистограммы и подпрограмма, которая вычисляет глобальные значения регулировки контрастности / смещения.

Я видел простой пример, использующий синтаксис , где 3 подпрограммы запускаются в своих собственных потоках в main (), они работают независимо, затем все воссоединяются в конце main (), затем выполнение останавливается. Этот пример приведен ниже, и я включил метод 1 в свое приложение, поскольку он выглядит наиболее простым. (Понятия не имею, что такое лямбда).

// CPP program to demonstrate multithreading using three different callables.
#include <iostream>
#include <thread>
using namespace std;

// A dummy function
void foo(int Z)
{
  for (int i = 0; i < Z; i++) {
    cout << "Thread using function pointer as callable\n";
  }
}

// A callable object
class thread_obj {
public:
  void operator()(int x)
  {
    for (int i = 0; i < x; i++)
      cout << "Thread using function object as callable\n";
  }
};

int main()
{
  cout << "Threads 1 and 2 and 3 operating independently" << endl;

  // This thread is launched by using function pointer as callable
  thread th1(foo, 3);

  // This thread is launched by using function object as callable
  thread th2(thread_obj(), 3);

  // Define a Lambda Expression
  auto f = [](int x) {
    for (int i = 0; i < x; i++)
      cout << "Thread using lambda expression as callable\n";
  };

  // This thread is launched by using lamda expression as callable
  thread th3(f, 3);

  // Wait for the threads to finish
  // Wait for thread t1 to finish
  th1.join();

  // Wait for thread t2 to finish
  th2.join();

  // Wait for thread t3 to finish
  th3.join();

  return 0;
}

Для однопроходного потока я могу сделать выше, и это имеет смысл. Но мне нужен поток, который вызывается и получает несколько динамических входных параметров (включая Mat и некоторые значения int / double / bool), которые обычно меняются при каждом цикле вокруг main (). Я хочу, чтобы поток оставался открытым и был повторно запущен в каждом цикле и приводил к новым результатам. Эти результаты могут оставаться в потоке (как в случае гистограммы, которая отображается только в независимом окне), или они могут состоять из нескольких вычисленных значений, которые передаются обратно в main () для использования в другом месте (как в процедуре контрастности / смещения). Но ни в коем случае не нужно синхронизировать результаты с тем, что делает main ().

Например, подпрограмма с гистограммой с резьбой может получить изображение Mat, некоторые значения масштабирования и некоторые логические элементы управления, которые говорят ему начать обработку данных изображения в гистограмму и отображать ее с помощью imshow ().

Если я поместил всю подпрограмму гистограммы в цикл while (true), то поток никогда не завершится, но тогда мне нужно, чтобы он видел изменения параметров, чтобы он выполнялся снова, и я не знаю, как это сделать. , Я предполагаю, что использовать глобальные переменные не нужно и не элегантно, плюс я не уверен насчет пересечения потоков. Также меня смущает вопрос о том, должен ли я передавать в него литералы или использовать указатели / адреса (* и &) для минимизации перестановки памяти.

Есть ли у кого-нибудь простой и понятный пример такого поведения? Предпочтительно использовать , если это вообще возможно. Я не опытный программист, поэтому, пожалуйста, не углубляйтесь в эзотерику или жаргон, который понимают только разработчики. Большое спасибо!

Ответы [ 4 ]

0 голосов
/ 16 мая 2019

Я выложу новый ответ в ответ на ваши дополнения.Вы должны добавлять свои изменения в вопрос, а не в ответ, чтобы люди, которые попадают на эту страницу, были менее запутанными.

Есть много вопросов, которые вы спрашиваете о многопоточных средах, на которые нет простых ответов,То, о чем вы говорите, является сложной проблемой.Вы хотите, чтобы некоторое количество потоков обрабатывало информацию, которая обновляется исходным потоком.Затем вы хотите, чтобы иметь возможность поделиться этой информацией обратно в исходную ветку.

Во-первых, detach() не гарантирует, что поток очищается или остановлен.Что он делает, так это позволяет thread объекту выйти из области видимости (уничтожить), в то же время позволяя фактическому потоку продолжить выполнение.Это может быть очень опасно с запущенными потоками, которые никогда не останавливаются среди других проблем.

Во-вторых, для видимости переменных и использования объекта потока поток может видеть переменные, так как он принимает ссылку в качестве своих параметров.Используемый вами конструктор потока принимает функцию и параметры, необходимые для этой функции.В ответе Фареанора функции принимают ссылку.Переменная находится в области действия исходного потока main(), и функция, выполняемая этим потоком, имеет ссылку на эту переменную.Вот почему оба потока могут видеть и обновлять эту переменную.Нет необходимости «передавать его обратно» как main(), и поток имеет ту же переменную.Хотя это приведет к условиям гонки.Что происходит, когда один поток находится в процессе записи в переменную, а другой читает его?

Существует множество проблем, которые могут возникнуть при использовании потоков таким способом без синхронизации.Это действительно стоит усилий?Если это так, вам следует провести некоторое исследование и поиграть с хорошими примерами многопоточных приложений.

Подводя итог, основная проблема, с которой вы столкнетесь (исключая вышеупомянутые условия гонки), заключается в том, что вы не можете контролировать то, чтокаждый поток выполняется и когда.Ваша ОС решит это с помощью планировщика и переключения контекста.Нет гарантии, что порожденный поток выполнит свой код способом, совпадающим с вашим main потоком.Например, ваше исполнение может выглядеть следующим образом.

Legend: - queued for execution, * Executing, R read, W write
MAIN: ****W*****W*****W-------------------***R***R***R***
T1  : -----------------***R**----------------------------
T2  : -----------------------**R***R**W------------------
T3  :----------------------------------------------------

Main может выполнять цикл несколько раз и записывать переменные несколько раз, прежде чем другие что-либо сделают.Поток может прочитать значение переменной, но затем его выполнение ставится в очередь, прежде чем он получает возможность записи (T1).Поток может прочитать переменную дважды, но она не была обновлена ​​на main между чтениями (T3).Мы можем иметь поток, который не выполняется в течение определенного периода времени (T4).Если у вас есть независимые потоки без синхронизации, ничего не гарантируется о порядке выполнения и объеме выполнения.

0 голосов
/ 14 мая 2019

Я думаю, что есть некоторая путаница с тем, что именно потоки делают в вашем примере и что вы хотите сделать.

Просто для пояснения, у вас есть метод main(), который вызывается неоднократно, вчто вам нужно сделать некоторую обработку?Вы хотели бы поместить эту обработку в потоки, которые (для повышения скорости?) Полагаются на некоторые данные, которые изменяются каждый раз, когда вызывается main()?

Если это так, то использование потоков таким способом все равно будет "время загрузки линейного ЦП" .Это потому, что join() будет ждать окончания потоков.Таким образом, в этом примере три потока отправляются для выполнения некоторой работы.Выполнение в main() будет приостановлено до завершения трех потоков.Я попытаюсь проиллюстрировать это на следующем рисунке.

--- main thread -----|-----|-----|--wait^--wait^ --wait^------return.
                     **T1**|*****|return^      ^       ^
                           **T2**|*******return^       ^
                                 ***T3***********return^

Этот шаблон все еще полезен для быстрой обработки большого количества информации.Три потока создаются (перезапускаются или как угодно) при каждом вызове main() и используются для параллельной обработки информации.Потоки также должны завершать каждый основной вызов.Это общий шаблон для обработки графики / информации.

Итак, для того, что вы хотите сделать, и используйте метод один для потоков, просто создайте функцию для каждого отдельного потока, который вы хотите порождать (с любыми типами параметров, которые вам нужны), затем создайте поток с этой функцией и передайте ему параметры, которые необходимо обработать.

Если вам нужно обмениваться результатами между потоками или обратно, то это еще одна банка червей.Вам нужно провести некоторое исследование мьютексов или даже фьючерсов или других способов передачи информации с потоками, чтобы избежать условий гонки

Если вам нужно порождать отдельные потоки, которыене объединяются в основном и работают независимо, тогда это еще одна большая банка червей.Я думаю, что предоставленный шаблон более полезен, чем наличие нескольких отдельных потоков.

0 голосов
/ 15 мая 2019

Я обнаружил операцию .detach () и нашел что-то, что, кажется, работает, по крайней мере, когда я запускаю весь свой код, я получаю, казалось бы, очень стабильную работу, я не замечаю разгон памяти или использование процессора, и все открывается / закрывается / завершается без ошибок. Я поместил псевдокод ниже, чтобы продемонстрировать его суть.

Обеспечивает ли операция detach (), что каждый поток уничтожает себя после завершения? Я не горжусь использованием глобалов, но это первая попытка, которая пока позволяет мне работать на полной скорости (30 кадров в секунду). Я заметил, что с помощью этих потоков мое общее время вокруг цикла main () значительно сокращается, так что теперь во времени преобладает ожидание готовности следующего изображения с веб-камеры, как и ожидалось.

//  ...

// globals
bool g_histodone = true;
string g_histwindow = "histogram";
Mat g_webcamImage;


// thread routine
void histogram(Mat Image, double offset, double gain, bool agc)
{
  Mat scaledImage;

  if (agc) {
    resizedImage.convertTo(scaledImage, -1, 1, offset);
    resizedImage.convertTo(scaledImage, -1, gain);
  }
  else
    resizedImage.copyTo(scaledImage);

  // compute histogram image
  // ...

  // display it
  imshow(g_histwindow, histImage);

  // notify main()
  g_histodone = true;
}


void get_webcam_image(void)
{
  // put images into g_webcamImage (actually a group of rotating buffers) as they come in from webcam
}


int main(int argc, const char *argv[])
{
  bool agc = true;
  bool histo = false;
  bool histwindow_not_destroyed = false;
  double offset = 0;
  double gain = 1;
  Mat localImage;

  thread th_capture = thread(get_webcam_image); // start thread to get webcam images
  thread th_histo = thread(histogram, localImage, offset, gain, agc); // start thread for histogram display

  while (true) {

    // wait here to grab the next camera image from buffers and put it into a local Mat
    g_webcamImage.copyTo(localImage);


    // do some optional processing to determine gain/offset (would be its own thread too)
    if (agc) {
      offset = result 1 from agc thread
      gain = result 2 from agc thread

      // apply scaling to the Mat
      localImage.convertTo(localImage, -1, 1, offset);
      localImage.convertTo(localImage, -1, gain);
    }

    // compute and show histogram
    if (histo) {
      if (g_histodone) { // previous thread is finished, start another
        g_histodone = false;
        if (th_histo.joinable())
          th_histo.join();
        th_histo = thread(histogram, localImage, offset, gain, agc);
      }
    }
    // clean up
    else if (histwindow_not_destroyed) {
      destroyWindow(g_histwindow);
      histwindow_not_destroyed = false; // handshaking
    }

    // ...

    // get user keyboard input
    if (key-to-toggle-histogram-pressed) {
      histo = !histo;
      if (histo)
        namedWindow(g_histwindow, WINDOW_AUTOSIZE); // only want window to appear when enabled
      else
        histwindow_not_destroyed = true; // handshaking
    }
    else if (key-to-toggle-agc-pressed)
      agc = !agc;
    else if (key-to-terminate-app-pressed) {
      th_capture.detach(); // terminate the webcam capture thread
      if (th_histo.joinable())
        th_histo.join(); // terminate the histo thread
      return 0;
    }

  }

}

Что касается подхода Фареанора, он выглядит лучше, так как он передает значения в один поток, как я и собирался, но не знал точно, как это сделать; Я не знал потока, если объявленный вне цикла while () мог автоматически видеть переменные при их обновлении (это потому, что поток использует адрес, а не буквальное значение?) Я догадывался, что переменные передаются статически только при время, когда поток был объявлен. Так что приятно знать, я попробую.

Любой совет, как получить переменные, передаваемые из потока в main (), такие как мои вычисленные значения усиления и смещения agc? Если я объявлю структуру, содержащую два двойных числа, то должно ли это работать?

thread th_agc (agcparams = calc_agc, localImage);

где calc_agc - моя подпрограмма типа struct_agcparams для потока, localImage - это Mat, а agcparams содержит двойники усиления / смещения, которые я пытаюсь передать обратно.

или это так?

agcparams = thread th_agc (calc_agc, localImage);

0 голосов
/ 14 мая 2019

Я не уверен, хорошо ли я понял, что вы хотите сделать, но, думаю, вы хотите запускать потоки, значения параметров которых обновляются на каждой "итерации", верно?

Если это так, вы можете объявить параметры в вашей функции main() и передать их по ссылке на ваши потоки. Функция main() выполнит обновление, и потоки примут это во внимание.

Что-то в этом роде:

//...
void foo (int &a, bool &stop) // pass by reference
{
    while(!stop)
    {
        // Do something with a
    }
}
void bar(char &c, bool &b, bool &stop) // pass by reference too
{
    while(!stop)
    {
        // Do something with c and b
    }
}
//...
int main()
{
    std::atomic <bool> stop_threads(false);

    // declare your parameters (and initialize them the way you want)
    int param1(0);
    char param2('a');
    bool param3(false);

    // launch your threads
    thread th1(foo, std::ref(param1), std::ref(stop_threads));
    thread th2(bar, std::ref(param2), std::ref(param3), std::ref(stop_threads));

    // Update the parameters in your while loop
    while(/* Your stop condition */)
    {
        // Update the value of param1, param2 and param3
        // ...
    }

    // When exiting the while loop, the process is ended (no more data to process)
    stop_threads = true;

    th1.join();
    th2.join();

    return 0;
}

Я добавил дополнительные boolean в параметры каждой функции, которая должна запускаться в разных потоках, чтобы они правильно останавливались в конце.

Надеюсь, это то, что вы искали :)
Не стесняйтесь, сообщите мне, если я неправильно понял вашу проблему.


EDIT:

Я забыл упомянуть очень важную вещь. Каждый раз, когда я писал «Сделайте что-нибудь с этой переменной ...» или «Обновите значение этой переменной», вам нужно использовать механизм взаимного исключения общих переменных (взаимное исключение), чтобы избежать доступа разных потоков к одной и той же переменной в в то же время. И я изменил bool stop_threads на std::atomic <bool> stop_threads, чтобы избежать состояния гонки с флагом остановки, общим для всех потоков.

...