Попытка асинхронного ввода-вывода с потоками Win32 - PullRequest
4 голосов
/ 07 марта 2011

Я пишу ПО для последовательного порта для Windows. Для повышения производительности я пытаюсь преобразовать подпрограммы в асинхронный ввод-вывод. У меня есть код, и он работает довольно хорошо, но я начинающий в этом, и я хотел бы еще улучшить производительность программы. Во время стресс-тестов программы (т. Е. Передачи данных в / из порта как можно быстрее при высокой скорости передачи данных) загрузка ЦП становится довольно высокой.

Если у кого-то есть опыт асинхронного ввода-вывода и многопоточности в Windows, я был бы признателен, если бы вы взглянули на мою программу. У меня есть две основные проблемы:

  • Правильно ли реализован асинхронный ввод-вывод? Я нашел в сети довольно надежный источник, предлагающий передавать пользовательские данные в функции обратного вызова, реализовав собственную структуру OVERLAPPED со своими собственными данными в конце. Кажется, это работает просто отлично, но для меня это выглядит немного «хакерски». Кроме того, производительность программы не сильно улучшилась, когда я перешел из синхронного / опрошенного в асинхронный / обратный вызов, что заставляет меня подозревать, что я что-то делаю не так.

  • Разумно ли использовать STL std :: deque для буферов данных FIFO? Поскольку программа в настоящее время написана, я разрешаю получать только 1 байт данных за один раз, прежде чем они должны быть обработаны. Поскольку я не знаю, сколько данных я получу, это может быть бесконечное количество. Я предполагаю, что это 1 байт за раз приведет к вялому поведению за линиями deque, когда он должен распределять данные. И я не верю, что deque также является поточно-ориентированным (не так ли?) Если использование STL deque не является нормальным, есть ли какие-либо предложения для лучшего использования типа данных? Круговой кольцевой буфер на основе статического массива?

Любые другие отзывы о коде также приветствуются.


Последовательные подпрограммы реализованы таким образом, что у меня есть родительский класс под названием «Comport», который обрабатывает все, что связано с последовательным вводом / выводом. От этого класса я наследую другой класс под названием «ThreadedComport», который является многопоточной версией.

Класс ThreadedComport (соответствующие его части)

class ThreadedComport : public Comport
{
  private:

    HANDLE        _hthread_port;                 /* thread handle      */
    HANDLE        _hmutex_port;                  /* COM port access    */
    HANDLE        _hmutex_send;                  /* send buffer access */
    HANDLE        _hmutex_rec;                   /* rec buffer access  */

    deque<uint8>  _send_buf;
    deque<uint8>  _rec_buf;
    uint16        _data_sent;
    uint16        _data_received;

    HANDLE        _hevent_kill_thread;
    HANDLE        _hevent_open;
    HANDLE        _hevent_close;
    HANDLE        _hevent_write_done;
    HANDLE        _hevent_read_done;
    HANDLE        _hevent_ext_send;              /* notifies external thread */
    HANDLE        _hevent_ext_receive;           /* notifies external thread */

    typedef struct
    {
      OVERLAPPED       overlapped;
      ThreadedComport* caller;                  /* add user data to struct */
    } OVERLAPPED_overlap;

    OVERLAPPED_overlap _send_overlapped;
    OVERLAPPED_overlap _rec_overlapped;
    uint8*             _write_data;
    uint8              _read_data;
    DWORD              _bytes_read;

    static DWORD WINAPI _tranceiver_thread (LPVOID param);
    void                _send_data         (void);
    void                _receive_data      (void);
    DWORD               _wait_for_io       (void);

    static void WINAPI  _send_callback     (DWORD dwErrorCode,
                                            DWORD dwNumberOfBytesTransfered,
                                            LPOVERLAPPED lpOverlapped);
    static void WINAPI  _receive_callback  (DWORD dwErrorCode,
                                            DWORD dwNumberOfBytesTransfered,
                                            LPOVERLAPPED lpOverlapped);

};

Подпрограмма основного потока, созданная с помощью CreateThread ():

DWORD WINAPI ThreadedComport::_tranceiver_thread (LPVOID param)
{
  ThreadedComport* caller = (ThreadedComport*) param;

  HANDLE handle_array [3] =
  {
    caller->_hevent_kill_thread,                 /* WAIT_OBJECT_0 */
    caller->_hevent_open,                        /* WAIT_OBJECT_1 */
    caller->_hevent_close                        /* WAIT_OBJECT_2 */
  };

  DWORD result;

  do
  {
    /* wait for anything to happen */
    result = WaitForMultipleObjects(3,
                                    handle_array,
                                    false,       /* dont wait for all */
                                    INFINITE);

    if(result == WAIT_OBJECT_1 )                 /* open? */
    {
      do                                         /* while port is open, work */
      {
        caller->_send_data();
        caller->_receive_data();
        result = caller->_wait_for_io();         /* will wait for the same 3 as in handle_array above,
                                                    plus all read/write specific events */

      } while (result != WAIT_OBJECT_0 &&        /* while not kill thread */
               result != WAIT_OBJECT_2);         /* while not close port */
    }
    else if(result == WAIT_OBJECT_2)             /* close? */
    {
      ;                                          /* do nothing */
    }

  } while (result != WAIT_OBJECT_0);             /* kill thread? */

  return 0;
}

, который в свою очередь вызывает следующие три функции:

void ThreadedComport::_send_data (void)
{
  uint32 send_buf_size;

  if(_send_buf.size() != 0)                      // anything to send?
  {
    WaitForSingleObject(_hmutex_port, INFINITE);
      if(_is_open)                               // double-check port
      {
        bool result;

        WaitForSingleObject(_hmutex_send, INFINITE);
          _data_sent = 0;
          send_buf_size = _send_buf.size();
          if(send_buf_size > (uint32)_MAX_MESSAGE_LENGTH)
          {
            send_buf_size = _MAX_MESSAGE_LENGTH;
          }
          _write_data = new uint8 [send_buf_size];


          for(uint32 i=0; i<send_buf_size; i++)
          {
            _write_data[i] = _send_buf.front();
            _send_buf.pop_front();
          }
          _send_buf.clear();
        ReleaseMutex(_hmutex_send);


        result = WriteFileEx (_hcom,              // handle to output file
                              (void*)_write_data, // pointer to input buffer
                              send_buf_size,      // number of bytes to write
                              (LPOVERLAPPED)&_send_overlapped, // pointer to async. i/o data
                              (LPOVERLAPPED_COMPLETION_ROUTINE )&_send_callback);

        SleepEx(INFINITE, true);                 // Allow callback to come

        if(result == false)
        {
          // error handling here
        }

      } // if(_is_open)
    ReleaseMutex(_hmutex_port);
  }
  else /* nothing to send */
  {
    SetEvent(_hevent_write_done);                // Skip write
  }
}


void ThreadedComport::_receive_data (void)
{
  WaitForSingleObject(_hmutex_port, INFINITE);

    if(_is_open)
    {
      BOOL  result;

      _bytes_read = 0;
      result = ReadFileEx (_hcom,                  // handle to output file
                           (void*)&_read_data,     // pointer to input buffer
                           1,                      // number of bytes to read
                           (OVERLAPPED*)&_rec_overlapped, // pointer to async. i/o data
                           (LPOVERLAPPED_COMPLETION_ROUTINE )&_receive_callback);

      SleepEx(INFINITE, true);                     // Allow callback to come

      if(result == FALSE)
      {
        DWORD last_error = GetLastError();
        if(last_error == ERROR_OPERATION_ABORTED)  // disconnected ?
        {
          close();                                 // close the port
        }
      }
    }

  ReleaseMutex(_hmutex_port);
}



DWORD ThreadedComport::_wait_for_io (void)
{
  DWORD result;
  bool  is_write_done = false;
  bool  is_read_done  = false;

  HANDLE handle_array [5] =
  {
    _hevent_kill_thread,
    _hevent_open,
    _hevent_close,
    _hevent_write_done,
    _hevent_read_done
  };


  do /* COM port message pump running until sending / receiving is done */
  {
    result = WaitForMultipleObjects(5,
                        handle_array,
                        false,                     /* dont wait for all */
                        INFINITE);

    if(result <= WAIT_OBJECT_2)
    {
      break;                                       /* abort */
    }
    else if(result == WAIT_OBJECT_3)               /* write done */
    {
      is_write_done = true;
      SetEvent(_hevent_ext_send);
    }
    else if(result == WAIT_OBJECT_4)               /* read done */
    {
      is_read_done = true;

      if(_bytes_read > 0)
      {
        uint32 errors = 0;

        WaitForSingleObject(_hmutex_rec, INFINITE);
          _rec_buf.push_back((uint8)_read_data);
          _data_received += _bytes_read;

          while((uint16)_rec_buf.size() > _MAX_MESSAGE_LENGTH)
          {
            _rec_buf.pop_front();
          }

        ReleaseMutex(_hmutex_rec);
        _bytes_read = 0;

        ClearCommError(_hcom, &errors, NULL);
        SetEvent(_hevent_ext_receive);
      }
    }
  } while(!is_write_done || !is_read_done);

  return result;
}

Функции обратного вызова асинхронного ввода / вывода:

void WINAPI ThreadedComport::_send_callback (DWORD dwErrorCode,
                                             DWORD dwNumberOfBytesTransfered,
                                             LPOVERLAPPED lpOverlapped)
{
  ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;

  if(dwErrorCode == 0)                           // no errors
  {
    if(dwNumberOfBytesTransfered > 0)
    {
      _this->_data_sent = dwNumberOfBytesTransfered;
    }
  }


  delete [] _this->_write_data;                  /* always clean this up */
  SetEvent(lpOverlapped->hEvent);
}


void WINAPI ThreadedComport::_receive_callback (DWORD dwErrorCode,
                                                DWORD dwNumberOfBytesTransfered,
                                                LPOVERLAPPED lpOverlapped)
{
  if(dwErrorCode == 0)                           // no errors
  {
    if(dwNumberOfBytesTransfered > 0)
    {
      ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;
      _this->_bytes_read = dwNumberOfBytesTransfered;
    }
  }

  SetEvent(lpOverlapped->hEvent);
}

Ответы [ 3 ]

7 голосов
/ 07 марта 2011

Первый вопрос прост.Метод не хакерский;Вы владеете OVERLAPPED памятью и всем, что за ней следует.Лучше всего это описал Рэймонд Чен: http://blogs.msdn.com/b/oldnewthing/archive/2010/12/17/10106259.aspx

Вы могли бы ожидать улучшения производительности только в том случае, если у вас есть лучшие вещи в ожидании завершения ввода-вывода.Если все, что вы делаете, это SleepEx, вы увидите, что только CPU% снизится.Подсказка в названии «перекрывается» - она ​​позволяет перекрывать вычисления и ввод / вывод.

std::deque<unsigned char> может обрабатывать данные FIFO без больших проблем.Вероятно, он будет перерабатывать фрагменты размером 4 КБ (точное число, определенное с помощью обширного профилирования, все сделано для вас).

[править] Я рассмотрел ваш код немного дальше, и он кажется излишне сложным.Начнем с того, что одним из основных преимуществ асинхронного ввода-вывода является то, что вам не нужны все эти потоки.Потоки позволяют использовать больше ядер, но вы имеете дело с медленным устройством ввода-вывода.Достаточно даже одного ядра, , если не тратит все свое время на ожидание.И это именно то, для чего используется перекрывающийся ввод-вывод.Вы просто выделяете один поток на всю работу ввода-вывода для порта.Поскольку это единственный поток, ему не нужен мьютекс для доступа к этому порту.

OTOH, вы бы хотели, чтобы мьютекс вокруг deque<uint8> объектов, так как потоки производителя / потребителя не совпадают с потоком comport.

1 голос
/ 07 марта 2011

Я думаю, что ваш код имеет неоптимальный дизайн.

  • Я полагаю, вы разделяете слишком много структур данных со слишком большим количеством потоков.Я думаю, что вы должны поместить всю обработку ввода-вывода последовательного устройства для одного порта в один поток и поместить синхронизированную очередь команд / данных между потоком ввода-вывода и всеми клиентскими потоками.Попросите поток ввода-вывода следить за командами / данными в очереди.

  • Вы, кажется, выделяете и освобождаете некоторые буферы для каждого отправленного события.Избегайте этого.Если вы храните все операции ввода-вывода в одном потоке, вы можете повторно использовать один буфер.В любом случае вы ограничиваете размер сообщения, вы можете просто предварительно выделить один достаточно большой буфер.

  • Размещение байтов, которые вы хотите отправить в std::deque, является неоптимальным.Вы должны сериализовать их в непрерывный блок памяти для WriteFile().Вместо этого, если вы используете какую-либо очередь обмена данными / данных между одним потоком ввода-вывода и другими потоками, клиентские потоки могут одновременно обеспечивать непрерывный кусок памяти.

  • Чтение 1 байтав то же время кажется глупым тоже.Если это не работает для последовательных устройств, вы можете предоставить достаточно большой буфер для ReadFileEx().Возвращает количество байтов, которые ему удалось прочитать.Он не должен блокировать, AFAIK, если, конечно, я не прав.

  • Вы ожидаете, пока перекрывающийся ввод-вывод завершит использование вызова SleepEx().Какой смысл в перекрывающемся IO, если вы только что оказались синхронными?

1 голос
/ 07 марта 2011

Я не вижу причин для использования асинхронного ввода-вывода в подобных проектах.Асинхронный ввод / вывод хорош, когда вы обрабатываете большое количество сокетов или у вас есть работа во время ожидания данных, но, насколько я могу судить, вы имеете дело только с одним сокетом и не выполняете никакой работы между.

Кроме того, просто для сведения, вы обычно используете порт завершения ввода-вывода для обработки асинхронного ввода-вывода.Я не уверен, есть ли ситуации, когда использование порта завершения ввода-вывода отрицательно влияет на производительность.

Но да, использование асинхронного ввода-вывода выглядит нормально.Реализация собственной OVERLAPPED структуры выглядит как хак, но это правильно;нет другого способа связать ваши собственные данные с завершением.

В Boost также есть реализация циклического буфера, хотя я не уверен, что это потокобезопасно.Однако ни один из стандартных контейнеров библиотеки не является поточно-ориентированным.

...