Как читать из Windows Named Pipe с таймаутом? - PullRequest
0 голосов
/ 30 октября 2018

Я пытаюсь перенести что-то очень простое в Unix на Windows: чтение из именованного канала (fifo) с таймаутом, когда ничего не произошло. Я пытаюсь сделать это простым способом (используя PeekNamedPipe), без использования перекрывающегося ввода-вывода. Возможно ли это?

Ниже приведен полный пример. На данный момент это не работает: записанная строка не получена, а читатель получает

ERROR_NO_DATA 232 (0xE8) Труба закрывается

после первого чтения, и писатель получает это после нескольких попыток

ERROR_SEM_TIMEOUT 121 (0x79) Истекло время ожидания семафора

Автор теста (клиент канала):

#include "Windows.h"
#include <iostream>
#include <stdio.h>
#include <errno.h>

int main()
{
    HANDLE hpipe;
    DWORD written;
    const char *pname = "\\\\.\\pipe\\isp-control";
    char msg[] = "play asdf.wav";

    printf("waiting for pipe %s\n", pname);
    bool stat = WaitNamedPipeA(pname, 4000);
    printf("  --> %d, error %d\n", stat, GetLastError());

    if (stat)
    {
        hpipe = CreateFileA(pname, GENERIC_WRITE | GENERIC_READ,
            0, NULL, OPEN_EXISTING, 0, NULL);

        if (hpipe != INVALID_HANDLE_VALUE)
        {
            bool stat = WriteFile(hpipe, msg, strlen(msg) + 1, &written, NULL);
            printf("  wrote %d bytes of %d: '%s'\n", written, strlen(msg) + 1, msg);
            printf("  --> %d, error %d\n", stat, GetLastError());

            CloseHandle(hpipe);
        }
        else
        {
            printf("error %d opening pipe (handle %d)\n", GetLastError(), (int) hpipe);
            return 1;
        }
    }
    return 0;
}

Тестовый ридер (труба сервер и клиент):

#include <stdio.h>
#include <iostream>
#include <string>
#include "pipeio.hpp"

int main (int argc, char *argv[])
{
  std::string pipename = "\\\\.\\pipe\\isp-control";    // command input filename
  ISP::WindowsPipeIO io(pipename, 2);

  while (true)
  {
    // wait for pipe input in command string or timeout
    std::string cmdstr = io.wait();
    printf("got response: '%s'\n", cmdstr.c_str());
  }
}

и класс pipeio.hpp, в первом #ifdef я оставил версию Unix, чтобы показать, чего я пытаюсь достичь:

#ifndef _PIPEIO_HPP_
#define _PIPEIO_HPP_

namespace ISP {

/// interface for wrapper around named pipe
class PipeIO
{
public:
  //virtual ~PipeIO ();
  virtual std::string wait () = 0;
};

#ifndef WIN32  /////////// Unix version
#include <errno.h>

/// MacOS/Linux wrapper around named pipe
class UnixPipeIO : public PipeIO
{
public:
  UnixPipeIO (std::string name, double timeout)
  : name_(name)
  {
    // open control and status pipes
    filedes_ = open(name_.c_str(), O_RDWR | O_NONBLOCK);
    if (filedes_ < 0)
      throw std::runtime_error("can't open pipe");

    timeout_.tv_sec  = timeout; // todo: use fractional
    timeout_.tv_usec = 0;
  }

  std::string wait ()
  {
    FD_ZERO(&input_);
    FD_SET(filedes_, &input_);

    // wait for commands
    //db printf("waiting for fd %d...\n", filedes_);
    int ready = select(filedes_ + 1, &input_, NULL, NULL, &timeout_);

    if (ready > 0  &&  FD_ISSET(filedes_, &input_))
    { // ctl input has data, read and parse
      //db printf("fd %d is ready.\n", filedes_);
      int nread = read(filedes_, command_, CTL_INPUT_SIZE);
      command_[nread] = 0;  // null-terminate
      //db printf("read command (nread %d): '%s'\n", nread, nread > 0 ? command_ : "");

      //todo: continue reading when more than CTL_INPUT_SIZE chars are available
      //todo: stop at \0? continue when called again
      return std::string(command_);
    }
    else if (ready == 0)
    {
      //db printf("timeout\n");
      return "timeout";
    }
    else
    {
      fprintf(stderr, "error %d in select: %s\n", errno, strerror(errno));
      return "ioerror";
    }
  }

private:
  std::string name_;    //< name of pipe
  int filedes_;
  struct timeval timeout_;
  fd_set input_;
  static const long CTL_INPUT_SIZE = 65535;
  char command_[CTL_INPUT_SIZE];
};

#else   /////////////////////// Windows

#define NOMINMAX
#include "Windows.h"
#include <errno.h>

/// Windows wrapper around named pipe
class WindowsPipeIO : public PipeIO
{
public:
  WindowsPipeIO (std::string name, double timeout)
  : name_(name), timeout_((int) (timeout * 1000.))
  {
    printf("windowspipeio %s timeout %d ms\n", name_.c_str(), timeout_);

    // open control and status pipes
    hpipe_ = CreateNamedPipeA(name_.c_str(),
                 PIPE_ACCESS_DUPLEX,
                 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT,
                 4, // num. instances
                 1024 * 16, // output buffer size
                 CTL_INPUT_SIZE,  // input size
                 timeout_, // default timeout ms 
                 NULL);
    if (hpipe_ == INVALID_HANDLE_VALUE)
      throw std::runtime_error("can't create pipe " + name);

    // open read handle
    hread_ = CreateFileA(name_.c_str(), GENERIC_READ, 0, NULL, OPEN_EXISTING, 0, NULL);

    if (hread_ == INVALID_HANDLE_VALUE)
      throw std::runtime_error("can't open read handle " + name);

    printf("  windowspipeio created pipe %d, read %d\n", (int) hpipe_, (int) hread_);
  }

  ~WindowsPipeIO()
  {
    CloseHandle(hread_);

    printf("~windowspipeio disconnect %s\n", name_.c_str());
    DisconnectNamedPipe(hpipe_);
    printf("~windowspipeio close %s\n", name_.c_str());
    CloseHandle(hpipe_);
  }

  std::string wait ()
  {
    DWORD dwRead;

    // wait for commands
    if (hpipe_ != INVALID_HANDLE_VALUE  &&  hread_ != INVALID_HANDLE_VALUE)
    {
      //db
      printf("waiting for connection for pipe %s handle %d\n", name_.c_str(), (int) hpipe_);
      bool stat   = ConnectNamedPipe(hpipe_, NULL);
      int  winerr = GetLastError();
      printf("  --> status %d error %d\n", stat, winerr);

      if (winerr == ERROR_PIPE_CONNECTED)
      { // connection is good

    // instead of going through implementing "overlapped i/o", we simply peek into the pipe to see if data is available
    DWORD nready = 0;
    if (PeekNamedPipe(hpipe_, NULL, 0, NULL, &nready, NULL) == 0)
      fprintf(stderr, "error %d peeking into pipe: %s\n", GetLastError(), strerror(errno));

    printf("pipe connected and peeked at %d bytes\n", nready);

    if (nready > 0)
    { // data available
      printf("  pipe %s has available %d bytes\n", name_.c_str(), nready);

      while (ReadFile(hread_, command_, CTL_INPUT_SIZE - 1, &dwRead, NULL) != FALSE)
      {
        /* add terminating zero */
        command_[dwRead] = '\0';
        //db
        printf("  read command (%d bytes): '%s'\n", dwRead, dwRead > 0 ? command_ : "");

        //todo: continue reading when more than CTL_INPUT_SIZE chars are available
        //todo: stop at \0? continue when called again
        return std::string(command_);
      }

      printf("  hmmm, read %d, but there should have been %d bytes (error %d)\n", dwRead, nready, GetLastError());
    }
    // else: no data available: timeout

    DisconnectNamedPipe(hpipe_);
    printf("disconnected from pipe (error %d)\n", GetLastError());
      }
      // else: no client connected: timeout

      //db
      printf("timeout\n");
      Sleep(timeout_);
      return "timeout";
    }
    else
    {
      fprintf(stderr, "error %d (invalid pipe handle): %s\n", errno, strerror(errno));
      return "ioerror";
    }
  }

private:
  std::string name_;    //< name of pipe
  int timeout_; //ms
  HANDLE hpipe_;
  HANDLE hread_;
  static const long CTL_INPUT_SIZE = 65535;
  char command_[CTL_INPUT_SIZE];
};

#endif

} // end namespace ISP

#endif // _PIPEIO_HPP_
...