Как я могу запустить определенную функцию потока асинхронно в C / C ++? - PullRequest
1 голос
/ 11 сентября 2010

Настройка производительности: запись данных в несколько каналов

Сейчас я делаю это в одном потоке:

for(unsigned int i = 0; i < myvector.size();)
{
    tmp_pipe = myvector[i];
    fSuccess = WriteFile( tmp_pipe, &Time, sizeof(double), &dwWritten, NULL );
    if(!fSuccess)
    {
        myvector.erase(myvector.begin()+i);
        printf("Client pipe closed\r\n");
        continue;
    }
    fSuccess = WriteFile( tmp_pipe, &BufferLen, sizeof(long), &dwWritten, NULL );
    if(!fSuccess)
    {
        myvector.erase(myvector.begin()+i);
        printf("Client pipe closed\r\n");
        continue;
    }
    fSuccess = WriteFile( tmp_pipe, pBuffer, BufferLen, &dwWritten, NULL );
    if(!fSuccess)
    {
        myvector.erase(myvector.begin()+i);
        printf("Client pipe closed\r\n");
        continue;
    }
    i++;
}

В результате первый pipe получает данные быстрее, а последний pipe медленнее.

Я думаю о том, чтобы сделать это в отдельных потоках, поэтому каждый pipe одинаково обрабатывается.

Но как я могу асинхронно запускать определенную функцию потока (основной поток должен немедленно получить возврат) в c / c ++?

Ответы [ 5 ]

2 голосов
/ 11 сентября 2010

Вы можете использовать функцию CreateThread для создания новой резьбы и передачи дескриптора трубы в качестве параметра функции резьбы:

DWORD PipeThread(LPVOID param) {
  HANDLE hPipe = (HANDLE)param;
  // Do the WriteFile operations here
  return 0;
}

for(unsigned int i = 0; i < myvector.size(); i++)
  CreateThread(NULL, 0, PipeThread, myvector[i], 0, NULL);

Обратите внимание, что векторный класс не является поточно-ориентированным, поэтому вы столкнетесь с проблемами с myvector.erase, если не синхронизируете доступ к ним, например. используя критическую секцию.


Обновление: Поскольку вы упомянули высокую частоту, вы можете использовать порты завершения ввода / вывода вместо отдельного потока для каждой трубы. Затем вы можете использовать перекрывающийся ввод-вывод с WriteFile для асинхронной записи, и вы можете иметь только один дополнительный поток, который прослушивает завершение записи:

// Initial setup: add pipe handles to a completion port
HANDLE hPort = CreateCompletionPort(myvector[0], NULL, 0, 1);
for (unsigned int i = 1; i < myvector.size(); i++)
  CreateCompletionPort(myvector[i], hPort, 0, 0);

// Start thread
CreateThread(NULL, 0, PipeThread, hPort, 0, NULL);

// Do this as many times as you want
for(unsigned int i = 0; i < myvector.size(); i++) {
  OVERLAPPED *ov = new OVERLAPPED;
  ZeroMemory(ov, sizeof ov);
  WriteFile(myvector[i], buffer, size, NULL, ov);
  // If pipe handle was closed, WriteFile will fail immediately
  // Otherwise I/O is performed asynchronously
}

// Close the completion port at the end
// This should automatically free the thread
CloseHandle(hPort);

---

DWRD PipeThread(LPVOID param) {
  HANDLE hPort = (HANDLE)param;
  DWORD nBytes;
  ULONG_PTR key;
  LPOVERLAPPED ov;

  // Continuously loop for I/O completion
  while (GetQueuedCompletionStatus(hPort, &nBytes, &key, &ov, INFINITE)) {
    if (ov != NULL) {
      delete ov;
      // Do anything else you may want to do here
    }
  }

  return 0;
}
0 голосов
/ 14 сентября 2010

Чтобы конкретно ответить на ваш вопрос "как я могу асинхронно запускать определенную функцию потока (основной поток должен немедленно получить возврат) в c / c ++?"

Вы можете сделать это легко, разделив два действия. Создайте рабочий пул потоков, инициализированный каналом, с которым он должен общаться, и напишите функцию, которая будет планировать новое задание для этих потоков. Это будет мгновенно по сравнению с вашими записями канала, и все потоки получат свою работу и начнут запись в каналы, которыми он управляет, в то время как ваш основной поток может продолжить свою работу.

Это будет простое решение, если число ваших клиентов невелико. в противном случае создание одного потока для каждого клиента не будет сильно масштабироваться после точки, и ваш сервер будет перегружен большим количеством переключений контекста и конфликтами потоков.

В этом сценарии для очень серверного дизайна вы должны серьезно подумать над решением Касабланки. Он создает только один поток для прослушивания уведомлений о завершении и является наиболее эффективной версией сервера в win 2003 для создания серверов в Windows.

0 голосов
/ 12 сентября 2010

Я бы подумал о том, чтобы подготовить данные таким образом, чтобы уменьшить количество вызовов ввода-вывода. Зная, что данные записываются наиболее эффективным способом с наименьшим количеством возможных вызовов ввода-вывода, я рассмотрел бы использование асинхронного ввода-вывода. Если производительность все еще недостаточно высока, подумайте о добавлении дополнительных нитей в дизайн.

Один из способов уменьшить количество записей - это использовать структуру, объединяющую все данные, чтобы вместо трех можно было использовать одну запись. Прагма понадобится, чтобы избавиться от любых дополнительных отступов / выравниваний, которые может добавить компилятор.

#pragma pack(push,1)
struct PipeData {
   double _time;
   long _buffer_len;
   char* _buffer;
};
#pragma pack(pop)

PipeData data;
int data_len = sizeof(double) + sizeof(long) + <yourbufferlen>;

for(unsigned int i = 0; i < myvector.size();)
{
    tmp_pipe = myvector[i];
    fSuccess = WriteFile( tmp_pipe, &data, data_len, &dwWritten, NULL );
    if(!fSuccess)
    {
        myvector.erase(myvector.begin()+i);
        printf("Client pipe closed\r\n");
        continue;
    }
    i++;
}
0 голосов
/ 11 сентября 2010

Это именованные трубы? Если это так, вы можете использовать FILE_FLAG_OVERLAPPED при их создании, что позволяет выполнять асинхронную запись без обработки потоков. Вот пример из MSDN .

Если это анонимные каналы, перекрывающийся ввод-вывод не поддерживается , поэтому это может быть хорошей причиной для переключения на именованные каналы.

Другим вариантом может быть поставить в очередь рабочий элемент для каждой записи, но это не гарантирует, что все три записи будут выполняться одновременно.

0 голосов
/ 11 сентября 2010

Есть ли у вас writev() в наличии? Если это так, вы можете сократить три операции записи до одной на канал, что будет более эффективным. Это также немного упрощает обработку ошибок, но вы можете свернуть то, что вам нужно:

for (unsigned int i = 0; i < myvector.size(); i++)
{
    tmp_pipe = myvector[i];
    if (!WriteFile(tmp_pipe, &Time,      sizeof(double), &dwWritten, NULL) ||
        !WriteFile(tmp_pipe, &BufferLen, sizeof(long),   &dwWritten, NULL) ||
        !WriteFile(tmp_pipe, pBuffer,    BufferLen,      &dwWritten, NULL))
    {
        myvector.erase(myvector.begin()+i);
        printf("Client pipe closed\r\n");
    }
}

Это проще читать по-разному, потому что обработка ошибок составляет 1/3, поэтому операционный код менее скрыт.

Конечно, вы все равно хотели бы обернуть этот код в многопоточный код, поэтому операции записи будут обрабатываться отдельными потоками. Вы бы организовали для каждой нити свою трубу; они делят доступ только для чтения к времени и буферу. Каждый поток будет писать и возвращать статус по завершении. Родительский поток будет ожидать каждого из дочерних потоков, и если дочерний поток сообщит о сбое, соответствующий канал клиента будет удален из вектора. Поскольку только родительский поток будет манипулировать вектором, проблем с потоками здесь не возникает.

В общих чертах:

 for (i = 0; i < myvector.size(); i++)
      tid[i] = thread create (myvector[i], write_to_pipe);
 for (i = 0; i < myvector.size(); i++)
 {
      status = wait for thread(tid[i]);
      if (status != success)
           myvector.erase(...);
 }

Массив (или вектор) tid содержит идентификаторы потоков. Функция write_to_pipe() является основной функцией потока; он выполняет запись в канал, который ему передают, и выходит с соответствующим статусом.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...