Конечный автомат для потока для связи с сокетом - PullRequest
0 голосов
/ 24 января 2019

Я работаю на многопоточном сокет-сервере для обработки клиентских соединений с несколькими сокетами. Клиенты могут подключаться и отключаться асинхронно с сервером, при подключении клиент должен отправлять некоторые данные в предварительно определенном формате протокола пользовательских пакетов.
В протоколе определены начало кадра (SOP) и конец кадра (EOP).

Я написал код C так, что для каждого успешного клиентского соединения создается поток, который продолжает получать байты от клиента в предопределенном формате пакета, поток имеет локальный конечный автомат потока, потому что каждый клиент может подключиться асинхронно поэтому состояния для каждого клиента могут быть разными.
Ниже приведен поток, который получает эти данные от клиента и поддерживает состояние в зависимости от типа полученного байта:

static void *receive_handler(void *args) {

  struct thread_args *local_args = args;
  struct sockaddr_in6 *client_address = local_args->client_address;
  //struct itimerval timer_val;
  int32_t conn_fd = local_args->conn_fd;
  int32_t val_read = 0;
  int32_t resp_code = 0;
  uint32_t sendBuffLen = 0;
  int8_t buffer[BUFFER_SIZE] = { 0 };
  uint8_t RetBuff[1024] = { 0 };
  int8_t rx_addr_str[INET6_ADDRSTRLEN];
  int8_t byte = 0;
  int16_t idx = ePacketType;
  int16_t packet_len = 0;
  int16_t calculated_crc = 0, recv_crc = 0;
  uint16_t num_bytes = 0;

  memset(rx_addr_str, 0, INET6_ADDRSTRLEN);
  inet_ntop(AF_INET6, &(client_address->sin6_addr), rx_addr_str, INET6_ADDRSTRLEN);
  printf("\nRx Thread (%d) Created for %s\n", local_args->connection_no, rx_addr_str);

  int eState = eStart_Frame;

  memcpy(rx_Packet_Info[local_args->connection_no].inet6, rx_addr_str, INET6_ADDRSTRLEN);

  //timerclear(&timer_val.it_interval); /* zero interval means no reset of timer */
  //timerclear(&timer_val.it_value);
  //timer_val.it_value.tv_sec = 10; /* 10 second timeout */

  //(void) signal(SIGALRM, state_reset_handler);

  while (1) {

    if (eState != eChecksum_Verify) {
      val_read = -1;
      val_read = recv(conn_fd, &byte, sizeof(byte), 0);
      debug_printf(INFO, "Amount Read: %d Byte Rxd: 0x%x => 0x%X\n", val_read, (byte & 0xFF), byte);
      if (val_read <= 0) {
        if (parse_packet("ERR_DISCONNECT", rx_addr_str, local_args->connection_no) < 0) {
          debug_printf(ERR, "Error parsing packet: %s\n", strerror(errno));
        }
        debug_printf(ERR, "May be closed by client %s: %s\n", rx_addr_str, strerror(errno));
        debug_printf(ERR, "Exiting Rx Thread: ConnIdx: %d", num_connections);
        close(conn_fd);
        pthread_exit(NULL);
      }
    }

    switch (eState) {

      case eStart_Frame:
        debug_printf(DEBG, "Current State: %d\n", eState);
        if ((val_read > 0) && (byte & 0xFF) == SOP) {
          memset(buffer, 0, BUFFER_SIZE);
          val_read = -1;
          buffer[eSOP] = (byte & 0xFF);
          eState = eFrame_Len;
        }
        break;

      case eFrame_Len: {
        static char MSB_Rxd = 0;
        debug_printf(DEBG, "Current State: %d\n", eState);
        if (val_read > 0) {
          if (MSB_Rxd == 0) {
            buffer[ePacket_length] = byte;
            MSB_Rxd = 1;
          }
          else {
            buffer[ePacket_length + 1] = byte;
            eState = eFrame;
            num_bytes = 0;
            MSB_Rxd = 0;
            packet_len = (buffer[ePacket_length] & 0xFF << 8) | (buffer[ePacket_length + 1]);
            debug_printf(INFO, "Packet Length: %d : 0x%x 0x%x\n", packet_len,
                buffer[ePacket_length], buffer[ePacket_length + 1]);
          }
        }
      }
        break;

      case eFrame:
        debug_printf(DEBG, "Current State: %d\n", eState);
        num_bytes++;
        buffer[idx] = byte;
        if (num_bytes == packet_len) {
          eState = eEnd_Frame;
          debug_printf(DEBG, "Num bytes: 0x%x\n", num_bytes);
        }
        else {
          debug_printf(ERR, "Num bytes: 0x%x Pkt Len: 0x%x\n", num_bytes, packet_len);
        }
        idx++;
        break;

      case eEnd_Frame:
        debug_printf(ERR, "Current State: %d val read %d\n", eState, val_read);
        if ((val_read > 0) && (byte & 0xFF) == EOP) {
          val_read = -1;
          eState = eChecksum_Verify;
        }
        break;

      case eChecksum_Verify: {

        calculated_crc = crc_16(&buffer[ePacket_length], (num_bytes));
        recv_crc = buffer[num_bytes + 1] << 8 | (buffer[num_bytes + 2] & 0xFF);

        if (calculated_crc != recv_crc) {
          debug_printf(ERR, "CRC Error! CRC do not match!!\n");
          debug_printf(ERR, "Calculated CRC: 0x%X\nCRC Rxd: 0x%X\n", calculated_crc, recv_crc);
          resp_code = CRC_ERR;
          send(conn_fd, &resp_code, sizeof(resp_code), 0);
        }
        else {
          if (rx_Packet_Info[local_args->connection_no].packetUUID != NULL) {
            free(rx_Packet_Info[local_args->connection_no].packetUUID);
            rx_Packet_Info[local_args->connection_no].packetUUID = NULL;
          }

          rx_Packet_Info[local_args->connection_no].packetUUID = calloc(buffer[ePacketUUIDLen],
              sizeof(uint8_t));
          memcpy(rx_Packet_Info[local_args->connection_no].packetUUID, &buffer[ePacketUUID],
              buffer[ePacketUUIDLen]);
          rx_Packet_Info[local_args->connection_no].packetUUIDlength = buffer[ePacketUUIDLen];

          printf("\nRX-Thread-UUID %d: ConnNo: %d\n", buffer[ePacketUUIDLen],
              local_args->connection_no);
          for (char i = 0; i < buffer[ePacketUUIDLen]; i++) {
            printf("0x%x ", rx_Packet_Info[local_args->connection_no].packetUUID[i]);
          }
          printf("\n");
          if (parse_packet(buffer, rx_addr_str, local_args->connection_no) < 0) {
            debug_printf(ERR, "Error parsing packet: %s\n", strerror(errno));
          }
        }
        num_bytes = 0;
        eState = eStart_Frame;
        idx = ePacketType;
      }
        break;

      default:
        debug_printf(DEBG, "Invalid State!! Should not come here.\n");
        num_bytes = 0;
        eState = eStart_Frame;
        idx = ePacketType;
        break;
    }
  }

  return NULL;
}

Мой вопрос: как мне сбросить этот конечный автомат, если, скажем, после получения начала кадра клиент застревает и не может отправить длину кадра или полный кадр до конца кадра?

Один из способов, который я подумал, - реализовать обратный вызов таймера, но я не уверен, как следует отслеживать конечный автомат нескольких потоков. Кто-нибудь может подсказать, что мне делать в этом сценарии или я делаю что-то не так?

1 Ответ

0 голосов
/ 24 января 2019

Если я правильно разбираю вопрос, вы спрашиваете, как изящно справиться с ситуацией, когда подключающийся клиент не отправляет данные своевременно - т.е. он отправил первую часть сообщения, но (из-за проблем с сетью, из-за ошибки на стороне клиента или чего-либо еще) никогда не отправляет остальное, оставляя ваш поток ввода-вывода на стороне сервера заблокированным в вызове recv() на длительное / неопределенное время.

Если так, то первый вопрос, который нужно задать, это: действительно ли это проблема? Если каждое соединение получает свой собственный поток, то блокирование одного конкретного потока / соединения не должно вызывать проблем для других потоков, поскольку все они выполняются независимо друг от друга. Так, может быть, вы можете просто полностью игнорировать проблему?

Тем не менее, более вероятный ответ заключается в том, что игнорирование проблемы не достаточно хорошо из-за пары последующих проблем, которые не легко игнорируются: (а) что, если слишком много клиентских подключений «зависнет» на в то же время? Одно или два зашедших в тупик соединения / потоки TCP не имеют большого значения, но если одна и та же проблема продолжает возникать, в конечном итоге у вас закончатся ресурсы для порождения большего количества потоков или соединений TCP, и тогда ваш сервер больше не сможет функционировать. И (б) что, если процесс сервера хочет выйти сейчас? (т. е. потому что пользователь сервера отправил ему прерывание SIGINT или подобное). Если один или несколько потоков заблокированы на неопределенный срок, сервер не сможет выйти своевременно и контролируемым образом, поскольку основной поток должен ждать все потоки TCP должны выйти первыми, прежде чем он сможет очистить ресурсы всего процесса, и любые заблокированные потоки не будут выходить в течение длительного времени, если вообще когда-либо.

Итак, если предположить, что проблема требует , то самый надежный способ, который я нашел, - это никогда не блокировать recv() (или send()). , Вместо этого убедитесь, что каждый сокет находится в неблокирующем режиме, и вместо этого используйте блок while в цикле только в вызове select(). Это делает ваш конечный автомат немного более сложным (поскольку теперь ему придется обрабатывать как частичные, так и частичные приемы), но компенсирующим преимуществом является то, что поток теперь лучше контролирует свое собственное поведение блокировки. В частности, вы можете указать select() всегда возвращать через определенное время, несмотря ни на что, и (еще лучше) вы можете указать select() возвращать всякий раз, когда у любого из нескольких сокетов есть байты, готовые для чтения в Это. Это означает, что если ваш основной поток хочет выйти, он может использовать pipe() или socketpair() для отправки фиктивного байта в каждый поток TCP и поток TCP (который предположительно заблокирован внутри select(), ожидая либо данные от его клиента или от сокета pipe / socketpair) немедленно вернутся из select(), увидят, что основной поток отправил ему байт, и ответят, выйдя немедленно.

Этого должно быть достаточно - по моему опыту, лучше не устанавливать фиксированные тайм-ауты, если вы можете этого избежать, поскольку трудно предсказать, какой будет производительность сети во всех случаях, и какое правило вы можете использовать Придумайте (например, «клиент, который не отправляет сообщение целиком за 5 секунд, должен быть сломан»), скорее всего, ошибочны, и у вас возникнут ложноположительные проблемы, если вы попытаетесь применить это правило. Лучше просто позволить каждому клиенту занимать столько времени, сколько ему нужно / нужно, и в то же время иметь механизм, с помощью которого основной поток может запросить немедленный выход из определенного клиентского потока, если / когда это становится необходимым (например, во время завершения процесса сервера или если активных потоков TCP слишком много и вы хотите удалить старые / неактивные потоки, прежде чем создавать больше)

...