Отсутствует что-то или я просто не понимаю эполл? - PullRequest
7 голосов
/ 06 марта 2012

Полное раскрытие, я студент, и это задание.Я работал над ней почти неделю без перерыва (в дополнение к предыдущему времени), и я не могу понять, что я делаю неправильно.Мой сервер продолжает зависать на epoll_wait после того, как сделано «несколько» recvs («несколько», потому что я ожидаю несколько ГБ данных и получаю только несколько десятков МБ).Я не думаю, что с моим клиентом все в порядке, потому что он отлично работает с моими избранными и многопоточными серверами.Пожалуйста, быстро взгляните и дайте мне знать, если есть что-то, что выскакивает из-за вас как причина моей проблемы.

Основная идея клиента / сервера состоит в том, чтобы засыпать сервер соединениями (10k +) ипередать определенное количество данных через несколько раз.Этот сервер epoll испытывает проблемы с 2000 годом, когда мой многопоточный сервер справился с задачей 10 тысяч.

Я НЕ прошу вас выполнить мое задание для меня (я почти закончил), я простонужна помощь, чтобы выяснить, что я делаю не так здесь.Заранее благодарим за любую помощь, которую вы можете предложить:)

  1 #include "common.h"
  2 #include <sys/epoll.h>
  3 
  4 uint16_t ready[MAX_CONNS];
  5 uint16_t next;
  6 pthread_mutex_t mutex;
  7 
  8 void *worker_thread(void *param) {
  9     int my_sock, pos;
 10     struct conn_params *conn_ps = (struct conn_params *)param;
 11 
 12     while (1) {
 13         pthread_mutex_lock(&mutex);
 14 
 15         while (1) {
 16             if (next == MAX_CONNS) {
 17                 printf("balls\n");
 18                 next = 4;
 19             }
 20 
 21             if (ready[next] != 0) {
 22                 pos = next;
 23                 my_sock = ready[pos];
 24                 next++;
 25                 break;
 26             }
 27         }
 28 
 29         pthread_mutex_unlock(&mutex);
 30         /* handle recv/send */
 31         if (echo_recv(&conn_ps[my_sock], MULTIPLE) == 0) { /* closed conn */
 32             shutdown(my_sock, SHUT_RDWR);
 33             close(my_sock);
 34             serv_stats.active_connections--;
 35         }
 36         ready[pos] = 0;
 37 /*      print_conn_stats(&conn_ps[my_sock]);*/
 38     }
 39 }
 40 
 41 void *add_client_thread(void *param) {
 42     struct epoll_accept_thread *eat = (struct epoll_accept_thread *)param;
 43     struct sockaddr client;
 44     struct epoll_event event;
 45     socklen_t client_len;
 46     int new_sock, ret;
 47     char hostbuf[NI_MAXHOST], servbuf[NI_MAXSERV];
 48 
 49     bzero(&client, sizeof(struct sockaddr));
 50     event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
 51 
 52     while ((new_sock = accept(eat->listen_sock, &client, &client_len)) != -1) {
 53         set_nonblock(new_sock);
 54         event.data.fd = new_sock;
 55         if (epoll_ctl(eat->fd_epoll, EPOLL_CTL_ADD, new_sock, &event) == -1) {
 56             perror("epoll_ctl");
 57             printf("%u\n", new_sock);
 58             continue;
 59         }
 60 
 61         bzero(&(eat->conn_ps[new_sock]), sizeof(struct conn_params));
 62         eat->conn_ps[new_sock].sock = new_sock;
 63         if ((ret = getnameinfo(&client, client_len, hostbuf, NI_MAXHOST, servbuf, NI_MAXSERV, NI_NUMERICHOST)) != 0) {
 64             gai_strerror(ret);
 65         }
 66 
 67         update_server_stats();
 68         printf("added client\n");
 69     }
 70 
 71     if (errno != EAGAIN) {
 72         perror("Couldn't accept connection");
 73     }
 74 
 75     pthread_exit(NULL);
 76 }
 77 
 78 int main(int argc, char **argv) {
 79     char opt, *port = NULL;
 80     struct addrinfo hints, *results, *p;
 81     int listen_sock = new_tcp_sock(), nfds, i, ret;
 82     int fd_epoll, next_avail = 4;
 83     struct conn_params conn_ps[MAX_CONNS];
 84     struct epoll_event evs[MAX_CONNS];
 85     struct epoll_event event;
 86     struct epoll_accept_thread eat;
 87     pthread_t thread;
 88 
 89     while ((opt = getopt(argc, argv, ":l:")) != -1) {
 90         switch (opt) {
 91             case 'l': /* port to listen on */
 92                 port = optarg;
 93                 break;
 94             case '?': /* unknown option */
 95                 fprintf(stderr, "The option -%c is not supported.\n", opt);
 96                 exit(1);
 97             case ':': /* required arg not supplied for option */
 98                 fprintf(stderr, "The option -%c requires an argument.\n", opt);
 99                 exit(1);
100         }
101     } /* command line arg processing done */
102 
103     if (port == NULL) {
104         fprintf(stderr, "You must provide the port to listen on (-l).\n");
105         exit(1);
106     }
107 
108     signal(SIGINT, handle_interrupt);
109 
110     bzero(&hints, sizeof(struct addrinfo));
111     hints.ai_family = AF_INET;
112     hints.ai_socktype = SOCK_STREAM;
113     hints.ai_flags = AI_PASSIVE;
114 
115     set_nonblock(listen_sock);
116     set_reuseaddr(listen_sock);
117 
118     if ((ret = getaddrinfo(NULL, port, &hints, &results) != 0)) {
119         gai_strerror(ret);
120         exit(1);
121     }
122 
123     for (p = results; p != NULL; p = p->ai_next) { /* attempt to connect to the host */
124         if (bind(listen_sock, p->ai_addr, p->ai_addrlen) == -1) {
125             perror("Bind failed");
126         } else {
127             break;
128         }
129     }
130 
131     if (p == NULL) { /* we were unable to connect to anything */
132         fprintf(stderr, "Unable to bind to the specified port. Exiting...\n");
133         exit(1);
134     }
135 
136     freeaddrinfo(results);
137 
138     if (listen(listen_sock, 5) == -1) {
139         perror("Listen failed");
140         exit(1);
141     }
142 
143     /* everything is set up. method-specific code goes below */
144 
145     start_server_stats();
146     next = 4;
147 
148     if ((fd_epoll = epoll_create(MAX_CONNS)) == -1) {
149         perror("epoll_create");
150         exit(1);
151     }
152 
153     event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
154     event.data.fd = listen_sock;
155     if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, listen_sock, &event) == -1) {
156         perror("epoll_ctl");
157         exit(1);
158     }
159 
160     signal(SIGPIPE, SIG_IGN);
161     bzero(ready, MAX_CONNS * sizeof(uint16_t));
162     pthread_mutex_init(&mutex, NULL);
163 
164     for (i = 0; i < 5; i++) { /* five workers should be enough */
165         pthread_create(&thread, NULL, worker_thread, (void *)&conn_ps);
166     }
167 
168     while (1) {
169         if ((nfds = epoll_wait(fd_epoll, evs, MAX_CONNS, -1)) > 0 && errno == EINTR) {
170             continue;
171         }
172         for (i = 0; i < nfds; i++) { /* loop through all FDs */
173             if (evs[i].events & (EPOLLERR | EPOLLHUP)) { /* if there's an error or a hangup */
174                 /*fprintf(stderr, "Error! Danger, Will Robinson! Danger!");*/
175                 close(evs[i].data.fd);
176                 continue;
177             } else if (evs[i].data.fd == listen_sock) { /* we have a new connection coming in */
178                 eat.listen_sock = listen_sock;
179                 eat.fd_epoll = fd_epoll;
180                 eat.conn_ps = conn_ps;
181                 pthread_create(&thread, NULL, add_client_thread, (void *)&eat);
182             } else { /* inbound data */
183                 while (ready[next_avail] != 0) {
184                     next_avail++;
185 
186                     if (next_avail == MAX_CONNS) {
187                         next_avail = 4;
188                     }
189                 }
190                 ready[next_avail] = evs[i].data.fd;
191             } /* end inbound data */
192         } /* end iterating through FDs */
193     } /* end epoll_wait loop */
194 
195     perror("epoll_wait");
196 
197     return 0;
198 }

А вот функция echo_recv, так как я предполагаю, что кто-то тоже захочет это увидеть:

 14 int echo_recv(struct conn_params *conn_p, int single) {
 15     char client_buf[CLIENT_BUF_SIZE], buffer[BUF_SIZE];
 16     int nread, nwrite, nsent = 0, i;
 17 
 18     while ((nread = recv(conn_p->sock, client_buf, CLIENT_BUF_SIZE, 0)) > 0) {
 19         /* create buffer of MULTIPLIER(int) times what was received */
 20         for (i = 0; i < MULTIPLIER && nread*i < BUF_SIZE; i++) {
 21             memcpy(buffer+(nread*i), client_buf, nread);
 22         }
 23 
 24         /* send the created buffer */
 25         while ((nwrite = send(conn_p->sock, buffer+nsent, (nread*MULTIPLIER)-nsent, 0)) > 0) {
 26             nsent += nwrite;
 27         }
 28 
 29         conn_p->total_recvd += nread; /* update our stats for this conn */
 30         conn_p->total_sent += nsent; /* update our status for this conn */
 31         serv_stats.total_recvd += nread;
 32         serv_stats.total_sent += nsent;
 33         nsent = 0;
 34 
 35         if (single) {
 36             return 1;
 37         }
 38     }
 39 
 40     if (nread == -1 && (errno & EAGAIN)) {
 41         return 1;
 42     }
 43 
 44     if (nread == -1) {
 45         perror("wtf?");
 46     }
 47 
 48     shutdown(conn_p->sock, SHUT_RDWR);
 49     close(conn_p->sock);
 50 
 51     return 0; /* recv failed */
 52 }

Ответы [ 2 ]

2 голосов
/ 06 марта 2012

Вот несколько мыслей:

  1. Вы действительно должны посмотреть, как осуществляется доступ к общему массиву ready. В вашем рабочем потоке вы получаете мьютекс для его чтения, однако бывают случаи, когда вы изменяете эту вне блокировки, кроме того, вы не получаете эту блокировку в цикле опроса (основной поток), вы просто пишете в массив - это совершенно неправильно.
  2. Вы не сохраняете идентификаторы потоков для всех рабочих потоков, как вы предлагаете уничтожить их (или дождаться их завершения - обычно вам нужно вызвать pthread_join)
  3. Вы создаете отдельный поток для принятия соединения, но снова изменяете общую структуру epoll_accept_thread в этом потоке - и вокруг нее нет блокировки.

Сначала я решу все проблемы с синхронизацией, а затем могут появиться другие проблемы.

1 голос
/ 06 марта 2012

Я хотел опубликовать это в комментарии выше, но он стал намного длиннее, чем позволял:

Попробуйте реализовать простой сервер на основе epoll, который полностью асинхронный (шаги)

  1. Настройте ваш принимающий сокет ...
  2. Добавить в epoll.
  3. войти в цикл ожидания:
    1. Проверить, находится ли событие на принимающем сокете или обычном сокете
    2. Если принять сокет, принять соединение, добавить в epoll, вернуться к 3
    3. Если событие в обычном сокете для чтения, прочитать байты X, сохранить в буфер записи и включить событие записи в epoll для сокета, вернитесь к 3
    4. Если событие на обычном сокете для записи, записать байты из буфера в сеть, отключить событие записи, если буфер записи пуст, вернуться к 3.
    5. Если произошла ошибка, удалите сокет из epoll
  4. Четвертого шага нет ... программа должна зацикливаться вечно.

Это должно убрать любую сложность, которую вы добавили, из-за наличия потоков, которые могут вызвать проблемы. Это перемещает epoll обратно в тот же домен, что и select(), за исключением того, что он обычно намного быстрее. Вся идея использования библиотеки событий состоит в том, чтобы знать, когда вы можете читать / записывать вместо установки неблокирующего сокета и пытаться читать из него / писать в него.

Вы также никогда не проверяете возвращаемое значение из write(), которое, возможно, не удалось из-за получения SIGPIPE (я знаю, что вы проигнорировали сигнал, но вы все равно получите EAGAIN / EINTR с ошибкой).

Другое, что я вижу, это то, что вы делаете занятый цикл внутри вашего потока, который ожидает готовности сокетов. Когда вы используете select() или epoll, в этом случае вы получаете уведомление о появлении чего-то нового, поэтому вам не нужно выполнять цикл занятости ...

Я не совсем уверен, чего вы пытаетесь достичь, но ваш код крайне неэффективен.

Что вы могли бы сделать, после реализации простого асинхронного примера с использованием описанных выше шагов - запустить несколько рабочих потоков, которые все прослушивают (используя epoll) события read в сокете listener / accept и имеют каждый из потоков обрабатывают различные соединения (все еще используя то, что я написал выше).

...