Две пары TCP сервер / клиент, использующие один и тот же код, но только один клиент получает сообщения - PullRequest
0 голосов
/ 08 ноября 2019

У меня есть две пары простого TCP-сервера / клиента, т. Е. По одному клиенту на сервер, работающий в Windows:

  • Серверы работают в процессе (приложении).
  • Клиентызапустить в другом процессе.
  • Серверы продолжают посылать тактовые импульсы (строку) своему парному клиенту.
  • Первая пара сервер / клиент запускает свои основные циклы в своих собственных потоках.
  • Как только первый сервер / клиент пожали друг другу руку с первым тактом, вторая пара сервер / клиент запускает своиmainloops в своих собственных потоках.
  • Для этого теста они работают на одной машине с разными портами: 2345 и 2346.

Теперь моя проблема

  • Первый клиент получает пульс своего сервера.
  • Второй клиент делает НЕ , хотя второй сервер отправляет пульсы без ошибок.

Вот код сервера:

// hello_dualchannel_server.cpp : This file contains the 'main' function. 

#include "pch.h"

#include <iostream>
#include <thread>

#include <winsock2.h>
#include <ws2tcpip.h>

#include <spdlog/spdlog.h>


#define SPDLOG_WCHAR_TO_UTF8_SUPPORT
#ifdef _DEBUG
#if !defined(SPDLOG_ACTIVE_LEVEL)
#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
#endif  // #if !defined(SPDLOG_ACTIVE_LEVEL)
#define SPDLOG_DEBUG_ON
#define SPDLOG_TRACE_ON
#define _trace SPDLOG_TRACE 
#endif  // #ifdef _DEBUG
using namespace spdlog;

SOCKET g_sockFirst = 0;
SOCKET g_sockClientFirst = 0;
std::thread g_threadFirst;
uint32_t g_timeLatestHeartBeatFirst = 0;

SOCKET g_sockSecond = 0;
SOCKET g_sockClientSecond = 0;
std::thread g_threadSecond;
uint32_t g_timeLatestHeartBeatSecond = 0;

void SetupLogger() {
#ifdef _DEBUG
    spdlog::set_level(spdlog::level::trace);
    spdlog::set_pattern("[%H:%M:%S%z][%^%L%$][%t:%s:%#] %v");
#else
    spdlog::set_level(spdlog::level::info);
    spdlog::set_pattern("[%H:%M:%S][%^%L%$][%t] %v");
#endif  // #ifdef _DEBUG
}

int InitWinSock() {
    WORD wVersionRequested;
    WSADATA wsaData;

    /* Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h */
    wVersionRequested = MAKEWORD(2, 2);

    int err = WSAStartup(wVersionRequested, &wsaData);
    if (err != 0) {
        /* Tell the user that we could not find a usable */
        /* Winsock DLL.                                  */
        printf("WSAStartup failed with error: %d\n", err);
        return 1;
    }

    /* Confirm that the WinSock DLL supports 2.2.*/
    /* Note that if the DLL supports versions greater    */
    /* than 2.2 in addition to 2.2, it will still return */
    /* 2.2 in wVersion since that is the version we      */
    /* requested.                                        */

    if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) {
        /* Tell the user that we could not find a usable */
        /* WinSock DLL.                                  */
        printf("Could not find a usable version of Winsock.dll\n");
        WSACleanup();
        return 1;
    }
    else
        printf("The Winsock 2.2 dll was found okay\n");
    return 0;
}

bool Init(int host_port, SOCKET* p_sockServer, SOCKET*p_sockClient) {
    int err = 0;
    int* p_int = 0;
    std::string host_name("127.0.0.1");
    struct sockaddr_in my_addr;
    int addr_size = 0;
    sockaddr_in sadr_client;

    if (!*p_sockServer) {
        *p_sockServer = socket(AF_INET, SOCK_STREAM, 0);
        if (*p_sockServer == -1) {
            char log[MAX_PATH];
            strerror_s(log, MAX_PATH, errno);
            error("Server Error initializing socket: {}", log);
            goto FINISH;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;
        if ((setsockopt(*p_sockServer, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)
            || (setsockopt(*p_sockServer, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)) {
            char log[MAX_PATH];
            strerror_s(log, MAX_PATH, errno);
            error("Server Error setting options: {}", log);
            free(p_int);
            goto FINISH;
        }
        free(p_int);
        info("Server socket is set up.");

        my_addr.sin_family = AF_INET;
        my_addr.sin_port = htons(host_port);
        memset(&(my_addr.sin_zero), 0, 8);
        my_addr.sin_addr.s_addr = INADDR_ANY;

        if (bind(*p_sockServer, (sockaddr*)&my_addr, sizeof(my_addr)) == -1) {
            char log[MAX_PATH];
            strerror_s(log, MAX_PATH, errno);
            error("Server Error binding to socket, make sure nothing else is listening on this port: {}", log);
            goto FINISH;
        }
        if (listen(*p_sockServer, 10) == -1) {
            char log[MAX_PATH];
            strerror_s(log, MAX_PATH, errno);
            error("Server Error listening: {}", log);
            goto FINISH;
        }
        info("SUCCESS: Server socket listening ...");
    }

    info("Server accepting connection ...");
    addr_size = sizeof(sockaddr_in);
    char sAddress[MAX_PATH];
    *p_sockClient = accept(*p_sockServer, (sockaddr*)&sadr_client, &addr_size);
    if (*p_sockClient == INVALID_SOCKET) {
        char log[MAX_PATH];
        strerror_s(log, MAX_PATH, errno);
        error("Server error accepting client connection: {}", log);
        // DO NOT close sockets here.
        return false;
    }
    inet_ntop(sadr_client.sin_family, &sadr_client.sin_addr, sAddress, MAX_PATH);
    g_timeLatestHeartBeatFirst = GetCurrentTime();
    info("SUCCESS: Server accepted client connection.");
    return true;
FINISH:
    closesocket(*p_sockServer);
    return false;
}

bool IsConnected(uint32_t timeLatestHeartBeat) {
    // CAUTION: denser than client for sure catch
    const unsigned long ConnTimeoutMs = 300;
    auto cur = GetCurrentTime();
    auto latest = timeLatestHeartBeat;
    return cur - latest < ConnTimeoutMs;
}

bool StayInTouch(const char* name, SOCKET* pSockClient, uint32_t* pTimeLatestHeartBeat) {
    if (IsConnected(*pTimeLatestHeartBeat))
        return true;
    char heartBeat[] = "biku";
    int nBytesSent = 0;
    int flags = 0;
    int res = send(*pSockClient, heartBeat, sizeof(heartBeat), flags);
    if (res == SOCKET_ERROR) {
        char log[MAX_PATH];
        strerror_s(log, MAX_PATH, errno);
        error("{}: Server failed to send heartbeat: {}, Windows error: {}", name, log, GetLastError());
        return false;
    }
    else if (res == 0) {
        char log[MAX_PATH];
        strerror_s(log, MAX_PATH, errno);
        error("{}: Server sent zerobyte heartbeat: {}", name, log);
        return false;
    }
    debug("{}: Heartbeat sent: {}", name, heartBeat);
    *pTimeLatestHeartBeat = GetCurrentTime();
    return true;
}

void Close(SOCKET* pSock) {
    closesocket(*pSock);
    *pSock = 0;
}

bool Connect() {
    if (g_threadFirst.joinable()) {
        warn("FirstTunnel already running. Skipped.");
        return true;
    }
    g_threadFirst = std::thread([&]() {
        bool isConnected = false;
        while (true) {
            while (!isConnected) {
                isConnected = Init(2345, &g_sockFirst, &g_sockClientFirst);
            }
            isConnected = StayInTouch("FirstTunnel", &g_sockClientFirst, &g_timeLatestHeartBeatFirst);
            if (!isConnected) {
                // We don't close as client.
                // We keep connecting               
                error("About to reconnect ...");
                Sleep(1000);
                continue;
            }
            if (!g_threadSecond.joinable()) {
                g_threadSecond = std::thread([&]() {
                    while (true) {
                        while (!isConnected) {
                            isConnected = Init(2346, &g_sockSecond, &g_sockClientSecond);
                        }
                        isConnected = StayInTouch("SecondTunnel", &g_sockClientSecond, &g_timeLatestHeartBeatSecond);
                        if (!isConnected) {
                            // We don't close as client.
                            // We keep connecting               
                            error("About to reconnect ...");
                            Sleep(1000);
                            continue;
                        }
                    }
                    info("SecondTunnel quitting...");
                    Close(&g_sockSecond);
                });
            }
        }
        info("FirstTunnel quitting...");
        Close(&g_sockFirst);
    });

    while (true) {
        //info("main thread ...");
        Sleep(3000);
    }

    return g_threadFirst.joinable() ? true : false;
}


int main() {
    SetupLogger();
    info("Hello World!\n");

    if (InitWinSock()) {
        critical("Failed to initialize Window socket. Aborted.");
    }

    Connect();

    if (g_threadSecond.joinable()) {
        g_threadSecond.join();
    }
    if (g_threadFirst.joinable()) {
        g_threadFirst.join();
    }
    WSACleanup();
    info("Bye!");
}

Вот код клиента

// hello_dualchannel_client.cpp : This file contains the 'main' function. 
//

#include "pch.h"

#include <iostream>
#include <thread>

#include <winsock2.h>
#include <ws2tcpip.h>

#include <spdlog/spdlog.h>

#define SPDLOG_WCHAR_TO_UTF8_SUPPORT
#ifdef _DEBUG
#if !defined(SPDLOG_ACTIVE_LEVEL)
#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
#endif  // #if !defined(SPDLOG_ACTIVE_LEVEL)
#define SPDLOG_DEBUG_ON
#define SPDLOG_TRACE_ON
#define _trace SPDLOG_TRACE 
#endif  // #ifdef _DEBUG
using namespace spdlog;

SOCKET g_sockFirst = 0;
std::thread g_threadFirst;
uint32_t g_timeLatestHeartBeatFirst;

SOCKET g_sockSecond = 0;
std::thread g_threadSecond;
uint32_t g_timeLatestHeartBeatSecond;

void SetupLogger() {
#ifdef _DEBUG
    spdlog::set_level(spdlog::level::trace);
    spdlog::set_pattern("[%H:%M:%S%z][%^%L%$][%t:%s:%#] %v");
#else
    spdlog::set_level(spdlog::level::info);
    spdlog::set_pattern("[%H:%M:%S][%^%L%$][%t] %v");
#endif  // #ifdef _DEBUG
}

int InitWinSock() {
    WORD wVersionRequested;
    WSADATA wsaData;

    /* Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h */
    wVersionRequested = MAKEWORD(2, 2);

    int err = WSAStartup(wVersionRequested, &wsaData);
    if (err != 0) {
        /* Tell the user that we could not find a usable */
        /* Winsock DLL.                                  */
        printf("WSAStartup failed with error: %d\n", err);
        return 1;
    }

    /* Confirm that the WinSock DLL supports 2.2.*/
    /* Note that if the DLL supports versions greater    */
    /* than 2.2 in addition to 2.2, it will still return */
    /* 2.2 in wVersion since that is the version we      */
    /* requested.                                        */

    if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) {
        /* Tell the user that we could not find a usable */
        /* WinSock DLL.                                  */
        printf("Could not find a usable version of Winsock.dll\n");
        WSACleanup();
        return 1;
    }
    else
        printf("The Winsock 2.2 dll was found okay\n");
    return 0;
}

bool Init(int host_port, SOCKET* p_sock) {
    int err = 0;
    int* p_int = 0;
    std::string host_name("127.0.0.1");
    struct sockaddr_in my_addr;
    char handshake[] = "hello";
    //int nBytesSent;
    if (!*p_sock) {
        *p_sock = socket(AF_INET, SOCK_STREAM, 0);
        if (*p_sock == -1) {
            char log[MAX_PATH];
            strerror_s(log, MAX_PATH, errno);
            error("Client Error initializing socket {}", log);
            goto FINISH;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;
        if ((setsockopt(*p_sock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)
            || (setsockopt(*p_sock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)) {
            char log[MAX_PATH];
            strerror_s(log, MAX_PATH, errno);
            error("Client Error setting options {}", log);
            free(p_int);
            goto FINISH;
        }
        free(p_int);
        info("SUCCESS: Client socket is set up.");
    }

    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(host_port);
    memset(&(my_addr.sin_zero), 0, 8);
    inet_pton(my_addr.sin_family, host_name.c_str(), &my_addr.sin_addr);
    if (connect(*p_sock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == SOCKET_ERROR) {
        char log[MAX_PATH];
        strerror_s(log, MAX_PATH, errno);
        error("Client Error connecting socket {}", log);
        Sleep(1000);
        goto FINISH;
    }
    /*nBytesSent = send(g_sockFirst, handshake, sizeof(handshake), 0);
    if (nBytesSent <= 0) {
        char log[MAX_PATH];
        strerror_s(log, MAX_PATH, errno);
        error("Client error sending handshake: {}", log);
        goto FINISH;
    }*/
    g_timeLatestHeartBeatFirst = GetCurrentTime();
    info("SUCCESS: Client connected to server.");
    return true;
FINISH:
    closesocket(*p_sock);
    *p_sock = 0;
    return false;
}

bool IsConnected(uint32_t timeLatestHeartBeat) {
    const unsigned long ConnTimeoutMs = 3000;
    auto cur = GetCurrentTime();
    auto latest = timeLatestHeartBeat;
    //if (cur - latest > ConnTimeoutMs)
    //{
    //  debug("cur: {}, late: {}", cur, latest);
    //} 
    return cur - latest < ConnTimeoutMs;
}

bool StayInTouch(const char* name, SOCKET* pSock, uint32_t* pTimeLatestHeartBeat) {
    // Client checks inbox right away and measure timeout later.
    char heartBeat[MAX_PATH] = { 0 };
    // CAUTION: min 100ms required for receiving heartbeat.
    //const uint32_t TimeoutMS = 100;
    int flags = 0;
    int nBytesRecved = recv(*pSock, heartBeat, sizeof(heartBeat), flags);
    bool gotHeartbeat = nBytesRecved > 0;
    if (gotHeartbeat) {
        debug("{}: Heartbeat received: {}", name, heartBeat);
        *pTimeLatestHeartBeat = GetCurrentTime();
    }
    return IsConnected(*pTimeLatestHeartBeat);
}

void Close(SOCKET* pSock) {
    closesocket(*pSock);
    *pSock = 0;
}

bool Connect() {
    if (g_threadFirst.joinable()) {
        warn("FirstTunnel already running. Skipped.");
        return true;
    }
    g_threadFirst = std::thread([&]() {
        bool isConnected = false;
        while (true) {
            while (!isConnected) {
                isConnected = Init(2345, &g_sockFirst);
            }
            isConnected = StayInTouch("FirstTunnel", &g_sockFirst, &g_timeLatestHeartBeatFirst);
            if (!isConnected) {
                // We don't close as client.
                // We keep connecting
                Close(&g_sockFirst);
                error("About to reconnect ...");
                continue;
            }
            if (!g_threadSecond.joinable()) {
                g_threadSecond = std::thread([&]() {
                    while (true) {
                        while (!isConnected) {
                            isConnected = Init(2346, &g_sockSecond);
                        }
                        isConnected = StayInTouch("SecondTunnel", &g_sockSecond, &g_timeLatestHeartBeatSecond);
                        if (!isConnected) {
                            // We don't close as client.
                            // We keep connecting               
                            error("About to reconnect ...");
                            Sleep(1000);
                            continue;
                        }
                    }
                    info("SecondTunnel quitting...");
                    Close(&g_sockSecond);
                });
            }
        }
        info("FirstTunnel quitting.");
        Close(&g_sockFirst);
    });

    while (true) {
        //info("main thread ...");
        Sleep(3000);
    }

    return g_threadFirst.joinable() ? true : false;
}

int main() {
    SetupLogger();
    info("Hello World!\n");

    if (InitWinSock()) {
        critical("Failed to initialize Window socket. Aborted.");
    }

    Connect();

    if (g_threadSecond.joinable()) {
        g_threadSecond.join();
    }
    if (g_threadFirst.joinable()) {
        g_threadFirst.join();
    }
    WSACleanup();
    info("Bye!");
}

Основная логика подключения находится в функции Connect(). Буду признателен за подсказки, где я ошибся.

Чтобы запустить код как есть, вам нужна одна зависимость spdlog

vcpkg install spdlog:x64-Windows

Вы также можете заменить все журналы на основе spdlogкод с вашим собственным.

ОБНОВЛЕНИЕ

Наблюдение # 1

Я вошел в код и подтвердил

  • Все циклы работают. Таким образом, все потоки создаются.
  • Из-за защиты joinable() не создаются избыточные потоки.
  • Единственная точка отказа - это вызов recv второго клиента.

Итак, заключение

  • Нет брандмауэров
  • Нет избыточных потоков
  • Оба потока работают на клиентской и серверной сторонах.

Наблюдение # 2

Во время работы серверных и клиентских программ и разрешения проблемы я попытался

  • Оставить обе программы открытыми.
  • Запустите netcat (netcat порт NMap) следующим образом C:\Apps\Nmap\ncat.exe 127.0.0.1 2346

Это действительно поможет исправить ситуацию. Он говорит мне, что есть проблема с соединением, хотя я определенно могу войти в клиентский код и увидеть, что соединение все еще там.

Observation # 3

  • После того, как я оставлю точки остановатолько во втором коде подключения клиентской программы и при запуске программы я не смог выйти в первый код подключения. Обратите внимание, что они находятся в отдельных потоках.
  • Оставляя точки останова в обоих соединениях и быстро переступая между ними, я мог бы продолжать движение, не попадая в ловушку только в одном потоке. Это когда я замечаю, что клиент начинает получать сообщения, которые он должен.

Так что я подозреваю, что там есть проблема с многопоточностью. Если это проблема, как это исправить?

1 Ответ

0 голосов
/ 11 ноября 2019

Я сам нашел проблему. Это была глупая ошибка с моей стороны:

Два потока разделяют состояние по ошибке: isConnected. Таким образом, оригинальные программы продолжают пинать друг друга после того, как одна из них получает новое соединение или сердцебиение. Они должны были использовать разные состояния для этого.

Для более подробной информации: isConnected, который инициализируется g_threadFirst, проваливается при закрытии анонимной функции g_threadSecond.

...