Как правильно читать данные при использовании epoll_wait - PullRequest
4 голосов
/ 04 апреля 2011

Я пытаюсь портировать на Linux существующий код Windows C ++, который использует IOCP.Решив использовать epoll_wait для достижения высокого параллелизма, я уже столкнулся с теоретической проблемой, когда мы пытаемся обработать полученные данные.

Представьте, что два потока вызывают epoll_wait, и два последовательных сообщения, получаемых такимчто Linux разблокирует первый поток и вскоре второй.

Пример:

Thread 1 blocks on epoll_wait
Thread 2 blocks on epoll_wait
Client sends a chunk of data 1
Thread 1 deblocks from epoll_wait, performs recv and tries to process data
Client sends a chunk of data 2
Thread 2 deblocks, performs recv and tries to process data.

Возможен ли этот сценарий?Т.е. это может произойти?

Есть ли способ предотвратить это, чтобы избежать реализации синхронизации в коде recv / processing?

Ответы [ 5 ]

5 голосов
/ 04 апреля 2011

Если у вас есть несколько потоков, читающих с одного и того же набора ручек epoll, я бы порекомендовал вам перевести свои ручки epoll в режим запуска по одному уровню с EPOLLONESHOT.Это гарантирует, что после того, как один поток наблюдает за сработавшим дескриптором, никакой другой поток не будет наблюдать его, пока вы не используете epoll_ctl для повторного включения дескриптора.

Если вам нужно обрабатывать пути чтения и записи независимо,может потребоваться полностью разделить пулы потоков чтения и записи;иметь один дескриптор epoll для событий чтения и один для событий записи и назначать потоки исключительно одному или другому.Кроме того, есть отдельная блокировка для чтения и для путей записи.Разумеется, вы должны быть осторожны во взаимодействии между потоками чтения и записи, чтобы изменить любое состояние для каждого сокета.

Если вы используете такой подход разделения, вам нужно подумать, как обращаться сзакрытие сокетов.Скорее всего, вам понадобится дополнительная блокировка общих данных и флаги «подтверждения закрытия», установленные под общей блокировкой данных, для путей чтения и записи.Потоки чтения и записи могут затем поспорить, чтобы подтвердить, и последний, кто подтвердит, получит очистку общих структур данных.То есть как то так:

void OnSocketClosed(shareddatastructure *pShared, int writer)
{
  epoll_ctl(myepollhandle, EPOLL_CTL_DEL, pShared->fd, NULL);
  LOCK(pShared->common_lock);
  if (writer)
    pShared->close_ack_w = true;
  else
    pShared->close_ack_r = true;

  bool acked = pShared->close_ack_w && pShared->close_ack_r;
  UNLOCK(pShared->common_lock);

  if (acked)
    free(pShared);
}
3 голосов
/ 04 апреля 2011

Я предполагаю, что ситуация, которую вы пытаетесь обработать, выглядит примерно так:

У вас есть несколько (может быть, очень много) сокетов, из которых вы хотите получать данные одновременно;

Вы хотите начать обработку данных с первого соединения в потоке A при его первом получении, а затем убедитесь, что данные из этого соединения не обрабатываются ни в одном другом потоке, пока вы не закончили с ним в потоке A.

Пока вы это делаете, если некоторые данные теперь получены по другому соединению, вы хотите, чтобы поток B выбрал эти данные и обработал их, при этом все еще будучи уверенным, что никто не сможет обработать это соединение, пока поток B не завершит его и т. Д. .

В этих обстоятельствах оказывается, что использование epoll_wait () с одним и тем же epoll fd в нескольких потоках является достаточно эффективным подходом (я не утверждаю, что он обязательно является наиболее эффективным).

Хитрость в том, чтобы добавить отдельные соединения fds в epoll fd с флагом EPOLLONESHOT. Это гарантирует, что как только fd был возвращен из epoll_wait (), он не контролируется, пока вы специально не скажете epoll, чтобы отслеживать его снова. Это гарантирует, что поток, обрабатывающий это соединение, не подвергается никаким помехам, поскольку никакой другой поток не может обрабатывать это соединение, пока этот поток не помечает соединение, подлежащее мониторингу.

Вы можете настроить fd для отслеживания EPOLLIN или EPOLLOUT снова, используя epoll_ctl () и EPOLL_CTL_MOD.

Существенным преимуществом использования epoll, подобного этому, в нескольких потоках является то, что, когда один поток завершает соединение и добавляет его обратно в контролируемый набор epoll, все остальные потоки, все еще находящиеся в epoll_wait (), немедленно отслеживают его даже до предыдущего поток обработки возвращается к epoll_wait (). Между прочим, это также может быть недостатком из-за отсутствия локальности данных кэша, если другой поток теперь немедленно обнаруживает это соединение (таким образом, необходимо извлечь структуры данных для этого соединения и очистить кэш предыдущего потока). Что работает лучше всего, будет зависеть от вашего точного шаблона использования.

Если вы пытаетесь обрабатывать сообщения, полученные впоследствии по одному и тому же соединению в разных потоках, тогда эта схема использования epoll вам не подойдет, и подход с использованием прослушивающего потока, питающего эффективную очередь, питающую рабочие потоки, может быть лучше.

2 голосов
/ 05 апреля 2011

Предыдущие ответы, которые указывают на то, что вызов epoll_wait () из нескольких потоков является плохой идеей, почти наверняка верны, но я был достаточно заинтригован вопросом, чтобы попытаться выяснить, что происходит, когда он вызывается из нескольких потоков в та же ручка, ожидающая того же сокета. Я написал следующий тестовый код:

#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

struct thread_info {
  int number;
  int socket;
  int epoll;
};

void * thread(struct thread_info * arg)
{
    struct epoll_event events[10];
    int s;
    char buf[512];

    sleep(5 * arg->number);
    printf("Thread %d start\n", arg->number);

    do {
        s = epoll_wait(arg->epoll, events, 10, -1);

        if (s < 0) {
            perror("wait");
            exit(1);
        } else if (s == 0) {
            printf("Thread %d No data\n", arg->number);
            exit(1);
        }
        if (recv(arg->socket, buf, 512, 0) <= 0) {
            perror("recv");
            exit(1);
        }
        printf("Thread %d got data\n", arg->number);
    } while (s == 1);

    printf("Thread %d end\n", arg->number);

    return 0;
}

int main()
{
    pthread_attr_t attr;
    pthread_t threads[2];
    struct thread_info thread_data[2];
    int s;
    int listener, client, epollfd;
    struct sockaddr_in listen_address;
    struct sockaddr_storage client_address;
    socklen_t client_address_len;
    struct epoll_event ev;

    listener = socket(AF_INET, SOCK_STREAM, 0);

    if (listener < 0) {
        perror("socket");
        exit(1);
    }

    memset(&listen_address, 0, sizeof(struct sockaddr_in));
    listen_address.sin_family = AF_INET;
    listen_address.sin_addr.s_addr = INADDR_ANY;
    listen_address.sin_port = htons(6799);

    s = bind(listener,
             (struct sockaddr*)&listen_address,
             sizeof(listen_address));

    if (s != 0) {
        perror("bind");
        exit(1);
    }

    s = listen(listener, 1);

    if (s != 0) {
        perror("listen");
        exit(1);
    }

    client_address_len = sizeof(client_address);
    client = accept(listener,
                    (struct sockaddr*)&client_address,
                    &client_address_len);

    epollfd = epoll_create(10);
    if (epollfd == -1) {
        perror("epoll_create");
        exit(1);
    }

    ev.events = EPOLLIN;
    ev.data.fd = client;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, client, &ev) == -1) {
        perror("epoll_ctl: listen_sock");
        exit(1);
    }

    thread_data[0].number = 0;
    thread_data[1].number = 1;
    thread_data[0].socket = client;
    thread_data[1].socket = client;
    thread_data[0].epoll = epollfd;
    thread_data[1].epoll = epollfd;

    s = pthread_attr_init(&attr);
    if (s != 0) {
        perror("pthread_attr_init");
        exit(1);
    }

    s = pthread_create(&threads[0],
                       &attr,
                       (void*(*)(void*))&thread,
                       &thread_data[0]);

    if (s != 0) {
        perror("pthread_create");
        exit(1);
    }

    s = pthread_create(&threads[1],
                       &attr,
                       (void*(*)(void*))&thread,
                       &thread_data[1]);

    if (s != 0) {
        perror("pthread_create");
        exit(1);
    }

    pthread_join(threads[0], 0);
    pthread_join(threads[1], 0);

    return 0;
}

Когда данные поступают, и оба потока ожидают epoll_wait (), будет возвращаться только один, но при поступлении последующих данных поток, который просыпается для обработки данных, фактически является случайным между двумя потоками. Я не смог найти способ повлиять на то, какая нить проснулась.

Кажется вероятным, что единственный поток, вызывающий epoll_wait, наиболее целесообразен, когда события передаются рабочим потокам для накачки ввода-вывода.

1 голос
/ 04 апреля 2011

Я считаю, что высокопроизводительное программное обеспечение, использующее epoll и поток на ядро, создает несколько дескрипторов epoll, каждый из которых обрабатывает подмножество всех соединений. Таким образом, работа разделяется, но проблема, которую вы описываете, устраняется.

0 голосов
/ 04 апреля 2011

Обычно epoll используется, когда один поток прослушивает данные из одного асинхронного источника.Чтобы избежать ожидания «занято» (опрос вручную), вы используете epoll, чтобы сообщить, когда данные готовы (так же, как select).

Это не стандартная практика, когда несколько потоков читают изодин источник данных, и я, по крайней мере, считаю это плохой практикой.

Если вы хотите использовать несколько потоков, но у вас есть только один входной источник, то назначьте один из потоков для прослушивания и помещения данных в очередь.поэтому другие потоки могут читать отдельные фрагменты из очереди.

...