Тестирование QUdpSocket. Обнаружен лимит на количество принимаемых пакетов. Несогласованные результаты при использовании QThreads, проблемы с режимом отладки - PullRequest
0 голосов
/ 05 августа 2020

Я написал программу для модульного тестирования передачи данных с QUdpSocket. Он отправляет произвольное (предоставленное пользователем) количество Data элементов в отсортированном векторе (Data имеет индекс и случайно сгенерированное число). Программа создает 2 QUdpSocket s: одно для отправки, другое для приема данных. Он создает и отправляет вектор Data, получает его и анализирует на согласованность. Функцию для анализа можно найти здесь . Существует также репо с полным проектом на GitHub.

Я столкнулся с очень странными результатами. Когда я использую главное событие l oop для отправки и получения данных, всегда есть ограничение на количество получаемых байтов: для Windows 10 это 65536 байт, для Windows 7 - 8192. Я пробовал чтобы установить размер буфера QUdpSocket с разными значениями (по умолчанию 0, в документах говорится, что это означает, что буфер не имеет ограничений), но результат всегда один и тот же. Wireshark, однако, показывает, что все пакеты отправляются независимо от того, сколько данных я передаю.

Когда я использую разные QThread s для отправки и получения сокетов, я также получаю очень непоследовательное поведение. Таким образом, когда я запускаю Windows 10 в режиме выпуска (скомпилированный с MSV C или MinGW) ALL , пакеты принимаются. Для режима отладки:

  • просто запуск теста, скомпилированного с MinGW, проходит нормально, и все пакеты принимаются
  • запуск теста в режиме отладки (с CLion) кардинально отличается. Выполнение занимает намного больше времени, и многие пакеты теряются или принимаются не по порядку.
  • выполнение теста, скомпилированного с помощью MSV C, дает неверные результаты - пакеты потеряны или не в порядке, несмотря ни на что.
struct Data
{
    uint64_t index,
            rndNumber;
    // uint64_t zero{0};
};

// helper function to ensure Data is not sent segmented
template<typename Iter, typename = typename
std::enable_if<
        std::is_same<typename std::iterator_traits<Iter>::iterator_category,
                std::random_access_iterator_tag>::value &&
        sizeof(typename std::iterator_traits<Iter>::value_type) <= 512>::type>
void WriteData(QUdpSocket &socket, Iter begin, Iter end)
{
    constexpr size_t sizeofData = sizeof(typename std::iterator_traits<Iter>::value_type);
    static_assert(sizeofData <= 512, "Size of data type must be less or equal 512 bytes");

    assert(std::distance(begin, end) >= 0 && "Invalid range");
    while (begin != end)
    {
        size_t elemsToSend = 512 / sizeofData;
        size_t elemsRemains = std::distance(begin, end);

        elemsToSend = std::min(elemsToSend, elemsRemains);
        size_t bytesToSend = elemsToSend * sizeofData;
        const char *pDatagram = reinterpret_cast<const char *>(&*begin);

        uint64_t bytesWritten = socket.write(pDatagram, bytesToSend);
        assert(bytesWritten == bytesToSend ||
               bytesWritten != -1 && "Failed to send datagram");

        begin = std::next(begin, elemsToSend);
    }
}


int main(int argc, char **argv)
{
    size_t elems = 32768;
    if (argc == 2)
    {
        try
        {
            elems = std::stoll(argv[1]);
        }
        catch (const std::invalid_argument &e)
        {
            std::cerr << "Invalid argument for elements number. Testing with " << elems <<
                      "elements." << std::endl;
        }
    }

    // form vector to send
    std::vector<Data> dataSent{elems},
            dataReceived{};
    {
        std::random_device rd;
        std::mt19937 rng(rd());
        std::uniform_int_distribution<std::mt19937::result_type> dist(0);
        for (size_t i = 0; i != elems; ++i)
        {
            dataSent[i].index = i;
            dataSent[i].rndNumber = dist(rng);
        }
    }

    QHostAddress address = QHostAddress::LocalHost;
    quint16 unicastPortSend = 8001;

    QCoreApplication app{argc, argv};

    QTimer receiveTimeout;
    QObject::connect(&receiveTimeout, &QTimer::timeout,
                     &app, &QCoreApplication::quit, Qt::QueuedConnection);
    receiveTimeout.start(2000);

    QUdpSocket receiver;
    receiver.bind(address, unicastPortSend);

#ifdef QTHREADS
    // receive vector in a separate thread
    QThread receiverThread;
    receiver.moveToThread(&receiverThread);
    receiveTimeout.moveToThread(&receiverThread);

    QObject::connect(&receiveTimeout, &QTimer::timeout,
                     &receiverThread, &QThread::quit);

    // move receiver and timer back to the main thread when finished
    QObject::connect(&receiverThread, &QThread::finished, &receiver,
                     std::bind(&QObject::moveToThread, &receiver,
                               app.thread()));

    QObject::connect(&receiverThread, &QThread::finished, &receiveTimeout,
                     std::bind(&QObject::moveToThread, &receiveTimeout,
                               app.thread()));

    receiverThread.start();
#endif

    // connect reading function
    size_t countSteps{};
    QObject::connect(&receiver, &QUdpSocket::readyRead, [&]
    {
        receiveTimeout.stop();
        ++countSteps;
        while (receiver.hasPendingDatagrams())
        {
            QByteArray array;

            int64_t pendingDatagramSize = receiver.pendingDatagramSize();
            assert(pendingDatagramSize != -1 && "Unexpected datagram size!");

            array.resize(pendingDatagramSize);

            uint16_t readSize = receiver.readDatagram(array.data(), pendingDatagramSize);
            assert(readSize == pendingDatagramSize && "Failed to read pending datagram");

            const Data *pData = reinterpret_cast<const Data *>(array.data());
            size_t elemsReceived = readSize / sizeof(Data);

            std::copy(pData, std::next(pData, elemsReceived), std::back_inserter(dataReceived));
        }
        receiveTimeout.start(200);
    });

    // sender socket
    QUdpSocket sender;
    sender.connectToHost(address, unicastPortSend);

#ifdef QTHREADS
    QThread senderThread;
    sender.moveToThread(&senderThread);

    // move sender back to main thread on finish
    QObject::connect(&senderThread, &QThread::finished,
                     &sender,
                     std::bind(&QObject::moveToThread, &sender, app.thread()));

    senderThread.start();
#endif

    QTimer::singleShot(2, &sender, [&]
    {
        WriteData(sender, dataSent.cbegin(), dataSent.cend());
#ifdef QTHREADS
        senderThread.quit();
#endif
    });

    app.exec();

    auto result = make_io_arrays_compare(std::move(dataSent), std::move(dataReceived),
                                         [](const Data &d1, const Data &d2)
                                         {
                                             return std::tie(d1.index, d1.rndNumber) <
                                                    std::tie(d2.index, d2.rndNumber);
                                         });

    std::cout << result;

    if (result)
        return 0;

    return -1;
}

Подводя итог, мои вопросы:

  1. Почему у меня есть ограничение на получение данных, когда сокеты работают в основном (том же) потоке (событие l oop)?
  2. По какой причине я не получаю согласованных результатов, когда сокеты выполняются в отдельных потоках (циклы событий)?
  3. Почему запуск теста в режиме отладки занимает невероятно много времени и пакеты потеряны или получены не по порядку?
  4. Если QThreads (циклы событий) ненадежны, как я могу решить эту проблему без циклов событий?
  5. Если QUdpSockets дают непредсказуемые, противоречивые и ненадежные результаты, что такое хорошая кроссплатформенная (и желательно легкая) библиотека с сокетами udp и tcp?
...