Recv из работающих локально, но ничего не получает на случайных портах между выполнениями - PullRequest
2 голосов
/ 12 марта 2019

Я работаю с многопоточным прослушивателем UDP, и я застрял в проблеме, которая определенно превосходит меня.

Итак, мне необходимо получать огромное количество пакетов UDP в нескольких портах.Локально лучшим решением для меня было вызывать неблокирующее recv из такого количества потоков, сколько портов я слушаю (выбор и опрос были слишком медленными для моих требований).Я использую диспетчер пула потоков, он просто вызывает задачи потоков и очередей.Вот код:

void receiveFromSocket(void * arguments){

    sockaddr_in client;  // Local
    socklen_t clientSize = sizeof(client);
    memset(&client, 0, sizeof(client));
    struct arg_struct_listenPort *args2 = (struct arg_struct_listenPort *)arguments;
        int fd = args2->arg_fd;
        int port = args2->arg_port;

    for(;;) {
        char buf[158];
        memset(buf,0,158*sizeof(char));
        int n = recvfrom(fd, (char * ) buf, 158, MSG_DONTWAIT, ( struct sockaddr *) &client, &clientSize);

            if(n == -1){
               //cerr << "Error while receiving from client: " << errno << endl;
               continue;
            }

            if(n != 158){
               cerr << "Discarded message since it's not 158 bytes." << endl;
               continue;
            }
            struct arg_struct args;
                args.arg_port = port;
                memcpy(args.buf,buf,158);

            thpool_add_work(globals.thpool, socketThread, (void*)(&args));

    }

}


/// Runs the Socket listener
int network_accept_any()
{
        vector<int>::iterator i;
        for(i = globals.fds.begin(); i != globals.fds.end(); i++){
            int port = distance(globals.fds.begin(),i);
            struct arg_struct_listenPort args;
                args.arg_fd = *i;
                args.arg_port = globals.cmnSystemCatalogs[port].diag_port;
            thpool_add_work(globals.thpool, receiveFromSocket, (void*)(&args));
        }
        cout << "Listening threads created..." << endl;

    return 0;
}

Это прекрасно работает локально.Но когда я компилирую его в производственной среде, некоторые порты слушают пакеты, а другие просто нет!И рабочие порты меняются при каждом исполнении.Я могу подтвердить, что это не проблема брандмауэра.Я также могу ясно видеть пакеты через Wireshark.Я могу получать пакеты на эти порты через Netcat.Netstat показывает все открытые порты.

Моя локальная среда - это виртуальная машина Ubuntu 18.04, а производственная среда - Debian 9.8.

Вот как я называю сокеты:

int lSocket(int port) {

    //Crear Socket
        int listening = socket(AF_INET, SOCK_DGRAM, 0);
        if (listening == -1) {
            cerr << "No se puede crear el socket";
            exit(EXIT_FAILURE);
        }

        //Enlazar socket a un IP / puerto

        struct sockaddr_in hint;
        memset(&hint, 0, sizeof(hint));
        hint.sin_family = AF_INET; //IPv4
        hint.sin_port = htons(port); //Port
        hint.sin_addr.s_addr = htonl(INADDR_ANY);

        if(bind(listening, (struct sockaddr*)&hint, sizeof(hint)) == -1) { //Enlaza las opciones definidas al socket
            cerr << "No se puede enlazar IP/puerto" << endl;
            exit(EXIT_FAILURE);
        }


        return listening;

}

Любой совет приветствуется!

РЕДАКТИРОВАТЬ:

Как и предполагалось, я попытался переключиться на блокировку ввода / вывода, но основная проблема остается.Все еще не получает все открытые порты.

1 Ответ

1 голос
/ 12 марта 2019

Какой удивительный прием!

@ molbdnilo был абсолютно прав:

Вы используете указатели на объекты, срок жизни которых истек (& args). Это имеет неопределенное поведение - может показаться, что работает, но это ошибка это нужно исправить.

Вот фиксированный код. Нужно быть осторожным, передавая аргументы потокам!

void receiveFromSocket(void * arguments){

    sockaddr_in client;  // Local
    socklen_t clientSize = sizeof(client);
    memset(&client, 0, sizeof(client));
    struct arg_struct_listenPort *args2 = (struct arg_struct_listenPort *)arguments;
        int fd = args2->arg_fd;
        int port = args2->arg_port;

    for(;;) {
        char buf[158];
        memset(buf,0,158*sizeof(char));
        int n = recvfrom(fd, (char * ) buf, 158, MSG_WAITALL, ( struct sockaddr *) &client, &clientSize);

            if(n == -1){
               cerr << "Error while receiving from client: " << errno << endl;
               continue;
            }

            if(n != 158){
               cerr << "Discarded message since it's not 158 bytes." << endl;
               continue;
            }
            arg_struct *args = new arg_struct;
                args->arg_port = port;
                memcpy(args->buf,buf,158);

            thpool_add_work(globals.thpool, socketThread, (void*)(args));

    }

}


/// Runs the Socket listener
int network_accept_any()
{
        vector<int>::iterator i;
        for(i = globals.fds.begin(); i != globals.fds.end(); i++){
            int port = distance(globals.fds.begin(),i);
          arg_struct_listenPort *args = new arg_struct_listenPort;
                args->arg_fd = *i;
                args->arg_port = globals.cmnSystemCatalogs[port].diag_port;
            thpool_add_work(globals.thpool, receiveFromSocket, (void*)(args));
        }
        cout << "Listening threads created..." << endl;


    return 0;
}

Также я буду следить за комментариями @John Bollinger и @Superlokkus.

Спасибо всем!

...