Я работаю над парой приложений клиент / сервер UDP, и мое приложение, которое отправляет команды, работает превосходно - я могу контролировать то, что отправляется в порт через nc и hexdump, и они отлично декодируются.
В моем приложении, которое должно получать команды, я использую recvfrom с флагом MSG_DONTWAIT.Я делаю это, потому что мне нужно проверить очередь для отправляемых вещей, так что просто оставить блокировку недоступной.ЕСЛИ я удаляю флаг MSG_DONTWAIT, сообщения принимаются и обрабатываются правильно, но это блокирует ожидание, которое не будет работать для моего приложения.При использовании MSG_DONTWAIT он всегда возвращает -1 и устанавливает для errno значение EAGAIN.Хотя этого можно ожидать, когда ничего не отправляется, он НИКОГДА не получает ничего вообще.Я бы подумал, что он вернет EAGAIN, пока что-нибудь не станет доступно, но это не так.Соответствующий код выложен ниже - что мне не хватает?
uint8_t Receiver::Setup(uint16_t rx_port, uint16_t tx_port)
{
std::stringstream ss;
ss << "UDP session manager, setup ports.";
Logger::Info(ss.str());
tx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
rx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (rx_socket_fd < 0)
{
Logger::Error("Could not open an rx UDP socket!");
}
else
{
std::cout << "rx_socket_fd is " << rx_socket_fd << "\n";
}
if (tx_socket_fd < 0)
{
Logger::Error("Could not open an tx UDP socket!");
}
else
{
std::cout << "tx_socket_fd is " << tx_socket_fd << "\n";
}
int reuse = 1;
if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("Could not set socket reuse!");
#ifdef SO_REUSEPORT
reuse = 1;
if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("setsockopt(SO_REUSEPORT) failed");
#endif
reuse = 1;
if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("Could not set socket reuse!");
#ifdef SO_REUSEPORT
reuse = 1;
if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("setsockopt(SO_REUSEPORT) failed");
#endif
memset(&tx_sockaddr, 0, sizeof(tx_sockaddr));
memset(&rx_sockaddr, 0, sizeof(rx_sockaddr));
tx_sockaddr.sin_family = AF_INET;
tx_sockaddr.sin_addr.s_addr = INADDR_ANY;
tx_sockaddr.sin_port = htons(tx_port);
rx_sockaddr.sin_family = AF_INET;
rx_sockaddr.sin_addr.s_addr = INADDR_ANY;
rx_sockaddr.sin_port = htons(rx_port);
int rva = 0;
rva = bind(tx_socket_fd, (const struct sockaddr *) &tx_sockaddr, sizeof(tx_sockaddr) );
if (rva < 0)
{
std::stringstream ss;
ss << "UDP SessionManager: Could not bind to tx socket (bind returned error code " << rva << ", errno is " << errno << ")";
Logger::Error(ss.str());
}
rva = bind(rx_socket_fd, (const struct sockaddr *) &rx_sockaddr, sizeof(rx_sockaddr) );
if (rva < 0)
{
std::stringstream ss;
ss << "UDP SessionManager: Could not bind to rx socket (bind returned error code " << rva << ", errno is " << errno << ")";
Logger::Error(ss.str());
}
return NO_ERROR;
}
uint8_t Receiver::SendTelemetry(const TelemetryBase * telemetry)
{
const uint8_t * bytes = EncodeTelemetryToSend(telemetry);
if (bytes == NULL)
{
Logger::Error("Receiver: Something went wrong trying to encode the telemetry.");
return 1;
}
const UDPHeader * header = (const UDPHeader * ) bytes;
uint16_t numBytes = header->length;
std::stringstream ss;
ss << "Receiver::SendTelemetry - bytesToWrite is " << numBytes << "\n";
Logger::Info(ss.str());
int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (rva == -1 && errno == EINVAL)
{
ss.clear();
ss << "invalid argument!";
Logger::Warn(ss.str());
}
else if (rva < 0)
{
ss.clear();
ss << "Failed to write to the UDP port, errno is " << errno;
Logger::Warn(ss.str());
return 1;
}
delete bytes;
return 0;
}
uint8_t Receiver::SendCommand(const CommandBase * command)
{
const uint8_t * bytes = EncodeCommandToSend(command);
if (bytes == NULL)
{
Logger::Error("Receiver: Something went wrong trying to encode the message.");
return 1;
}
const UDPHeader * header = (const UDPHeader * ) bytes;
uint16_t numBytes = header->length;
std::stringstream ss;
ss << "Receiver::SendCommand - bytesToWrite is " << numBytes << "\n";
Logger::Info(ss.str());
int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (rva < 0)
{
ss.clear();
ss << "Failed to write to the UDP port, errno is " << errno;
Logger::Warn(ss.str());
return 1;
}
delete bytes;
return 0;
}
uint8_t Receiver::Receive()
{
uint8_t inputBuffer[UDP_BUFFER_BYTES];
memset(inputBuffer, '\0', UDP_BUFFER_BYTES);
int totalBytesRead = 0;
//socklen_t addressLength = sizeof(rx_sockaddr);
struct sockaddr_in sender;
socklen_t len;
totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
MSG_DONTWAIT, (struct sockaddr *) &sender, &len );
if ( totalBytesRead >= 0 )
{
std::stringstream ss;
ss << "UDP port read " << totalBytesRead << " bytes";
Logger::Info(ss.str() );
const CommandBase * command = DecodeReceivedCommand(inputBuffer);
if (command == NULL)
{
Logger::Warn("Failed to decode received command from commanding app.");
return UDP_ERROR_DECODE_FAILED;
}
EnqueCommand(command);
}
else
{
std::stringstream ss;
ss << "UDP port rva = " << totalBytesRead << ", errno is " << errno;
Logger::Debug(ss.str());
}
return UDP_ERROR_NO_ERROR;
}
void Receiver::ProcessingLoopThread()
{
while ( GetState() == STATE_RUN )
{
const TelemetryBase * telemetry = DequeTelemetry();
while (telemetry != NULL)
{
std::stringstream ss;
ss << "Receiver sending telemetry with ID: " << telemetry->GetTelemetryID();
Logger::Debug(ss.str());
SendTelemetry(telemetry);
delete telemetry;
telemetry = DequeTelemetry();
}
Receive();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}