передача пакетов boost :: asio завершается неудачно при очень частой отправке пакетов - PullRequest
2 голосов
/ 10 августа 2010

В конечном итоге я пытаюсь перенести буферы с одной машины на другую. Приведенный ниже код принимает поток <id><size><data with size bytes> и считывает часть в функции handleReadHeader, затем читает число байтов <size>, затем возвращается и ждет еще одну пару <id><size>. Я вставил много кода, но на самом деле единственные подозрительные функции:
Downlink :: addMsgToQueue
Downlink :: writeCallback
Downlink :: startWrites () Downlink :: handleReadHeader
Downlink :: handleReadFrameDataBGR

using namespace std;
using namespace boost;
using namespace boost::asio;

Downlink::Downlink() :
  socket(nIO),
  headerSize(sizeof(unsigned int)+1),
  connected(false),
  isWriting(false),
  readHeaderBuffer(headerSize)
{}

Downlink::~Downlink() {
  disconnect();
}

bool Downlink::connect(const std::string &robotHost, unsigned int port) {
  disconnect();

  ip::tcp::resolver resolver(nIO);
  ip::tcp::resolver::query query(robotHost, lexical_cast<string>(port));
  ip::tcp::resolver::iterator iterator = resolver.resolve(query);

  ip::tcp::resolver::iterator end;
  boost::system::error_code ec;
  for(;iterator!=end;++iterator) {
    socket.connect(*iterator, ec);
    if(!ec)
      break;
    socket.close();
  }
  if(!socket.is_open())
    return false;

  async_read(socket, buffer(readHeaderBuffer), 
      bind(&Downlink::handleReadHeader, this, _1, _2));

  //start network thread.
  lock_guard<mutex> l(msgMutex);
  outgoingMessages = queue<vector<char> >();
  nIO.reset();
  t = thread(bind(&boost::asio::io_service::run, &nIO));
  connected = true;
  return true;
}

bool Downlink::isConnected() const {
  return connected;
}

void Downlink::disconnect() {
  nIO.stop();
  t.join();
  socket.close();
  connected = false;
  isWriting = false;
  nIO.reset();
  nIO.run();
}

void Downlink::writeToLogs(const std::string &logMsg) {
  vector<char> newMsg(logMsg.length()+headerSize);
  newMsg[0] = MSG_WRITE_LOG;
  const unsigned int msgLen(logMsg.length());
  memcpy(&newMsg[1], &msgLen, sizeof(unsigned int));
  vector<char>::iterator dataBegin = newMsg.begin();
  advance(dataBegin, headerSize);
  copy(logMsg.begin(), logMsg.end(), dataBegin);
  assert(newMsg.size()==(headerSize+logMsg.length()));
  addMsgToQueue(newMsg);
}

void Downlink::addMsgToQueue(const std::vector<char> &newMsg) {
  lock_guard<mutex> l(msgMutex);
  outgoingMessages.push(newMsg);
  lock_guard<mutex> l2(outMutex);
  if(!isWriting) {
    nIO.post(bind(&Downlink::startWrites, this));
  }
}

void Downlink::writeCallback(const boost::system::error_code& error,
        std::size_t bytes_transferred) {
  if(error) {
    disconnect();
    lock_guard<mutex> l(msgMutex);
    outgoingMessages = queue<vector<char> >();
    return;
  }
  {
    lock_guard<mutex> l2(outMutex);
    isWriting = false;
  }
  startWrites();
}


void Downlink::startWrites() {
  lock_guard<mutex> l(msgMutex);
  lock_guard<mutex> l2(outMutex);
  if(outgoingMessages.empty()) {
    isWriting = false;
    return;
  }

  if(!isWriting) {
    currentOutgoing = outgoingMessages.front();
    outgoingMessages.pop();
    async_write(socket, buffer(currentOutgoing),
  bind(&Downlink::writeCallback, this, _1, _2));
    isWriting = true;
  }
}

void Downlink::handleReadHeader(const boost::system::error_code& error,
    std::size_t bytes_transferred) {
  //TODO: how to handle disconnect on errors?
  cout<<"handleReadHeader"<<endl;
  if(error) {
    return;
  }
  assert(bytes_transferred==headerSize);
  if(bytes_transferred!=headerSize) {
    cout<<"got "<<bytes_transferred<<" while waiting for a header."<<endl;
  }
  currentPacketID = readHeaderBuffer[0];

  memcpy(&currentPacketLength, &readHeaderBuffer[1], sizeof(unsigned int));
  dataStream.resize(currentPacketLength);
  switch(currentPacketID) {
  case MSG_FRAME_BGR: {
    cout<<"- >> gone to read frame. ("<<currentPacketLength<<")"<<endl;
    async_read(socket, asio::buffer(dataStream), 
        boost::asio::transfer_at_least(currentPacketLength),
        bind(&Downlink::handleReadFrameDataBGR, this, _1, _2));    
  } break;
  default: {
    cout<<"->>> gone to read other. ("<<currentPacketLength<<")"<<endl;
    cout<<"      "<<(int)currentPacketID<<endl;
    async_read(socket, asio::buffer(dataStream), 
        boost::asio::transfer_at_least(currentPacketLength),
        bind(&Downlink::handleReadData, this, _1, _2));
  } break;
  }
}

void Downlink::handleReadData(const boost::system::error_code& error,
    std::size_t bytes_transferred) {
  cout<<"handleReadData"<<endl;
  if(error) {
    return;
  }
  if(bytes_transferred!=currentPacketLength) {
    cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
  }
  assert(bytes_transferred==currentPacketLength);

  switch(currentPacketID) {
  case MSG_ASCII: {
    string msg(dataStream.begin(), dataStream.end());
    textCallback(&msg);
  } break;
  case MSG_IMU: {
    Eigen::Vector3d a,g,m;
    unsigned int stamp;
    memcpy(a.data(), &dataStream[0], sizeof(double)*3);
    memcpy(m.data(), &dataStream[0]+sizeof(double)*3, sizeof(double)*3);
    memcpy(g.data(), &dataStream[0]+sizeof(double)*6, sizeof(double)*3);
    memcpy(&stamp, &dataStream[0]+sizeof(double)*9, sizeof(unsigned int));
    imuCallback(a,m,g,stamp);
  } break;
  default:
    //TODO: handle this better?
    cout<<"Unknown packet ID."<<endl;
  }

  async_read(socket, buffer(readHeaderBuffer), 
      boost::asio::transfer_at_least(headerSize),
      bind(&Downlink::handleReadHeader, this, _1, _2));
}

void Downlink::handleReadFrameDataBGR(const boost::system::error_code& error,
          std::size_t bytes_transferred) {
  cout<<"Got a frame"<<endl;
  if(error) {
    return;
  }
  if(bytes_transferred!=currentPacketLength) {
    cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
  }
  assert(bytes_transferred==currentPacketLength);
  unsigned int imageWidth, imageHeight, cameraID;

  unsigned char *readOffset = (unsigned char*)&dataStream[0];
  memcpy(&imageWidth, readOffset, sizeof(unsigned int)); 
  readOffset += sizeof(unsigned int);
  memcpy(&imageHeight, readOffset, sizeof(unsigned int)); 
  readOffset += sizeof(unsigned int);
  memcpy(&cameraID, readOffset, sizeof(unsigned int)); 
  readOffset += sizeof(unsigned int);

  cout<<"("<<imageWidth<<"x"<<imageHeight<<") ID = "<<cameraID<<endl;

  frameCallback(readOffset, imageWidth, imageHeight, cameraID);

  async_read(socket, buffer(readHeaderBuffer), 
      boost::asio::transfer_at_least(headerSize),
      bind(&Downlink::handleReadHeader, this, _1, _2));
}


boost::signals2::connection Downlink::connectTextDataCallback(boost::signals2::signal<void (std::string *)>::slot_type s) {
  return textCallback.connect(s);
}

boost::signals2::connection Downlink::connectIMUDataCallback(boost::signals2::signal<void (Eigen::Vector3d, Eigen::Vector3d, Eigen::Vector3d, unsigned int)>::slot_type s) {
  return imuCallback.connect(s);
}

boost::signals2::connection Downlink::connectVideoFrameCallback(boost::signals2::signal<void (unsigned char *, unsigned int, unsigned int, unsigned int)>::slot_type s) {
  return frameCallback.connect(s);
}

Вот код на другом конце. Это почти точно так же, как другой код, но ошибка может быть в любом конце.

using namespace std;
using namespace boost;
using namespace boost::asio;

Uplink::Uplink(unsigned int port) :
  socket(nIO),
  acceptor(nIO),
  endpoint(ip::tcp::v4(), port),
  headerSize(sizeof(unsigned int)+1), //id + data size
  headerBuffer(headerSize)
{
  //move socket into accept state.
  acceptor.open(endpoint.protocol());
  acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
  acceptor.bind(endpoint);
  acceptor.listen(1);  //1 means only one client in connect queue.
  acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
  //start network thread.
  nIO.reset();
  t = thread(boost::bind(&boost::asio::io_service::run, &nIO));
}

Uplink::~Uplink() {
  nIO.stop();  //tell the network thread to stop.
  t.join();  //wait for the network thread to stop.
  acceptor.close(); //close listen port.
  socket.close();   //close active connections.
  nIO.reset();
  nIO.run(); //let clients know that we're disconnecting.
}

void Uplink::parse_header(const boost::system::error_code& error,
     std::size_t bytes_transferred) {
  if(error || bytes_transferred!=headerSize) {
    disconnect();
    return;
  }
  currentPacketID = headerBuffer[0];
  memcpy(&currentPacketLength, &headerBuffer[1], sizeof(unsigned int));
  //move to read data state

  //TODO: move to different states to parse various packet types.

  async_read(socket, asio::buffer(dataStream), transfer_at_least(currentPacketLength),
      bind(&Uplink::parse_data, this, _1, _2));
}

void Uplink::parse_data(const boost::system::error_code& error,
   std::size_t bytes_transferred) {
  if(error) {
    disconnect();
    return;
  }

  if(bytes_transferred != currentPacketLength) {
    cout<<"bytes_transferred != currentPacketLength"<<endl;
    disconnect();
    return;
  }

  //move back into the header reading state
  async_read(socket, buffer(headerBuffer), 
      bind(&Uplink::parse_header, this, _1, _2));
}

void Uplink::disconnect() {
  acceptor.close();
  socket.close();
  acceptor.open(endpoint.protocol());
  acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
  acceptor.bind(endpoint);
  acceptor.listen(1);  //1 means only one client in connect queue.
  acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
}

void Uplink::accept_handler(const boost::system::error_code& error)
{
  if (!error) {
    //no more clents.
    acceptor.close();
    //move to read header state.
    async_read(socket, buffer(headerBuffer), 
        bind(&Uplink::parse_header, this, _1, _2));
  }
}

void Uplink::sendASCIIMessage(const std::string &m) {
  //Format the message
  unsigned int msgLength(m.length());
  vector<char> outBuffer(msgLength+headerSize);
  outBuffer[0] = MSG_ASCII;
  memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));
  vector<char>::iterator dataBegin(outBuffer.begin());
  advance(dataBegin, headerSize);
  copy(m.begin(), m.end(), dataBegin);
  //queue the message
  addToQueue(outBuffer);
}

void Uplink::sendIMUDataBlock(const nIMUDataBlock *d) {
  //Format the message.
    //a,g,m, 3 components each plus a stamp
  const unsigned int msgLength(3*3*sizeof(double)+sizeof(unsigned int)); 
  vector<char> outBuffer(msgLength+headerSize);
  outBuffer[0] = MSG_IMU;
  memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));

  const Eigen::Vector3d a(d->getAccel());
  const Eigen::Vector3d m(d->getMag());
  const Eigen::Vector3d g(d->getGyro());
  const unsigned int s(d->getUpdateStamp());

  memcpy(&outBuffer[headerSize], a.data(), sizeof(double)*3);
  memcpy(&outBuffer[headerSize+3*sizeof(double)], m.data(), sizeof(double)*3);
  memcpy(&outBuffer[headerSize+6*sizeof(double)], g.data(), sizeof(double)*3);
  memcpy(&outBuffer[headerSize+9*sizeof(double)], &s, sizeof(unsigned int));

  /*
  cout<<"----------------------------------------"<<endl;
  cout<<"Accel = ("<<a[0]<<","<<a[1]<<","<<a[2]<<")"<<endl;
  cout<<"Mag   = ("<<m[0]<<","<<m[1]<<","<<m[2]<<")"<<endl;
  cout<<"Gyro  = ("<<g[0]<<","<<g[1]<<","<<g[2]<<")"<<endl;
  cout<<"Stamp = "<<s<<endl;
  cout<<"----------------------------------------"<<endl;
  */

  //queue the message
  addToQueue(outBuffer);
}

void Uplink::send_handler(const boost::system::error_code& error,
     std::size_t bytes_transferred) {
  {
    lock_guard<mutex> l(queueLock);
    lock_guard<mutex> l2(sendingLock);
    if(outQueue.empty()) {
      currentlySending = false;
      return;
    }
  }
  startSend();
}

void Uplink::addToQueue(const std::vector<char> &out) {
  bool needsRestart = false;
  {
    lock_guard<mutex> l(queueLock);
    lock_guard<mutex> l2(sendingLock);
    outQueue.push(out);
    needsRestart = !currentlySending;
  }
  if(needsRestart)
    nIO.post(bind(&Uplink::startSend, this));
}

void Uplink::startSend() {
  lock_guard<mutex> l(queueLock);
  lock_guard<mutex> l2(sendingLock);
  if(outQueue.empty())
    return;
  currentlySending = true;
  currentWrite = outQueue.front();
  outQueue.pop();
  async_write(socket, buffer(currentWrite), bind(&Uplink::send_handler, 
       this, _1, _2));
}

void Uplink::sendVideoFrameBGR(const unsigned int width, const unsigned int height, 
          const unsigned int cameraID, const unsigned char *frameData) {
  //                             image data            image metadata        header
  const unsigned int packetSize(width*height*3   +   sizeof(unsigned int)*3 + headerSize);
  const unsigned int dataSize(width*height*3   +   sizeof(unsigned int)*3);
  vector<char> outgoingBuffer(packetSize);
  outgoingBuffer[0] = MSG_FRAME_BGR;
  memcpy(&outgoingBuffer[1], &dataSize, sizeof(unsigned int));
  char *writePtr = &outgoingBuffer[headerSize];
  memcpy(writePtr, &width, sizeof(unsigned int));
  writePtr += sizeof(unsigned int);
  memcpy(writePtr, &height, sizeof(unsigned int));
  writePtr += sizeof(unsigned int);
  memcpy(writePtr, &cameraID, sizeof(unsigned int));
  writePtr += sizeof(unsigned int);
  memcpy(writePtr, frameData, width*height*3*sizeof(char));

  //TODO: can we avoid the whole image copy here?
  //TODO: should come up with a better packet buffer build system.
  //IDEA!: maybe have a "request buffer" funxction so the Uplink
  //class can have sole ownership, rather than do the copy in "addtoQueue"
  addToQueue(outgoingBuffer);
}

Эта программа работает большую часть времени, но редко, при отправке большого количества данных без задержки между пакетами происходит сбой. Например:

sendVideoFrameBGR(...);  //occasional fail
sendASCIIMessage("...");

sendVideoFrameBGR(...);  //never fails.
sleep(1); 
sendASCIIMessage("...");

после обработки видеокадра в нисходящем канале он возвращается к hadleHeaderData и ожидает пакет длиной в несколько мегабайт и идентификатор пакета, который не существует. Каким-то образом поток становится поврежденным. Я не знаю почему.

Меня не особо волнует код, который я сейчас написал, поэтому, если кто-нибудь знает хороший класс или библиотеку для разбора потоков по TCP на блоки буферов, я бы лучше использовал это.

EDIT:
Вот точный код, который запускает отправку данных:

    if(frontImage) {
      uplink.sendVideoFrameBGR(frontImage->width, frontImage->height, 0,
                   (unsigned char*)frontImage->imageData);
      cout<<"Sent"<<endl;
      //sleep(1);   //works fine if this is uncommented !
    }

    uplink.sendASCIIMessage("Alive...");
    sleep(1);
    uplink.sendIMUDataBlock(imuDataBlock.get());
    cout<<"Loop"<<endl;
    sleep(1);
  }

1 Ответ

3 голосов
/ 10 августа 2010

Скорее всего, проблема в том, что ваш объект ioservice выполняет несколько операций обработки потоков.

Когда вы вызываете вторую функцию отправки сразу после первой, два объекта функции, отправленные в ioservice, вероятно, делегируются разным потокам.Таким образом, в основном, две записи происходят в одном сокете параллельно.Это, скорее всего, незаконно.Использование Winsock2 с неблокирующими сокетами приведет к повреждению исходящих данных.

Даже если вы используете bool для проверки того, отправляет ли он в данный момент, bool не проверяется, пока один из потоков ioservice не будетобработка функции.Если два потока ioservice активны, когда вы публикуете две части работы, он может отправлять обе отправки одновременно, вызывая асинхронное выполнение двух функций отправки в отдельных потоках.Проверка «в данный момент отправляет» может возвращать ложь в обоих вызовах, поскольку две отправки выполняются параллельно.

...