Я написал программу для модульного тестирования передачи данных с QUdpSocket. Он отправляет произвольное (предоставленное пользователем) количество Data
элементов в отсортированном векторе (Data
имеет индекс и случайно сгенерированное число). Программа создает 2 QUdpSocket
s: одно для отправки, другое для приема данных. Он создает и отправляет вектор Data
, получает его и анализирует на согласованность. Функцию для анализа можно найти здесь . Существует также репо с полным проектом на GitHub.
Я столкнулся с очень странными результатами. Когда я использую главное событие l oop для отправки и получения данных, всегда есть ограничение на количество получаемых байтов: для Windows 10 это 65536 байт, для Windows 7 - 8192. Я пробовал чтобы установить размер буфера QUdpSocket
с разными значениями (по умолчанию 0, в документах говорится, что это означает, что буфер не имеет ограничений), но результат всегда один и тот же. Wireshark, однако, показывает, что все пакеты отправляются независимо от того, сколько данных я передаю.
Когда я использую разные QThread
s для отправки и получения сокетов, я также получаю очень непоследовательное поведение. Таким образом, когда я запускаю Windows 10 в режиме выпуска (скомпилированный с MSV C или MinGW) ALL , пакеты принимаются. Для режима отладки:
- просто запуск теста, скомпилированного с MinGW, проходит нормально, и все пакеты принимаются
- запуск теста в режиме отладки (с CLion) кардинально отличается. Выполнение занимает намного больше времени, и многие пакеты теряются или принимаются не по порядку.
- выполнение теста, скомпилированного с помощью MSV C, дает неверные результаты - пакеты потеряны или не в порядке, несмотря ни на что.
struct Data
{
uint64_t index,
rndNumber;
// uint64_t zero{0};
};
// helper function to ensure Data is not sent segmented
template<typename Iter, typename = typename
std::enable_if<
std::is_same<typename std::iterator_traits<Iter>::iterator_category,
std::random_access_iterator_tag>::value &&
sizeof(typename std::iterator_traits<Iter>::value_type) <= 512>::type>
void WriteData(QUdpSocket &socket, Iter begin, Iter end)
{
constexpr size_t sizeofData = sizeof(typename std::iterator_traits<Iter>::value_type);
static_assert(sizeofData <= 512, "Size of data type must be less or equal 512 bytes");
assert(std::distance(begin, end) >= 0 && "Invalid range");
while (begin != end)
{
size_t elemsToSend = 512 / sizeofData;
size_t elemsRemains = std::distance(begin, end);
elemsToSend = std::min(elemsToSend, elemsRemains);
size_t bytesToSend = elemsToSend * sizeofData;
const char *pDatagram = reinterpret_cast<const char *>(&*begin);
uint64_t bytesWritten = socket.write(pDatagram, bytesToSend);
assert(bytesWritten == bytesToSend ||
bytesWritten != -1 && "Failed to send datagram");
begin = std::next(begin, elemsToSend);
}
}
int main(int argc, char **argv)
{
size_t elems = 32768;
if (argc == 2)
{
try
{
elems = std::stoll(argv[1]);
}
catch (const std::invalid_argument &e)
{
std::cerr << "Invalid argument for elements number. Testing with " << elems <<
"elements." << std::endl;
}
}
// form vector to send
std::vector<Data> dataSent{elems},
dataReceived{};
{
std::random_device rd;
std::mt19937 rng(rd());
std::uniform_int_distribution<std::mt19937::result_type> dist(0);
for (size_t i = 0; i != elems; ++i)
{
dataSent[i].index = i;
dataSent[i].rndNumber = dist(rng);
}
}
QHostAddress address = QHostAddress::LocalHost;
quint16 unicastPortSend = 8001;
QCoreApplication app{argc, argv};
QTimer receiveTimeout;
QObject::connect(&receiveTimeout, &QTimer::timeout,
&app, &QCoreApplication::quit, Qt::QueuedConnection);
receiveTimeout.start(2000);
QUdpSocket receiver;
receiver.bind(address, unicastPortSend);
#ifdef QTHREADS
// receive vector in a separate thread
QThread receiverThread;
receiver.moveToThread(&receiverThread);
receiveTimeout.moveToThread(&receiverThread);
QObject::connect(&receiveTimeout, &QTimer::timeout,
&receiverThread, &QThread::quit);
// move receiver and timer back to the main thread when finished
QObject::connect(&receiverThread, &QThread::finished, &receiver,
std::bind(&QObject::moveToThread, &receiver,
app.thread()));
QObject::connect(&receiverThread, &QThread::finished, &receiveTimeout,
std::bind(&QObject::moveToThread, &receiveTimeout,
app.thread()));
receiverThread.start();
#endif
// connect reading function
size_t countSteps{};
QObject::connect(&receiver, &QUdpSocket::readyRead, [&]
{
receiveTimeout.stop();
++countSteps;
while (receiver.hasPendingDatagrams())
{
QByteArray array;
int64_t pendingDatagramSize = receiver.pendingDatagramSize();
assert(pendingDatagramSize != -1 && "Unexpected datagram size!");
array.resize(pendingDatagramSize);
uint16_t readSize = receiver.readDatagram(array.data(), pendingDatagramSize);
assert(readSize == pendingDatagramSize && "Failed to read pending datagram");
const Data *pData = reinterpret_cast<const Data *>(array.data());
size_t elemsReceived = readSize / sizeof(Data);
std::copy(pData, std::next(pData, elemsReceived), std::back_inserter(dataReceived));
}
receiveTimeout.start(200);
});
// sender socket
QUdpSocket sender;
sender.connectToHost(address, unicastPortSend);
#ifdef QTHREADS
QThread senderThread;
sender.moveToThread(&senderThread);
// move sender back to main thread on finish
QObject::connect(&senderThread, &QThread::finished,
&sender,
std::bind(&QObject::moveToThread, &sender, app.thread()));
senderThread.start();
#endif
QTimer::singleShot(2, &sender, [&]
{
WriteData(sender, dataSent.cbegin(), dataSent.cend());
#ifdef QTHREADS
senderThread.quit();
#endif
});
app.exec();
auto result = make_io_arrays_compare(std::move(dataSent), std::move(dataReceived),
[](const Data &d1, const Data &d2)
{
return std::tie(d1.index, d1.rndNumber) <
std::tie(d2.index, d2.rndNumber);
});
std::cout << result;
if (result)
return 0;
return -1;
}
Подводя итог, мои вопросы:
- Почему у меня есть ограничение на получение данных, когда сокеты работают в основном (том же) потоке (событие l oop)?
- По какой причине я не получаю согласованных результатов, когда сокеты выполняются в отдельных потоках (циклы событий)?
- Почему запуск теста в режиме отладки занимает невероятно много времени и пакеты потеряны или получены не по порядку?
- Если QThreads (циклы событий) ненадежны, как я могу решить эту проблему без циклов событий?
- Если QUdpSockets дают непредсказуемые, противоречивые и ненадежные результаты, что такое хорошая кроссплатформенная (и желательно легкая) библиотека с сокетами udp и tcp?