Почему решения Unix IPC теряют пропускную способность выше 16 МБ в пользовательских тестах? - PullRequest
2 голосов
/ 05 октября 2019

Я хотел бы оценить общую производительность некоторых распространенных решений IPC в системах Unix. В качестве отправной точки я создал несколько небольших тестовых программ, предназначенных для отправки двоичных данных от разветвленного клиента на сервер (размер 1 КиБ - 512 МиБ, каждая итерация удваивает предыдущий размер). Блокируя временные метки в обоих процессах, можно оценить все, разделив размер данных на прошедшее время.

Кажется, все работает, однако я не понимаю, почему все результаты резко падаютпревышает 16 МБ и затем застаивается, как видно на следующей диаграмме:

Unix IPC throughput

Для справки сокращения обозначаются следующим образом: uds (сокет домена Unix), fifo (FIFO), fifoenh (расширенный FIFO = увеличенный локальный и глобальный размер FIFO), mmap (разделяемая память POSIX), tcp (сокет TCP), mq (очередь сообщений POSIX), zmq-ipc-pr (ZeroMQ).

Я приложил пример программы, используемой для оценки пропускной способности сокетов домена Unix (проверка ошибок для ясности опущена). Все остальные работают аналогично:

#include <sstream>
#include <fstream>
#include <string>
#include <memory>
#include <chrono>

#include <sys/mman.h>
#include <sys/wait.h>
#include <sys/un.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>

#include <sys/socket.h>
#include <netinet/in.h>

typedef struct {
    sem_t sem;
} MySem;

int main(int argc, char *argv[])
{
    // setup log file
    std::stringstream fileName;
    fileName << "uds" << "_";
    std::ofstream logFile;

    // setup semaphore & place in shared memory
    size_t szMySem = sizeof(MySem);
    MySem* pSem = (MySem*)mmap(NULL, szMySem, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_SHARED, 0, 0);
    sem_init(&pSem->sem, 1, 0);

    // remove, if present
    unlink("/tmp/uds.uds");

    struct sockaddr_un udsAddr;
    memset(&udsAddr, 0, sizeof(udsAddr));
    udsAddr.sun_family = AF_UNIX;
    strcpy(udsAddr.sun_path, "/tmp/uds.uds");

    int pid = fork();

    if (pid > 0)
    {
        // parent (server = reader)
        fileName << "server.csv";
        logFile.open(fileName.str());

        int sockFd = socket(AF_UNIX, SOCK_STREAM, 0);
        bind(sockFd, (struct sockaddr*)&udsAddr, sizeof(udsAddr));
        listen(sockFd, 10);

        // inform client that socket is ready
        sem_post(&pSem->sem);

        socklen_t addrlen = sizeof (struct sockaddr_in);
        int connFd = accept(sockFd, (struct sockaddr *) &udsAddr, &addrlen);

        // from 1 KiB to 512 MiB
        for(int szKiB = 1; szKiB < 1048576; szKiB = szKiB * 2)
        {
            int szBytes = szKiB * 1024;

            // 100 runs for each size to attain average
            for (int i = 0; i < 100; i++)
            {
                // create buffer
                std::shared_ptr<uint8_t[]> pBuf(new uint8_t[szBytes]);

                uint32_t allBytesRead = 0;
                while(allBytesRead != szBytes)
                {
                    // read() blocks until data is available
                    uint32_t bytesRead = read(connFd, (void*)(pBuf.get() + allBytesRead), szBytes);
                    allBytesRead += bytesRead;
                }

                // lock current time after read
                long timeStamp = (std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
                logFile << szBytes << ";" << i << ";" << 2 << ";" << allBytesRead << ";;" << timeStamp << std::endl;

                // inform client that it can send next data package
                sem_post(&pSem->sem);
            }
        }

        // cleanup
        close(connFd);
        close(sockFd);
        unlink("/tmp/uds.uds");
        // remove semaphore from shared memory
        munmap(pSem, szMySem);
        logFile.close();
    }

    else
    {
        // child (client = writer)
        fileName << "client.csv";
        logFile.open(fileName.str());

        // wait until server has set up socket
        sem_wait(&pSem->sem);

        int sockFd = socket(AF_UNIX, SOCK_STREAM, 0);
        connect(sockFd, (struct sockaddr*)&udsAddr, sizeof(udsAddr));

        // from 1 KiB to 512 MiB
        for(int szKiB = 1; szKiB < 1048576; szKiB = szKiB * 2)
        {
            int szBytes = 1024 * szKiB;

            // 100 runs for each size to attain average
            for(int it = 0; it < 100; it++)
            {
                // create buffer
                std::shared_ptr<uint8_t[]> pBuf(new uint8_t[szBytes]);
                // char 'A'
                memset(pBuf.get(), 0x41, szBytes);

                // lock current time before write
                long timeStamp = (std::chrono::duration_cast<std::chrono::microseconds>(
                            std::chrono::system_clock::now().time_since_epoch()).count());

                uint32_t allBytesWritten = 0;
                while(allBytesWritten != szBytes)
                {
                    uint32_t bytesWritten = write(sockFd, (void*)(pBuf.get() + allBytesWritten), szBytes);
                    allBytesWritten += bytesWritten;
                }

                logFile << szBytes << ";" << it << ";" << 1 << ";;" << allBytesWritten << ";" << timeStamp << std::endl;

                // wait until server has read data
                sem_wait(&pSem->sem);
                // then wait 1 sec before next send
                sleep(1);
            }
        }

        // cleanup
        close(sockFd);
        logFile.close();
    }

    return 0;
}

Все тесты проводились на экземпляре AWS EC2 md5.large (Intel Xeon Platinum 8175 @ 3,1 ГГц, 8 ГБ ОЗУ, Ubuntu Server 18.04 LTS x64). Любые намеки относительно того, что может вызвать это специфическое поведение, приветствуются.

...