Как улучшить производительность libevent bufferevent_write - PullRequest
0 голосов
/ 06 августа 2020

Для моего тестового примера отправляйте сообщение «00» только по bufferevent_write. case 1: 20 000 TCP-соединений и отправка «00» каждые 10 секунд, это будет стоить 0,15 секунды. случай 2: только 1 TCP-соединение и отправка «00» 20000 раз каждые 10 секунд, это будет стоить 0,015 с.

Пожалуйста, дайте мне несколько советов по улучшению bufferevent_write производительности.

Я просто хочу как можно быстрее и удивляться тому, что, если bufferevent_write имеет значение asyn c, почему отправить 20k сообщения на 1 tcp намного быстрее, чем отправить 1 msssage на 20k tcp.

CPU info:
Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
CPU(s):              16
On-line CPU(s) list: 0-15
Thread(s) per core:  2
Core(s) per socket:  8
Socket(s):           1
NUMA node(s):        1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               85
Model name:          Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
Stepping:            7
CPU MHz:             2500.000
BogoMIPS:            5000.00
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:           32K
L1i cache:           32K
L2 cache:            1024K
L3 cache:            36608K
NUMA node0 CPU(s):   0-15
Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl cpuid tsc_known_freq pni pclmulqdq monitor ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single pti fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm mpx avx512f avx512dq rdseed adx smap avx512cd avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves arat avx512_vnni

Memory info:
32G

весь тестовый пример

#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/thread.h>
#include <netinet/tcp.h>

#include <atomic>
#include <cerrno>
#include <csignal>
#include <cstring>
#include <ctime>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <thread>

using namespace std::chrono_literals;

static event_base *kEventBase{nullptr};
static evconnlistener *kListener{nullptr};
static std::set<bufferevent *> kSessions{};
static std::mutex kSessionsMutex{};
static std::atomic_bool kRunning{false};

static void stop() {
    kRunning = false;
    if (kListener != nullptr) {
        evconnlistener_disable(kListener);
        std::cout << "normal listener stopped" << std::endl;
    }
    struct timeval local_timeval = {1, 0};
    if (kEventBase != nullptr) { event_base_loopexit(kEventBase, &local_timeval); }
}

static void handler(int sig) {
    std::cout << "get signal: " << sig << std::endl;
    stop();
}

static void ReadCallback(bufferevent *event, void *) {
    auto buffer = evbuffer_new();
    evbuffer_add_buffer(buffer, bufferevent_get_input(event));
    auto data_size = evbuffer_get_length(buffer);
    char data[data_size + 1];
    bzero(data, data_size + 1);
    evbuffer_remove(buffer, data, data_size);
    evbuffer_free(buffer);
    std::cout << "get data: " << data << std::endl;
}

static void EventCallback(bufferevent *event, short events, void *) {
    if (events & BEV_EVENT_EOF) {
        std::cout << "socket EOF" << std::endl;
    } else if (events & BEV_EVENT_ERROR) {
        std::cout << "socket error: " << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
    } else if (events & BEV_EVENT_TIMEOUT) {
        std::cout << "socket read/write timeout" << std::endl;
    } else {
        std::cout << "unhandled socket events: " << std::to_string(events) << std::endl;
    }

    {
        std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
        kSessions.erase(event);
        bufferevent_free(event);
    }
}

static void listenerCallback(evconnlistener *, evutil_socket_t socket, sockaddr *, int, void *) {
    bufferevent *event =
        bufferevent_socket_new(kEventBase, socket, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);

    if (event == nullptr) {
        std::cout << "create buffer event failed" << std::endl;
        return;
    }

    int enable = 1;
    setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, (void *)&enable, sizeof(enable));
    setsockopt(socket, IPPROTO_TCP, TCP_QUICKACK, (void *)&enable, sizeof(enable));

    bufferevent_setcb(event, ReadCallback, nullptr, EventCallback, nullptr);
    bufferevent_enable(event, EV_WRITE | EV_READ);

    kSessions.emplace(event);
}

int main(int argc, const char **argv) {
    signal(SIGTERM, handler);
    signal(SIGINT, handler);

    evthread_use_pthreads();

    // init
    kEventBase = event_base_new();
    if (kEventBase == nullptr) {
        std::cout << "cannot create event_base_miner_listener_" << std::endl;
        return -1;
    }

    sockaddr_in local_sin{};
    bzero(&local_sin, sizeof(local_sin));
    local_sin.sin_family = AF_INET;
    local_sin.sin_port = htons(1800u);
    local_sin.sin_addr.s_addr = htonl(INADDR_ANY);
    kListener = evconnlistener_new_bind(kEventBase,
                                        listenerCallback,
                                        nullptr,
                                        LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
                                        -1,
                                        reinterpret_cast<sockaddr *>(&local_sin),
                                        static_cast<int>(sizeof(local_sin)));
    if (kListener == nullptr) {
        std::cout << "cannot create normal listener" << std::endl;
        return -1;
    }

    kRunning = true;
    std::thread thread_send_message([]() {
        while (kRunning) {
            {
                // case 1: If send to 20,000 tcp connection, and send "00" for each, it will cost 0.15s.
                std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
                std::clock_t clock_start = std::clock();
                for (auto &it : kSessions) { bufferevent_write(it, "00", 2); }
                std::cout << "send message to all done, client count: " << kSessions.size()
                          << ", elapsed: " << std::clock() - clock_start << std::endl;
            }
            {
                // case 2: If send to 1 tcp connection, and send "00" 20,000 times, it will cost 0.015s.
//                std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
//                for (auto &it : kSessions) {
//                    std::clock_t clock_start = std::clock();
//                    for (int i = 0; i < 20000; ++i) { bufferevent_write(it, "00", 2); }
//                    std::cout << "send message 20k times done, elapsed: " << std::clock() - clock_start
//                              << std::endl;
//                }
            }
            std::this_thread::sleep_for(10s);
        }
    });

    event_base_dispatch(kEventBase);

    if (thread_send_message.joinable()) { thread_send_message.join(); }
    {
        std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
        for (auto &it : kSessions) { bufferevent_free(it); }
        kSessions.clear();
    }
    if (kListener != nullptr) {
        evconnlistener_free(kListener);
        kListener = nullptr;
    }
    if (kEventBase != nullptr) {
        event_base_free(kEventBase);
        kEventBase = nullptr;
    }
}

минимально воспроизводимый пример

// case 1: 20,000 tcp connections, and send "00" for each every 10s, it will cost 0.15s.
std::clock_t clock_start = std::clock();
for (auto &it : kSessions) { bufferevent_write(it, "00", 2); }
std::cout << "send message to all done, client count: " << kSessions.size()
          << ", elapsed: " << std::clock() - clock_start << std::endl;
// case 2: only 1 tcp connection, and send "00" 20,000 times every 10s, it will cost 0.015s.
for (auto &it : kSessions) {
    std::clock_t clock_start = std::clock();
    for (int i = 0; i < 20000; ++i) { bufferevent_write(it, "00", 2); }
    std::cout << "send message 20k times done, elapsed: " << std::clock() - clock_start
              << std::endl;
}

цепочка корпуса 1:

% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
 56.32   29.519892           9   3135415    408444 futex
 20.53   10.762191           7   1490532           epoll_ctl
 15.25    7.992391          11    715355           writev
  3.98    2.086553       45360        46           nanosleep
  1.86    0.973074          11     85273         1 epoll_wait
  0.62    0.324022           8     39267     19266 accept4
  0.58    0.305246           6     48721           read
  0.55    0.286858           6     48762           write
  0.30    0.154980           4     40004           setsockopt
  0.01    0.006486           5      1216           mprotect
  0.01    0.002952          21       143           madvise
  0.00    0.001018           7       152           brk
  0.00    0.000527           6        94           clock_gettime
  0.00    0.000023           3         8           openat
  0.00    0.000021          21         1           mremap
  0.00    0.000010           0        22           mmap
  0.00    0.000007           1         9           close
  0.00    0.000000           0         8           fstat
  0.00    0.000000           0         3           munmap
  0.00    0.000000           0         4           rt_sigaction
  0.00    0.000000           0         1           rt_sigprocmask
  0.00    0.000000           0         1           ioctl
  0.00    0.000000           0         1           readv
  0.00    0.000000           0         8         8 access
  0.00    0.000000           0         1           socket
  0.00    0.000000           0         1           bind
  0.00    0.000000           0         1           listen
  0.00    0.000000           0         1           clone
  0.00    0.000000           0         1           execve
  0.00    0.000000           0         4           getuid
  0.00    0.000000           0         4           getgid
  0.00    0.000000           0         4           geteuid
  0.00    0.000000           0         4           getegid
  0.00    0.000000           0         1           arch_prctl
  0.00    0.000000           0         1           set_tid_address
  0.00    0.000000           0         2           set_robust_list
  0.00    0.000000           0         1           eventfd2
  0.00    0.000000           0         1           epoll_create1
  0.00    0.000000           0         1           pipe2
  0.00    0.000000           0         1           prlimit64
------ ----------- ----------- --------- --------- ----------------
100.00   52.416251               5605075    427719 total

цепочка корпуса 2:

% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
normal listener stopped
 66.66    0.151105           7     22506      3469 futex
  9.74    0.022084           6      3709         1 epoll_wait
  9.54    0.021624           4      5105           epoll_ctl
  9.47    0.021466           8      2550           writev
  2.47    0.005598           4      1263           write
  1.70    0.003857           3      1246           read
  0.18    0.000409          18        23           nanosleep
  0.09    0.000197           4        46           clock_gettime
  0.03    0.000068           4        16           mprotect
  0.02    0.000035           2        21           mmap
  0.01    0.000024           8         3           munmap
  0.01    0.000019          10         2         1 accept4
  0.01    0.000018           5         4           setsockopt
  0.01    0.000015           8         2           set_robust_list
  0.01    0.000014           4         4           rt_sigaction
  0.01    0.000014           4         4           geteuid
  0.01    0.000013           3         4           getgid
  0.01    0.000012           3         4           getuid
  0.01    0.000012           3         4           getegid
  0.00    0.000011           1         8           fstat
  0.00    0.000010          10         1           socket
  0.00    0.000008           8         1           clone
  0.00    0.000007           2         3           brk
  0.00    0.000007           7         1           pipe2
  0.00    0.000006           1         7           openat
  0.00    0.000006           6         1           epoll_create1
  0.00    0.000005           1         8         8 access
  0.00    0.000005           5         1           bind
  0.00    0.000005           5         1           eventfd2
  0.00    0.000005           5         1           prlimit64
  0.00    0.000004           1         7           close
  0.00    0.000004           4         1           listen
  0.00    0.000003           3         1           rt_sigprocmask
  0.00    0.000003           3         1           arch_prctl
  0.00    0.000003           3         1           set_tid_address
  0.00    0.000000           0         1           execve
------ ----------- ----------- --------- --------- ----------------
100.00    0.226676                 36561      3479 total

1 Ответ

1 голос
/ 07 августа 2020

Как улучшить производительность libevent bufferevent_write

Прочтите документацию libevent , изучите его исходный код и рассмотрите другие события l oop библиотеки вроде libev , Qt , Wt , libonion , POCO et c ... .

Помните о нескольких моментах. Я предполагаю, что это современная система Linux / x86-64.

  • вы можете профилировать свою библиотеку событий с открытым исходным кодом l oop (например, скомпилировав ее из исходного кода с недавним G CC и с использованием флагов -pg -O2, затем strace (1) и / или gprof (1) и / или perf (1) и / или time (1) (а также top (1) , ps (1) , pro c (5) , netstat (8) , ip (8) , ifconfig (8) , tcpdump (8) , xosview до наблюдайте за своей всей Linux системой ). Конечно, прочтите time (7) и epoll ( 7) и poll (2)

  • TCP / IP вводит некоторые накладные расходы, IP-маршрутизация добавляет дополнительные накладные расходы, и типичный пакет Ethe rnet содержит не менее сотен байтов (и десятков байтов накладных расходов). Вы, безусловно, захотите send (2) или recv (2) сразу несколько сотен байт. * 107 8 * Отправка коротких "00" сообщений (около четырех байтов полезной нагрузки) неэффективна . Убедитесь, что ваше приложение отправляет сообщения размером в сотни байт одновременно. Вы можете рассмотреть какой-нибудь подход JSONRP C (и, конечно, разработать свой протокол на более высоком уровне с меньшим количеством, но более крупными сообщениями, запускающими каждое более сложное поведение) или какой-то MPI один. Способ отправки меньшего количества сообщений более высокого уровня - это встроить некоторый интерпретатор, например Guile или Lua, и отправлять фрагменты сценария или запросы более высокого уровня (например, NeWS делал в прошлом, и PostgreSQL или exim делать сегодня)

  • для коротких и небольших сообщений предпочитают запускать несколько процессов или потоков на том же компьютере и используйте mq_overview (7) , pipe (7) , fifo (7) , unix (7) , избегая Ethe rnet.

  • Большинство компьютеров 2020 многоядерные , и с осторожностью вы можете использовать Pthreads или std::thread (с одним потоком, работающим на каждом ядре, поэтому как минимум 2 или 4 разных потока на портативном компьютере или сотня потоков на мощном сервере Linux). Вам понадобится код синхронизации (например, std::mutex с std::lock_guard или мьютексы Pẗhread ....)

  • помните о проблеме C10K , а черпайте вдохновение из существующих серверных программ или библиотек с открытым исходным кодом , таких как lighttpd , Wt , FLTK , REDIS , Vmime , libcurl , libonion (и изучение их исходный код и наблюдают за их поведением во время выполнения с gdb (1) и / или strace (1) или ltrace (1) )

  • узким местом может быть сеть (тогда вы не сможете улучшить свой код для повышения производительности; вам потребуются некоторые изменения в вашем программном обеспечении архитектура). Подробнее о облачных вычислениях , распределенных вычислениях , XDR , ASN.1 , SOAP, REST , Веб-службы , библиотеки sh, π-исчисление

Обратите внимание, что :

static void handler(int sig) {
   std::cout << "get signal: " << sig << std::endl;
   stop();
}

при использовании с signal (7) противоречит правилам signal-security (7) , поэтому вы можете использовать pipe ( 7) для трюка с самообработкой, как , предложенного Qt , или рассмотрите возможность использования системного вызова Linux Speci c signalfd (2) .

Читайте также Расширенный Linux Программирование затем системные вызовы (2) и socket (7) и tcp ( 7) .

...