В моём тестовом примере сообщение «00» отправляется только через bufferevent_write.
Случай 1: 20 000 TCP-соединений и отправка «00» каждому каждые 10 секунд — это занимает 0,15 секунды. Случай 2: только 1 TCP-соединение и отправка «00» 20 000 раз каждые 10 секунд — это занимает 0,015 с.
Пожалуйста, дайте мне несколько советов по улучшению производительности
bufferevent_write.
Я просто хочу добиться максимальной скорости и удивляюсь: если bufferevent_write асинхронна, почему отправка 20 000 сообщений по одному TCP-соединению намного быстрее, чем отправка одного сообщения на каждое из 20 000 TCP-соединений.
CPU info:
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 2
Core(s) per socket: 8
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 85
Model name: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
Stepping: 7
CPU MHz: 2500.000
BogoMIPS: 5000.00
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 1024K
L3 cache: 36608K
NUMA node0 CPU(s): 0-15
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl cpuid tsc_known_freq pni pclmulqdq monitor ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single pti fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm mpx avx512f avx512dq rdseed adx smap avx512cd avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves arat avx512_vnni
Memory info:
32G
весь тестовый пример
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/thread.h>
#include <netinet/tcp.h>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstring>
#include <ctime>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <thread>
using namespace std::chrono_literals;
// Global state shared between the libevent I/O thread and the sender thread.
static event_base *kEventBase{nullptr};     // main event loop; created in main()
static evconnlistener *kListener{nullptr};  // TCP listener bound to 0.0.0.0:1800
static std::set<bufferevent *> kSessions{};  // live client sessions; intended to be
                                             // guarded by kSessionsMutex (NOTE(review):
                                             // listenerCallback inserts without it)
static std::mutex kSessionsMutex{};  // protects kSessions
static std::atomic_bool kRunning{false};  // tells the sender thread to keep looping
// Begin shutdown: stop accepting new connections and ask the event loop to
// exit after at most one second.
static void stop() {
  kRunning = false;
  if (kListener != nullptr) {
    evconnlistener_disable(kListener);
    std::cout << "normal listener stopped" << std::endl;
  }
  if (kEventBase != nullptr) {
    struct timeval exit_delay = {1, 0};  // give in-flight events up to 1s
    event_base_loopexit(kEventBase, &exit_delay);
  }
}
// SIGINT/SIGTERM handler: report the signal and trigger shutdown.
// NOTE(review): std::cout and event_base_loopexit (via stop()) are not
// async-signal-safe; evsignal_new would be the safe alternative — TODO confirm
// this is acceptable for a test harness.
static void handler(int signum) {
  std::cout << "get signal: " << signum << std::endl;
  stop();
}
// Read callback: drain everything from the session's input buffer and print it.
//
// Fixes vs. original:
//  - `char data[data_size + 1]` was a C99 VLA, which is not standard C++ and
//    can overflow the stack for large inputs; use std::string instead.
//  - The original first copied the input into a freshly allocated evbuffer
//    (evbuffer_new + evbuffer_add_buffer) only to drain it again — read
//    straight from the bufferevent's input buffer and skip the extra
//    allocation/copy.
static void ReadCallback(bufferevent *event, void *) {
  evbuffer *input = bufferevent_get_input(event);
  const size_t data_size = evbuffer_get_length(input);
  std::string data(data_size, '\0');
  if (data_size > 0) { evbuffer_remove(input, &data[0], data_size); }
  std::cout << "get data: " << data << std::endl;
}
// Connection event callback: log why the session ended, then unregister and
// free the bufferevent. Reached on EOF, socket error, or timeout; any of these
// tears the session down.
static void EventCallback(bufferevent *event, short events, void *) {
  if (events & BEV_EVENT_EOF) {
    std::cout << "socket EOF" << std::endl;
  } else if (events & BEV_EVENT_ERROR) {
    // Fix: the original omitted std::endl on this branch, leaving the error
    // message unterminated and unflushed.
    std::cout << "socket error: " << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())
              << std::endl;
  } else if (events & BEV_EVENT_TIMEOUT) {
    std::cout << "socket read/write timeout" << std::endl;
  } else {
    std::cout << "unhandled socket events: " << std::to_string(events) << std::endl;
  }
  {
    // Erase before freeing so the sender thread can never see a dangling
    // bufferevent pointer in kSessions.
    std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
    kSessions.erase(event);
    bufferevent_free(event);
  }
}
// Accept callback: wrap the accepted socket in a bufferevent, tune the socket
// for low latency, and register the session.
//
// Fixes vs. original:
//  - Data race: kSessions.emplace was done WITHOUT holding kSessionsMutex,
//    while the sender thread concurrently iterates the set under the mutex.
//    Inserting into a std::set during iteration from another thread is
//    undefined behavior; take the lock here too.
//  - Fd leak: when bufferevent_socket_new fails, BEV_OPT_CLOSE_ON_FREE never
//    takes ownership, so the accepted socket must be closed explicitly.
static void listenerCallback(evconnlistener *, evutil_socket_t socket, sockaddr *, int, void *) {
  bufferevent *event =
      bufferevent_socket_new(kEventBase, socket, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
  if (event == nullptr) {
    std::cout << "create buffer event failed" << std::endl;
    evutil_closesocket(socket);  // nobody owns the fd on failure
    return;
  }
  int enable = 1;
  // Disable Nagle and delayed ACKs for the small 2-byte messages this test
  // sends (return values intentionally ignored, as in a best-effort tuning).
  setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, (void *)&enable, sizeof(enable));
  setsockopt(socket, IPPROTO_TCP, TCP_QUICKACK, (void *)&enable, sizeof(enable));
  bufferevent_setcb(event, ReadCallback, nullptr, EventCallback, nullptr);
  bufferevent_enable(event, EV_WRITE | EV_READ);
  {
    std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
    kSessions.emplace(event);
  }
}
// Entry point: starts a libevent TCP server on port 1800 plus a background
// thread that broadcasts "00" to every connected session every 10 seconds,
// timing the broadcast (the benchmark under discussion).
int main(int argc, const char **argv) {
  // NOTE(review): handler() calls std::cout and event_base_loopexit, which are
  // not async-signal-safe — acceptable for a test harness, but evsignal_new
  // would be the correct mechanism. TODO confirm.
  signal(SIGTERM, handler);
  signal(SIGINT, handler);
  // Enable libevent's internal locking; required because bufferevent_write is
  // called from the sender thread below (together with BEV_OPT_THREADSAFE).
  evthread_use_pthreads();
  // init
  kEventBase = event_base_new();
  if (kEventBase == nullptr) {
    std::cout << "cannot create event_base_miner_listener_" << std::endl;
    return -1;
  }
  // Listen on 0.0.0.0:1800.
  sockaddr_in local_sin{};
  bzero(&local_sin, sizeof(local_sin));
  local_sin.sin_family = AF_INET;
  local_sin.sin_port = htons(1800u);
  local_sin.sin_addr.s_addr = htonl(INADDR_ANY);
  kListener = evconnlistener_new_bind(kEventBase,
                                      listenerCallback,
                                      nullptr,
                                      LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
                                      -1,  // -1 = default listen backlog
                                      reinterpret_cast<sockaddr *>(&local_sin),
                                      static_cast<int>(sizeof(local_sin)));
  if (kListener == nullptr) {
    std::cout << "cannot create normal listener" << std::endl;
    return -1;
  }
  kRunning = true;
  // Sender thread: holds kSessionsMutex for the entire broadcast, so session
  // add/remove is blocked while writing.
  std::thread thread_send_message([]() {
    while (kRunning) {
      {
        // case 1: If send to 20,000 tcp connection, and send "00" for each, it will cost 0.15s.
        std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
        // std::clock() measures CPU time of the whole process, not wall time.
        std::clock_t clock_start = std::clock();
        for (auto &it : kSessions) { bufferevent_write(it, "00", 2); }
        std::cout << "send message to all done, client count: " << kSessions.size()
                  << ", elapsed: " << std::clock() - clock_start << std::endl;
      }
      {
        // case 2: If send to 1 tcp connection, and send "00" 20,000 times, it will cost 0.015s.
        // std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
        // for (auto &it : kSessions) {
        //   std::clock_t clock_start = std::clock();
        //   for (int i = 0; i < 20000; ++i) { bufferevent_write(it, "00", 2); }
        //   std::cout << "send message 20k times done, elapsed: " << std::clock() - clock_start
        //             << std::endl;
        // }
      }
      // NOTE(review): after stop() flips kRunning the thread may still sleep
      // up to 10s before exiting, delaying the join() below.
      std::this_thread::sleep_for(10s);
    }
  });
  // Blocks until event_base_loopexit is requested by stop().
  event_base_dispatch(kEventBase);
  if (thread_send_message.joinable()) { thread_send_message.join(); }
  // Tear down any sessions that were still alive when the loop exited.
  {
    std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
    for (auto &it : kSessions) { bufferevent_free(it); }
    kSessions.clear();
  }
  if (kListener != nullptr) {
    evconnlistener_free(kListener);
    kListener = nullptr;
  }
  if (kEventBase != nullptr) {
    event_base_free(kEventBase);
    kEventBase = nullptr;
  }
}
минимально воспроизводимый пример
// case 1: 20,000 tcp connections, and send "00" for each every 10s, it will cost 0.15s.
std::clock_t clock_start = std::clock();
for (auto &it : kSessions) { bufferevent_write(it, "00", 2); }
std::cout << "send message to all done, client count: " << kSessions.size()
<< ", elapsed: " << std::clock() - clock_start << std::endl;
// case 2: only 1 tcp connection, and send "00" 20,000 times every 10s, it will cost 0.015s.
for (auto &it : kSessions) {
std::clock_t clock_start = std::clock();
for (int i = 0; i < 20000; ++i) { bufferevent_write(it, "00", 2); }
std::cout << "send message 20k times done, elapsed: " << std::clock() - clock_start
<< std::endl;
}
strace случая 1:
% time seconds usecs/call calls errors syscall
------ ----------- ----------- --------- --------- ----------------
56.32 29.519892 9 3135415 408444 futex
20.53 10.762191 7 1490532 epoll_ctl
15.25 7.992391 11 715355 writev
3.98 2.086553 45360 46 nanosleep
1.86 0.973074 11 85273 1 epoll_wait
0.62 0.324022 8 39267 19266 accept4
0.58 0.305246 6 48721 read
0.55 0.286858 6 48762 write
0.30 0.154980 4 40004 setsockopt
0.01 0.006486 5 1216 mprotect
0.01 0.002952 21 143 madvise
0.00 0.001018 7 152 brk
0.00 0.000527 6 94 clock_gettime
0.00 0.000023 3 8 openat
0.00 0.000021 21 1 mremap
0.00 0.000010 0 22 mmap
0.00 0.000007 1 9 close
0.00 0.000000 0 8 fstat
0.00 0.000000 0 3 munmap
0.00 0.000000 0 4 rt_sigaction
0.00 0.000000 0 1 rt_sigprocmask
0.00 0.000000 0 1 ioctl
0.00 0.000000 0 1 readv
0.00 0.000000 0 8 8 access
0.00 0.000000 0 1 socket
0.00 0.000000 0 1 bind
0.00 0.000000 0 1 listen
0.00 0.000000 0 1 clone
0.00 0.000000 0 1 execve
0.00 0.000000 0 4 getuid
0.00 0.000000 0 4 getgid
0.00 0.000000 0 4 geteuid
0.00 0.000000 0 4 getegid
0.00 0.000000 0 1 arch_prctl
0.00 0.000000 0 1 set_tid_address
0.00 0.000000 0 2 set_robust_list
0.00 0.000000 0 1 eventfd2
0.00 0.000000 0 1 epoll_create1
0.00 0.000000 0 1 pipe2
0.00 0.000000 0 1 prlimit64
------ ----------- ----------- --------- --------- ----------------
100.00 52.416251 5605075 427719 total
strace случая 2:
% time seconds usecs/call calls errors syscall
------ ----------- ----------- --------- --------- ----------------
normal listener stopped
66.66 0.151105 7 22506 3469 futex
9.74 0.022084 6 3709 1 epoll_wait
9.54 0.021624 4 5105 epoll_ctl
9.47 0.021466 8 2550 writev
2.47 0.005598 4 1263 write
1.70 0.003857 3 1246 read
0.18 0.000409 18 23 nanosleep
0.09 0.000197 4 46 clock_gettime
0.03 0.000068 4 16 mprotect
0.02 0.000035 2 21 mmap
0.01 0.000024 8 3 munmap
0.01 0.000019 10 2 1 accept4
0.01 0.000018 5 4 setsockopt
0.01 0.000015 8 2 set_robust_list
0.01 0.000014 4 4 rt_sigaction
0.01 0.000014 4 4 geteuid
0.01 0.000013 3 4 getgid
0.01 0.000012 3 4 getuid
0.01 0.000012 3 4 getegid
0.00 0.000011 1 8 fstat
0.00 0.000010 10 1 socket
0.00 0.000008 8 1 clone
0.00 0.000007 2 3 brk
0.00 0.000007 7 1 pipe2
0.00 0.000006 1 7 openat
0.00 0.000006 6 1 epoll_create1
0.00 0.000005 1 8 8 access
0.00 0.000005 5 1 bind
0.00 0.000005 5 1 eventfd2
0.00 0.000005 5 1 prlimit64
0.00 0.000004 1 7 close
0.00 0.000004 4 1 listen
0.00 0.000003 3 1 rt_sigprocmask
0.00 0.000003 3 1 arch_prctl
0.00 0.000003 3 1 set_tid_address
0.00 0.000000 0 1 execve
------ ----------- ----------- --------- --------- ----------------
100.00 0.226676 36561 3479 total