У меня есть сервер, написанный на простом старом C, принимающий TCP-соединения с использованием kqueue на FreeBSD.
Входящие соединения принимаются и добавляются в простой пул соединений для отслеживания дескриптора файла.
Когда данные получены (в EVFILT_READ), я вызываю recv (), а затем помещаюполезная нагрузка в очереди сообщений для другого потока для его обработки.Прием и обработка данных этим способом работает идеально.
Когда поток обработки завершен, может потребоваться отправить что-то обратно клиенту.Поскольку поток обработки имеет доступ к пулу соединений и может легко получить дескриптор файла, я просто вызываю send () из потока обработки.
Это работает в 99% случаев, но время от времениkqueue дает мне флаг EV_EOF, и соединение обрывается.
Существует четкая корреляция между частотой вызовов send () и количеством ошибок EV_EOF, поэтому я чувствую EV_EOF из-занекоторое состояние гонки между моей нитью kqueue и нитью обработки.
Вызовы send () всегда возвращают ожидаемое количество байтов, поэтому я не заполняю буфер передачи.
Так что мой вопрос; Это приемлемо длявызвать send () из отдельного потока, как описано здесь?Если нет, то каков будет правильный способ асинхронной отправки данных клиентам?
Все примеры, которые я нахожу, вызывают вызовы send () в том же контексте, что и цикл kqueue, но моим потокам обработки, возможно, придется отправлятьвозвращать данные в любое время - даже через несколько минут после последних полученных данных от клиента - поэтому, очевидно, я не могу заблокировать цикл kqueue на это время ..
Соответствующие фрагменты кода:
void *tcp_srvthread(void *arg)
{
[[...Bunch of declarations...]]
tcp_serversocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
...
setsockopt(tcp_serversocket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int));
...
err = bind(tcp_serversocket, (const struct sockaddr*)&sa, sizeof(sa));
...
err = listen(tcp_serversocket, 10);
...
kq = kqueue();
EV_SET(&evSet, tcp_serversocket, EVFILT_READ | EV_CLEAR, EV_ADD, 0, 0, NULL);
...
while(!fTerminated) {
timeout.tv_sec = 2; timeout.tv_nsec = 0;
nev = kevent(kq, &evSet, 0, evList, NLIST, &timeout);
for (i=0; i<nev; i++) {
if (evList[i].ident == tcp_serversocket) { // new connection?
socklen = sizeof(addr);
fd = accept(evList[i].ident, &addr, &socklen); // accept it
if(fd > 0) { // accept ok?
uidx = conn_add(fd, (struct sockaddr_in *)&addr); // Add it to connected controllers
if(uidx >= 0) { // add ok?
EV_SET(&evSet, fd, EVFILT_READ | EV_CLEAR, EV_ADD, 0, 0, (void*)(uint64_t)(0x00E20000 | uidx)); // monitor events from it
if (kevent(kq, &evSet, 1, NULL, 0, NULL) == -1) { // monitor ok?
conn_delete(uidx); // ..no, so delete it from my list also
}
} else { // no room on server?
close(fd);
}
}
else Log(0, "ERR: accept fd=%d", fd);
}
else
if (evList[i].flags & EV_EOF) {
[[ ** THIS IS CALLED SOMETIMES AFTER CALLING SEND - WHY?? ** ]]
uidx = (uint32_t)evList[i].udata;
conn_delete( uidx );
}
else
if (evList[i].filter == EVFILT_READ) {
if((nr = recv(evList[i].ident, buf, sizeof(buf)-2, 0)) > 0) {
uidx = (uint32_t)evList[i].udata;
recv_data(uidx, buf, nr); // This will queue the message for the processing thread
}
}
}
else {
// should not get here.
}
}
}
Поток обработки выглядит примерно так (очевидно, что в дополнение к тому, что показано) происходит множество манипуляций с данными:
void *parsethread(void *arg)
{
int len;
tmsg_Queue mq;
char is_ok;
while(!fTerminated) {
if((len = msgrcv(msgRxQ, &mq, sizeof(tmsg_Queue), 0, 0)) > 0) {
if( process_message(mq) ) {
[[ processing will find the uidx of the client and build the return data ]]
send( ctl[uidx].fd, replydata, replydataLen, 0 );
}
}
}
}
Цените любые идеи или толчки в правильном направлении.Спасибо.