Я сравниваю сокеты AF-XDP и Linux Сокеты с точки зрения того, сколько пакетов они могут обработать без потери пакетов (потеря пакетов определяется как порядковый номер RTP текущего пакета, не равный RTP порядковый номер предыдущего пакета + 1
).
Я заметил, что моя программа сокета AF-XDP (я не могу определить, связана ли эта проблема с программой ядра или программой пространства пользователя) потеря около ~25
пакетов в секунду со скоростью 390.000
пакетов в секунду, тогда как эквивалентная программа с универсальными c linux сокетами не теряет никаких пакетов.
Я реализовал так называемый distributor
-program, который загружает программу ядра XDP один раз, устанавливает универсальный сокет c linux и добавляет setsockopt(IP_ADD_MEMBERSHIP)
к этому универсальному сокету c для каждого многоадресного адреса, который я передаю программе через командную строку. После этого distributor
загружает файловый дескриптор BPF_MAP_TYPE_HASH
, помещенный в программу ядра XDP, и вставляет маршруты для traffi c на тот случай, если один сокет AF-XDP должен совместно использовать свой umem позже.
Затем программа XDP-kernel проверяет каждый пакет IPv4 / UDP на наличие записи в этой карте ha sh. В основном это выглядит так:
const struct pckt_idntfy_raw raw = {
.src_ip = 0, /* not used at the moment */
.dst_ip = iph->daddr,
.dst_port = udh->dest,
.pad = 0
};
const int *idx = bpf_map_lookup_elem(&xdp_packet_mapping, &raw);
if(idx != NULL) {
if (bpf_map_lookup_elem(&xsks_map, idx)) {
bpf_printk("Found socket @ index: %d!\n", *idx);
return bpf_redirect_map(&xsks_map, *idx, 0);
} else {
bpf_printk("Didn't find connected socket for index %d!\n", *idx);
}
}
В случае, если существует idx
, это означает, что за этим индексом в BPF_MAP_TYPE_XSKMAP
.
находится сокет. После выполнения всего этого distributor
порождает новый процесс через fork()
, передавая все многоадресные адреса (включая порт назначения), которые должны обрабатываться этим процессом (один процесс обрабатывает одну RX-очередь). Если RX-очередей недостаточно, некоторые процессы могут получить несколько групповых адресов. Это означает, что они будут использовать SHARED UMEM
.
Я в основном ориентировал свою программу пространства пользователя AF-XDP на этот пример кода: https://github.com/torvalds/linux/blob/master/samples/bpf/xdpsock_user.c
Я использую те же функции xsk_configure_umem
, xsk_populate_fill_ring
и xsk_configure_socket
.
Поскольку я решил, что для этого приложения не требуется максимальная задержка, я отправляю процесс в спящий режим на указанное время (около 1 - 2ms
) после чего он проходит через каждый сокет AF-XDP (большую часть времени это только один сокет) и обрабатывает каждый полученный пакет для этого сокета, проверяя, что ни один пакет не был пропущен:
while(!global_exit) {
nanosleep(&spec, &remaining);
for(int i = 0; i < cfg.ip_addrs_len; i++) {
struct xsk_socket_info *socket = xsk_sockets[i];
if(atomic_exchange(&socket->stats_sync.lock, 1) == 0) {
handle_receive_packets(socket);
atomic_fetch_xor(&socket->stats_sync.lock, 1); /* release socket-lock */
}
}
}
На мой взгляд, в этом нет ничего особенного, но почему-то я теряю ~25
пакетов на 390.000
пакетах, даже если мой UMEM близок к 1 ГБ ОЗУ.
Для сравнения, мой generi c Программа для сокетов linux выглядит следующим образом (вкратце):
int fd = socket(AF_INET, SOCK_RAW, IPPROTO_UDP);
/* setting some socket options */
struct sockaddr_in sin;
memset(&sin, 0, sizeof(struct sockaddr_in));
sin.sin_family = AF_INET;
sin.sin_port = cfg->ip_addrs[0]->pckt.dst_port;
inet_aton(cfg->ip_addrs[0]->pckt.dst_ip, &sin.sin_addr);
if(bind(fd, (struct sockaddr*)&sin, sizeof(struct sockaddr)) < 0) {
fprintf(stderr, "Error on binding socket: %s\n", strerror(errno));
return - 1;
}
ioctl(fd, SIOCGIFADDR, &intf);
Программа distributor
создает новый процесс для каждого данного multicast-ip в случае использования сокетов generi c linux (потому что нет сложных методов такие как SHARED-UMEM в генерируемых сокетах c, я не беспокоюсь о нескольких многоадресных потоках на процесс). Позже я, конечно, присоединяюсь к многоадресному членству:
struct ip_mreqn mreq;
memset(&mreq, 0, sizeof(struct ip_mreqn));
const char *multicast_ip = cfg->ip_addrs[0]->pckt.dst_ip;
if(inet_pton(AF_INET, multicast_ip, &mreq.imr_multiaddr.s_addr)) {
/* Local interface address */
memcpy(&mreq.imr_address, &cfg->ifaddr, sizeof(struct in_addr));
mreq.imr_ifindex = cfg->ifindex;
if(setsockopt(igmp_socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreqn)) < 0) {
fprintf(stderr, "Failed to set `IP_ADD_MEMBERSHIP`: %s\n", strerror(errno));
return;
} else {
printf("Successfully added Membership for IP: %s\n", multicast_ip);
}
}
и начинаю обрабатывать пакеты (не в спящем режиме, а в стиле busy-loop
):
void read_packets_recvmsg_with_latency(struct config *cfg, struct statistic *st, void *buff, const int igmp_socket_fd) {
char ctrl[CMSG_SPACE(sizeof(struct timeval))];
struct msghdr msg;
struct iovec iov;
msg.msg_control = (char*)ctrl;
msg.msg_controllen = sizeof(ctrl);
msg.msg_name = &cfg->ifaddr;
msg.msg_namelen = sizeof(cfg->ifaddr);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
iov.iov_base = buff;
iov.iov_len = BUFFER_SIZE;
struct timeval time_user, time_kernel;
struct cmsghdr *cmsg = (struct cmsghdr*)&ctrl;
const int64_t read_bytes = recvmsg(igmp_socket_fd, &msg, 0);
if(read_bytes == -1) {
return;
}
gettimeofday(&time_user, NULL);
if(cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_TIMESTAMP) {
memcpy(&time_kernel, CMSG_DATA(cmsg), sizeof(struct timeval));
}
if(verify_rtp(cfg, st, read_bytes, buff)) {
const double timediff = (time_user.tv_sec - time_kernel.tv_sec) * 1000000 + (time_user.tv_usec - time_kernel.tv_usec);
if(timediff > st->stats.latency_us) {
st->stats.latency_us = timediff;
}
}
}
int main(...) {
....
while(!is_global_exit) {
read_packets_recvmsg_with_latency(&cfg, &st, buffer, igmp_socket_fd);
}
}
Вот и все.
Обратите внимание, что в описанном случае использования, когда я начинаю терять пакеты, я не использую SHARED UMEM
, это всего лишь одна RX-очередь, получающая многоадресный поток. В случае, если я обрабатываю меньший многоадресный поток около 150.000 pps
- решение AF-XDP не теряет никаких пакетов. Но это также наоборот - около 520.000 pps
в той же очереди RX (используя SHARED UMEM
) я получаю потерю 12.000 pps
.
Есть идеи, что мне не хватает?