AF_XDP-Socket vs Linux Sockets: почему мой AF-XDP Socket теряет пакеты, а универсальный c linux - нет? - PullRequest
1 голос
/ 15 марта 2020

Я сравниваю сокеты AF-XDP и Linux Сокеты с точки зрения того, сколько пакетов они могут обработать без потери пакетов (потеря пакетов определяется как порядковый номер RTP текущего пакета, не равный RTP порядковый номер предыдущего пакета + 1).

Я заметил, что моя программа сокета AF-XDP (я не могу определить, связана ли эта проблема с программой ядра или программой пространства пользователя) потеря около ~25 пакетов в секунду со скоростью 390.000 пакетов в секунду, тогда как эквивалентная программа с универсальными c linux сокетами не теряет никаких пакетов.

Я реализовал так называемый distributor -program, который загружает программу ядра XDP один раз, устанавливает универсальный сокет c linux и добавляет setsockopt(IP_ADD_MEMBERSHIP) к этому универсальному сокету c для каждого многоадресного адреса, который я передаю программе через командную строку. После этого distributor загружает файловый дескриптор BPF_MAP_TYPE_HASH, помещенный в программу ядра XDP, и вставляет маршруты для traffi c на тот случай, если один сокет AF-XDP должен совместно использовать свой umem позже.

Затем программа XDP-kernel проверяет каждый пакет IPv4 / UDP на наличие записи в этой карте ha sh. В основном это выглядит так:

const struct pckt_idntfy_raw raw = {
    .src_ip = 0, /* not used at the moment */
    .dst_ip = iph->daddr,
    .dst_port = udh->dest,
    .pad = 0
};

const int *idx = bpf_map_lookup_elem(&xdp_packet_mapping, &raw);

if(idx != NULL) {
    if (bpf_map_lookup_elem(&xsks_map, idx)) {
        bpf_printk("Found socket @ index: %d!\n", *idx);
        return bpf_redirect_map(&xsks_map, *idx, 0);
    } else {
        bpf_printk("Didn't find connected socket for index %d!\n", *idx);
    }
}

В случае, если существует idx, это означает, что за этим индексом в BPF_MAP_TYPE_XSKMAP.

находится сокет. После выполнения всего этого distributor порождает новый процесс через fork(), передавая все многоадресные адреса (включая порт назначения), которые должны обрабатываться этим процессом (один процесс обрабатывает одну RX-очередь). Если RX-очередей недостаточно, некоторые процессы могут получить несколько групповых адресов. Это означает, что они будут использовать SHARED UMEM.

Я в основном ориентировал свою программу пространства пользователя AF-XDP на этот пример кода: https://github.com/torvalds/linux/blob/master/samples/bpf/xdpsock_user.c

Я использую те же функции xsk_configure_umem, xsk_populate_fill_ring и xsk_configure_socket.

Поскольку я решил, что для этого приложения не требуется максимальная задержка, я отправляю процесс в спящий режим на указанное время (около 1 - 2ms) после чего он проходит через каждый сокет AF-XDP (большую часть времени это только один сокет) и обрабатывает каждый полученный пакет для этого сокета, проверяя, что ни один пакет не был пропущен:

while(!global_exit) {
    nanosleep(&spec, &remaining);

    for(int i = 0; i < cfg.ip_addrs_len; i++) {
        struct xsk_socket_info *socket = xsk_sockets[i];
        if(atomic_exchange(&socket->stats_sync.lock, 1) == 0) {
            handle_receive_packets(socket);
            atomic_fetch_xor(&socket->stats_sync.lock, 1); /* release socket-lock */
        }
    }
}

На мой взгляд, в этом нет ничего особенного, но почему-то я теряю ~25 пакетов на 390.000 пакетах, даже если мой UMEM близок к 1 ГБ ОЗУ.

Для сравнения, мой generi c Программа для сокетов linux выглядит следующим образом (вкратце):

int fd = socket(AF_INET, SOCK_RAW, IPPROTO_UDP);

/* setting some socket options */

struct sockaddr_in sin;
memset(&sin, 0, sizeof(struct sockaddr_in));
sin.sin_family = AF_INET;
sin.sin_port = cfg->ip_addrs[0]->pckt.dst_port;
inet_aton(cfg->ip_addrs[0]->pckt.dst_ip, &sin.sin_addr);

if(bind(fd, (struct sockaddr*)&sin, sizeof(struct sockaddr)) < 0) {
    fprintf(stderr, "Error on binding socket: %s\n", strerror(errno));
    return - 1;
}

ioctl(fd, SIOCGIFADDR, &intf);

Программа distributor создает новый процесс для каждого данного multicast-ip в случае использования сокетов generi c linux (потому что нет сложных методов такие как SHARED-UMEM в генерируемых сокетах c, я не беспокоюсь о нескольких многоадресных потоках на процесс). Позже я, конечно, присоединяюсь к многоадресному членству:

struct ip_mreqn mreq;
memset(&mreq, 0, sizeof(struct ip_mreqn));

const char *multicast_ip = cfg->ip_addrs[0]->pckt.dst_ip;

if(inet_pton(AF_INET, multicast_ip, &mreq.imr_multiaddr.s_addr)) {
    /* Local interface address */
    memcpy(&mreq.imr_address, &cfg->ifaddr, sizeof(struct in_addr));
    mreq.imr_ifindex = cfg->ifindex;

    if(setsockopt(igmp_socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreqn)) < 0) {
        fprintf(stderr, "Failed to set `IP_ADD_MEMBERSHIP`: %s\n", strerror(errno));
        return;
    } else {
        printf("Successfully added Membership for IP: %s\n", multicast_ip);
    }
}

и начинаю обрабатывать пакеты (не в спящем режиме, а в стиле busy-loop):

void read_packets_recvmsg_with_latency(struct config *cfg, struct statistic *st, void *buff, const int igmp_socket_fd) {
    char ctrl[CMSG_SPACE(sizeof(struct timeval))];

    struct msghdr msg;
    struct iovec iov;
    msg.msg_control = (char*)ctrl;
    msg.msg_controllen = sizeof(ctrl);
    msg.msg_name = &cfg->ifaddr;
    msg.msg_namelen = sizeof(cfg->ifaddr);

    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    iov.iov_base = buff;
    iov.iov_len = BUFFER_SIZE;

    struct timeval time_user, time_kernel;
    struct cmsghdr *cmsg = (struct cmsghdr*)&ctrl;

    const int64_t read_bytes = recvmsg(igmp_socket_fd, &msg, 0);
    if(read_bytes == -1) {
        return;
    }

    gettimeofday(&time_user, NULL);

    if(cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_TIMESTAMP) {
        memcpy(&time_kernel, CMSG_DATA(cmsg), sizeof(struct timeval));
    }

    if(verify_rtp(cfg, st, read_bytes, buff)) {
        const double timediff = (time_user.tv_sec - time_kernel.tv_sec) * 1000000 + (time_user.tv_usec - time_kernel.tv_usec);
        if(timediff > st->stats.latency_us) {
            st->stats.latency_us = timediff;
        }
    }
}



int main(...) {
    ....
    while(!is_global_exit) {
        read_packets_recvmsg_with_latency(&cfg, &st, buffer, igmp_socket_fd);
    }
}

Вот и все.

Обратите внимание, что в описанном случае использования, когда я начинаю терять пакеты, я не использую SHARED UMEM, это всего лишь одна RX-очередь, получающая многоадресный поток. В случае, если я обрабатываю меньший многоадресный поток около 150.000 pps - решение AF-XDP не теряет никаких пакетов. Но это также наоборот - около 520.000 pps в той же очереди RX (используя SHARED UMEM) я получаю потерю 12.000 pps.

Есть идеи, что мне не хватает?

...