C UDP-сокет получения с использованием MSG_DONTWAIT всегда завершается ошибкой - PullRequest
0 голосов
/ 29 декабря 2018

Я работаю над парой приложений клиент / сервер UDP, и мое приложение, которое отправляет команды, работает превосходно - я могу контролировать то, что отправляется в порт через nc и hexdump, и они отлично декодируются.

В моем приложении, которое должно получать команды, я использую recvfrom с флагом MSG_DONTWAIT.Я делаю это, потому что мне нужно проверить очередь для отправляемых вещей, так что просто оставить блокировку недоступной.ЕСЛИ я удаляю флаг MSG_DONTWAIT, сообщения принимаются и обрабатываются правильно, но это блокирует ожидание, которое не будет работать для моего приложения.При использовании MSG_DONTWAIT он всегда возвращает -1 и устанавливает для errno значение EAGAIN.Хотя этого можно ожидать, когда ничего не отправляется, он НИКОГДА не получает ничего вообще.Я бы подумал, что он вернет EAGAIN, пока что-нибудь не станет доступно, но это не так.Соответствующий код выложен ниже - что мне не хватает?

uint8_t Receiver::Setup(uint16_t rx_port, uint16_t tx_port)
{

    std::stringstream ss;
    ss << "UDP session manager, setup ports.";
    Logger::Info(ss.str());

    tx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
    rx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);

    if (rx_socket_fd < 0)
    {
        Logger::Error("Could not open an rx UDP socket!");
    }
    else
    {
        std::cout << "rx_socket_fd is " << rx_socket_fd << "\n";
    }
    if (tx_socket_fd < 0)
    {
        Logger::Error("Could not open an tx UDP socket!");
    }
    else
    {
        std::cout << "tx_socket_fd is " << tx_socket_fd << "\n";
    }


    int reuse = 1;
    if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
        Logger::Warn("Could not set socket reuse!");

    #ifdef SO_REUSEPORT
    reuse = 1;
        if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
            Logger::Warn("setsockopt(SO_REUSEPORT) failed");
    #endif

    reuse = 1;
    if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
        Logger::Warn("Could not set socket reuse!");

    #ifdef SO_REUSEPORT
    reuse = 1;
        if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
            Logger::Warn("setsockopt(SO_REUSEPORT) failed");
    #endif

    memset(&tx_sockaddr, 0, sizeof(tx_sockaddr));
    memset(&rx_sockaddr, 0, sizeof(rx_sockaddr));

    tx_sockaddr.sin_family = AF_INET;
    tx_sockaddr.sin_addr.s_addr = INADDR_ANY;
    tx_sockaddr.sin_port = htons(tx_port);

    rx_sockaddr.sin_family = AF_INET;
    rx_sockaddr.sin_addr.s_addr = INADDR_ANY;
    rx_sockaddr.sin_port = htons(rx_port);

    int rva = 0;

    rva = bind(tx_socket_fd, (const struct sockaddr *) &tx_sockaddr, sizeof(tx_sockaddr) );

    if (rva < 0)
    {
        std::stringstream ss;
        ss << "UDP SessionManager: Could not bind to tx socket (bind returned error code " << rva << ", errno is " << errno << ")";
        Logger::Error(ss.str());
    }

    rva = bind(rx_socket_fd, (const struct sockaddr *) &rx_sockaddr, sizeof(rx_sockaddr) );

    if (rva < 0)
    {
        std::stringstream ss;
        ss << "UDP SessionManager: Could not bind to rx socket (bind returned error code " << rva << ", errno is " << errno << ")";
        Logger::Error(ss.str());
    }

    return NO_ERROR;
}


uint8_t Receiver::SendTelemetry(const TelemetryBase * telemetry)
{
    const uint8_t * bytes = EncodeTelemetryToSend(telemetry);

    if (bytes == NULL)
    {
        Logger::Error("Receiver: Something went wrong trying to encode the telemetry.");
        return 1;
    }

    const UDPHeader * header = (const UDPHeader * ) bytes;
    uint16_t numBytes = header->length;

    std::stringstream ss;
    ss << "Receiver::SendTelemetry - bytesToWrite is " << numBytes << "\n";
    Logger::Info(ss.str());

    int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );

    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    if (rva == -1  && errno == EINVAL)
    {
        ss.clear();
        ss << "invalid argument!";
        Logger::Warn(ss.str());
    }
    else if (rva < 0)
    {
        ss.clear();

        ss << "Failed to write to the UDP port, errno is " << errno;

        Logger::Warn(ss.str());
        return 1;
    }

    delete bytes;

    return 0;
}



uint8_t Receiver::SendCommand(const CommandBase * command)
{
    const uint8_t * bytes = EncodeCommandToSend(command);

    if (bytes == NULL)
    {
        Logger::Error("Receiver: Something went wrong trying to encode the message.");
        return 1;
    }

    const UDPHeader * header = (const UDPHeader * ) bytes;
    uint16_t numBytes = header->length;

    std::stringstream ss;
    ss << "Receiver::SendCommand - bytesToWrite is " << numBytes << "\n";
    Logger::Info(ss.str());

    int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );

    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    if (rva < 0)
    {
        ss.clear();

        ss << "Failed to write to the UDP port, errno is " << errno;

        Logger::Warn(ss.str());
        return 1;
    }

    delete bytes;

    return 0;
}

uint8_t Receiver::Receive()
{
    uint8_t inputBuffer[UDP_BUFFER_BYTES];
    memset(inputBuffer, '\0', UDP_BUFFER_BYTES);

    int totalBytesRead = 0;

    //socklen_t addressLength = sizeof(rx_sockaddr);
    struct sockaddr_in sender;
    socklen_t len;

    totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
                          MSG_DONTWAIT, (struct sockaddr *)  &sender, &len );

    if ( totalBytesRead >= 0 )
    {
        std::stringstream ss;
        ss << "UDP port read " << totalBytesRead << " bytes";
        Logger::Info(ss.str() );

        const CommandBase * command = DecodeReceivedCommand(inputBuffer);

        if (command == NULL)
        {
            Logger::Warn("Failed to decode received command from commanding app.");
            return UDP_ERROR_DECODE_FAILED;
        }

        EnqueCommand(command);

    }
    else
    {
        std::stringstream ss;
        ss << "UDP port rva = " << totalBytesRead << ", errno is " << errno;
        Logger::Debug(ss.str());
    }

    return UDP_ERROR_NO_ERROR;
}



void Receiver::ProcessingLoopThread()
{
    while ( GetState() == STATE_RUN )
    {
        const TelemetryBase * telemetry = DequeTelemetry();

        while (telemetry != NULL)
        {
            std::stringstream ss;
            ss << "Receiver sending telemetry with ID: " << telemetry->GetTelemetryID();
            Logger::Debug(ss.str());

            SendTelemetry(telemetry);
            delete telemetry;
            telemetry = DequeTelemetry();
        }

        Receive();

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

Ответы [ 2 ]

0 голосов
/ 29 декабря 2018
struct sockaddr_in sender;
socklen_t len;

totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
                      MSG_DONTWAIT, (struct sockaddr *)  &sender, &len );

Вы не присвоили len разумное значение.Если вы не инициализируете len размером адреса сокета, вызов может не состояться.

Кроме того, вы захватываете errno слишком поздно.Вы должны захватить его сразу после вызова, который получает ошибку.В противном случае другие посреднические операции могут изменить его стоимость.Поэтому вы не можете полагаться на получение разумного значения.

Использование отдельных сокетов для отправки и получения очень странно.Если вы отправляете и получаете в ту же другую конечную точку, вы должны использовать только один сокет.

0 голосов
/ 29 декабря 2018

Несколько вещей:

Возможно, это не проблема, но я бы посоветовал вам зафиксировать ошибку до того, как любой другой код сможет запустить и очистить ошибку.Вместо этого:

    std::stringstream ss;
    ss << "UDP port rva = " << totalBytesRead << ", errno is " << errno;

Лучше:

totalBytesRead = recvfrom(rx_socket_fd,...
int lasterror = errno; // catch errno before anything else can change it


. . .
ss << "UDP port rva = " << totalBytesRead << ", errno is " << lasterror;

Вернуться к исходной проблеме.

Я предполагаю, что вам нужно опрашивать сокет более одного разапри использовании неблокирующего флага MSG_DONTWAIT.

Похоже, ваш основной цикл спит в течение 10 миллисекунд между каждым опросом сокета.Если это ваш дизайн, то просто сделайте следующее:

Когда вы создаете сокет, установите для него тайм-аут на 10 миллисекунд:

timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10 * 1000; // 10 milliseconds
setsockopt(rx_socket_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

Затем просто удалите флаг MSG_DONTWAIT из recvfromcall.

Кроме того, удалите оператор сна в вашем основном цикле:

 std::this_thread::sleep_for(std::chrono::milliseconds(10));

, а затем корректно обработайте ошибку тайм-аута как обычную вещь, которая может произойти

totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
                      0, (struct sockaddr *)  &sender, &len );


if (totalBytesRead >= 0 )
{
    // data available - handle it
}
else
{
    if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
    {
        // socket timed out after waiting 10 milliseconds
    }
    else
    {
        // actual socket error
    }
}
...