Не удается найти причину ошибки "Broken Pipe" при отправке непрерывных блоков данных через веб-сокет Beast - PullRequest
0 голосов
/ 29 марта 2019

Я работаю над распознаванием потокового аудио с API речи Watson to text to text. Я создал веб-сокет с библиотекой boost (beast 1.68.0) на C ++ (стандарт 11).

Я успешно подключился к серверу IBM и хочу отправить 231 296 байт необработанных аудиоданных на сервер следующим образом.

{
  "action": "start",
  "content-type": "audio/l16;rate=44100"
}

websocket.binary(true);
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 31,296 bytes>

websocket.binary(false);
{
  "action": "stop"
}

Ожидаемый результат от IBMServer:

 {"results": [
      {"alternatives": [
            {  "confidence": xxxx, 
               "transcript": "call Rohan Chauhan "
            }],"final": true
      }], "result_index": 0
}

Но я не получаю желаемого результата: скорее ошибка говорит "Разбитая труба"

DataSize is: 50000 | mIsLast is : 0
DataSize is: 50000 | mIsLast is : 0
what : Broken pipe
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 31296 | mIsLast is : 0
what : Operation canceled

Вот мой код , который является адаптацией примера примера , приведенного в библиотеке животных.

Foo.hpp

class IbmWebsocketSession: public std::enable_shared_from_this<IbmWebsocketSession> {
protected:
    char binarydata[50000];
    std::string TextStart;
    std::string TextStop;

public:
    explicit IbmWebsocketSession(net::io_context& ioc, ssl::context& ctx, SttService* ibmWatsonobj) :
        mResolver(ioc), mWebSocket(ioc, ctx) {
    TextStart ="{\"action\":\"start\",\"content-type\": \"audio/l16;rate=44100\"}";
    TextStop = "{\"action\":\"stop\"}";


   /**********************************************************************
    * Desc  : Send start frame
   **********************************************************************/
    void send_start(beast::error_code ec);
   /**********************************************************************
    * Desc  : Send Binary data
   **********************************************************************/
    void send_binary(beast::error_code ec);
   /**********************************************************************
    * Desc  : Send Stop frame
   **********************************************************************/
    void send_stop(beast::error_code ec);
   /**********************************************************************
    * Desc  : Read the file for binary data to be sent
   **********************************************************************/
    void readFile(char *bdata, unsigned int *Len, unsigned int *start_pos,bool *ReachedEOF);
}

foo.cpp

void IbmWebsocketSession::on_ssl_handshake(beast::error_code ec) {
    if(ec)
        return fail(ec, "connect");
// Perform the websocket handshake
    ws_.async_handshake_ex(host, "/speech-to-text/api/v1/recognize", [Token](request_type& reqHead) {reqHead.insert(http::field::authorization,Token);},bind(&IbmWebsocketSession::send_start, shared_from_this(),placeholders::_1));
}

void IbmWebsocketSession::send_start(beast::error_code ec){
    if(ec)
        return fail(ec, "ssl_handshake");

    ws_.async_write(net::buffer(TextStart),
        bind(&IbmWebsocketSession::send_binary, shared_from_this(),placeholders::_1));
}

void IbmWebsocketSession::send_binary(beast::error_code ec) {
    if(ec)
        return fail(ec, "send_start");
    readFile(binarydata, &Datasize, &StartPos, &IsLast);

    ws_.binary(true);
    if (!IsLast) {
        ws_.async_write(net::buffer(binarydata, Datasize),
            bind(&IbmWebsocketSession::send_binary, shared_from_this(),
                    placeholders::_1));

    } else {
        IbmWebsocketSession::on_binarysent(ec);
    }
}

void IbmWebsocketSession::on_binarysent(beast::error_code ec) {
    if(ec)
        return fail(ec, "send_binary");

    ws_.binary(false);
    ws_.async_write(net::buffer(TextStop),
           bind(&IbmWebsocketSession::read_response, shared_from_this(), placeholders::_1));
}

void IbmWebsocketSession::readFile(char *bdata, unsigned int *Len, unsigned int *start_pos,bool *ReachedEOF) {

    unsigned int end = 0;
    unsigned int start = 0;
    unsigned int length = 0;

    // Creation of ifstream class object to read the file
    ifstream infile(filepath, ifstream::binary);

    if (infile) {
        // Get the size of the file
        infile.seekg(0, ios::end);
        end = infile.tellg();

        infile.seekg(*start_pos, ios::beg);
        start = infile.tellg();

        length = end - start;
    }

    if ((size_t) length < 150) {
        *Len = (size_t) length;
        *ReachedEOF = true;
    // cout << "Reached end of File (last 150 bytes)" << endl;

    } else if ((size_t) length <= 50000) {  //Maximumbytes to send are 50000
        *Len = (size_t) length;
        *start_pos += (size_t) length;
        *ReachedEOF = false;
        infile.read(bdata, length);

    } else {
        *Len = 50000;
        *start_pos += 50000;
        *ReachedEOF = false;
        infile.read(bdata, 50000);
    }

    infile.close();
}

Есть предложения?

1 Ответ

0 голосов
/ 01 апреля 2019

Из документации Boost мы имеем следующую выдержку на websocket::async_write

Эта функция используется для асинхронной записи полного сообщения.Этот звонок всегда возвращается немедленно.Асинхронная операция будет продолжаться до тех пор, пока не будет выполнено одно из следующих условий:

  1. Полное сообщение записано.

  2. Произошла ошибка.

Поэтому при создании объекта буфера для передачи ему net::buffer(TextStart), например, время жизниbuffer передается ему только до тех пор, пока функция не вернется.Может случиться так, что даже после того, как функция возвратит вас, асинхронная запись все еще работает в буфере согласно документации, но содержимое больше не является действительным, поскольку buffer была локальной переменной.

Чтобы исправить это, вымог бы сделать ваш TextStart статическим или объявить, что как член вашего класса и скопировать его в boost :: asio :: buffer , есть множество примеров, как это сделать.Примечание. Я упоминаю TextStart только в функции IbmWebsocketSession::send_start.Проблема почти одинакова во всем вашем коде.

Из определения API-интерфейса IBM Watson , для инициирования соединения требуется определенный формат, который затем можно представить в виде строки.У вас есть строка, но отсутствует правильный формат, из-за которого одноранговое соединение закрывает соединение, и вы выполняете запись в закрытый сокет, таким образом, в разорванный канал.

Для инициирующего соединения требуется:

  var message = {
    action: 'start',
    content-type: 'audio/l16;rate=22050'
  };

Который может быть представлен как string TextStart = "action: 'start',\r\ncontent-type: 'audio\/l16;rate=44100'" в соответствии с вашими требованиями.

После обсуждения в чате ОП решил проблему, добавив код:

if (!IsLast ) {
    ws_.async_write(net::buffer(binarydata, Datasize),
    bind(&IbmWebsocketSession::send_binary, shared_from_this(),
    placeholders::_1));
} 
else {
     if (mIbmWatsonobj->IsGstFileWriteDone()) { //checks for the file write completion
         IbmWebsocketSession::on_binarysent(ec);
     } else {
         std::this_thread::sleep_for(std::chrono::seconds(1));
         IbmWebsocketSession::send_binary(ec);
     }
}

Что из обсуждения связано с тем, что клиенту было отправлено больше байтов до того, как запись файла была завершена на том же наборе байтов.OP теперь проверяет это перед попыткой отправить больше байтов.

...