Я работаю на многопоточном сокет-сервере для обработки клиентских соединений с несколькими сокетами. Клиенты могут подключаться и отключаться асинхронно с сервером, при подключении клиент должен отправлять некоторые данные в предварительно определенном формате протокола пользовательских пакетов.
В протоколе определены начало кадра (SOP) и конец кадра (EOP).
Я написал код C так, что для каждого успешного клиентского соединения создается поток, который продолжает получать байты от клиента в предопределенном формате пакета, поток имеет локальный конечный автомат потока, потому что каждый клиент может подключиться асинхронно поэтому состояния для каждого клиента могут быть разными.
Ниже приведен поток, который получает эти данные от клиента и поддерживает состояние в зависимости от типа полученного байта:
static void *receive_handler(void *args) {
struct thread_args *local_args = args;
struct sockaddr_in6 *client_address = local_args->client_address;
//struct itimerval timer_val;
int32_t conn_fd = local_args->conn_fd;
int32_t val_read = 0;
int32_t resp_code = 0;
uint32_t sendBuffLen = 0;
int8_t buffer[BUFFER_SIZE] = { 0 };
uint8_t RetBuff[1024] = { 0 };
int8_t rx_addr_str[INET6_ADDRSTRLEN];
int8_t byte = 0;
int16_t idx = ePacketType;
int16_t packet_len = 0;
int16_t calculated_crc = 0, recv_crc = 0;
uint16_t num_bytes = 0;
memset(rx_addr_str, 0, INET6_ADDRSTRLEN);
inet_ntop(AF_INET6, &(client_address->sin6_addr), rx_addr_str, INET6_ADDRSTRLEN);
printf("\nRx Thread (%d) Created for %s\n", local_args->connection_no, rx_addr_str);
int eState = eStart_Frame;
memcpy(rx_Packet_Info[local_args->connection_no].inet6, rx_addr_str, INET6_ADDRSTRLEN);
//timerclear(&timer_val.it_interval); /* zero interval means no reset of timer */
//timerclear(&timer_val.it_value);
//timer_val.it_value.tv_sec = 10; /* 10 second timeout */
//(void) signal(SIGALRM, state_reset_handler);
while (1) {
if (eState != eChecksum_Verify) {
val_read = -1;
val_read = recv(conn_fd, &byte, sizeof(byte), 0);
debug_printf(INFO, "Amount Read: %d Byte Rxd: 0x%x => 0x%X\n", val_read, (byte & 0xFF), byte);
if (val_read <= 0) {
if (parse_packet("ERR_DISCONNECT", rx_addr_str, local_args->connection_no) < 0) {
debug_printf(ERR, "Error parsing packet: %s\n", strerror(errno));
}
debug_printf(ERR, "May be closed by client %s: %s\n", rx_addr_str, strerror(errno));
debug_printf(ERR, "Exiting Rx Thread: ConnIdx: %d", num_connections);
close(conn_fd);
pthread_exit(NULL);
}
}
switch (eState) {
case eStart_Frame:
debug_printf(DEBG, "Current State: %d\n", eState);
if ((val_read > 0) && (byte & 0xFF) == SOP) {
memset(buffer, 0, BUFFER_SIZE);
val_read = -1;
buffer[eSOP] = (byte & 0xFF);
eState = eFrame_Len;
}
break;
case eFrame_Len: {
static char MSB_Rxd = 0;
debug_printf(DEBG, "Current State: %d\n", eState);
if (val_read > 0) {
if (MSB_Rxd == 0) {
buffer[ePacket_length] = byte;
MSB_Rxd = 1;
}
else {
buffer[ePacket_length + 1] = byte;
eState = eFrame;
num_bytes = 0;
MSB_Rxd = 0;
packet_len = (buffer[ePacket_length] & 0xFF << 8) | (buffer[ePacket_length + 1]);
debug_printf(INFO, "Packet Length: %d : 0x%x 0x%x\n", packet_len,
buffer[ePacket_length], buffer[ePacket_length + 1]);
}
}
}
break;
case eFrame:
debug_printf(DEBG, "Current State: %d\n", eState);
num_bytes++;
buffer[idx] = byte;
if (num_bytes == packet_len) {
eState = eEnd_Frame;
debug_printf(DEBG, "Num bytes: 0x%x\n", num_bytes);
}
else {
debug_printf(ERR, "Num bytes: 0x%x Pkt Len: 0x%x\n", num_bytes, packet_len);
}
idx++;
break;
case eEnd_Frame:
debug_printf(ERR, "Current State: %d val read %d\n", eState, val_read);
if ((val_read > 0) && (byte & 0xFF) == EOP) {
val_read = -1;
eState = eChecksum_Verify;
}
break;
case eChecksum_Verify: {
calculated_crc = crc_16(&buffer[ePacket_length], (num_bytes));
recv_crc = buffer[num_bytes + 1] << 8 | (buffer[num_bytes + 2] & 0xFF);
if (calculated_crc != recv_crc) {
debug_printf(ERR, "CRC Error! CRC do not match!!\n");
debug_printf(ERR, "Calculated CRC: 0x%X\nCRC Rxd: 0x%X\n", calculated_crc, recv_crc);
resp_code = CRC_ERR;
send(conn_fd, &resp_code, sizeof(resp_code), 0);
}
else {
if (rx_Packet_Info[local_args->connection_no].packetUUID != NULL) {
free(rx_Packet_Info[local_args->connection_no].packetUUID);
rx_Packet_Info[local_args->connection_no].packetUUID = NULL;
}
rx_Packet_Info[local_args->connection_no].packetUUID = calloc(buffer[ePacketUUIDLen],
sizeof(uint8_t));
memcpy(rx_Packet_Info[local_args->connection_no].packetUUID, &buffer[ePacketUUID],
buffer[ePacketUUIDLen]);
rx_Packet_Info[local_args->connection_no].packetUUIDlength = buffer[ePacketUUIDLen];
printf("\nRX-Thread-UUID %d: ConnNo: %d\n", buffer[ePacketUUIDLen],
local_args->connection_no);
for (char i = 0; i < buffer[ePacketUUIDLen]; i++) {
printf("0x%x ", rx_Packet_Info[local_args->connection_no].packetUUID[i]);
}
printf("\n");
if (parse_packet(buffer, rx_addr_str, local_args->connection_no) < 0) {
debug_printf(ERR, "Error parsing packet: %s\n", strerror(errno));
}
}
num_bytes = 0;
eState = eStart_Frame;
idx = ePacketType;
}
break;
default:
debug_printf(DEBG, "Invalid State!! Should not come here.\n");
num_bytes = 0;
eState = eStart_Frame;
idx = ePacketType;
break;
}
}
return NULL;
}
Мой вопрос: как мне сбросить этот конечный автомат, если, скажем, после получения начала кадра клиент застревает и не может отправить длину кадра или полный кадр до конца кадра?
Один из способов, который я подумал, - реализовать обратный вызов таймера, но я не уверен, как следует отслеживать конечный автомат нескольких потоков.
Кто-нибудь может подсказать, что мне делать в этом сценарии или я делаю что-то не так?