У меня есть две пары простого TCP-сервера / клиента, т. Е. По одному клиенту на сервер, работающий в Windows:
- Серверы работают в процессе (приложении).
- Клиентызапустить в другом процессе.
- Серверы продолжают посылать тактовые импульсы (строку) своему парному клиенту.
- Первая пара сервер / клиент запускает свои основные циклы в своих собственных потоках.
- Как только первый сервер / клиент пожали друг другу руку с первым тактом, вторая пара сервер / клиент запускает своиmainloops в своих собственных потоках.
- Для этого теста они работают на одной машине с разными портами: 2345 и 2346.
Теперь моя проблема
- Первый клиент получает пульс своего сервера.
- Второй клиент делает НЕ , хотя второй сервер отправляет пульсы без ошибок.
Вот код сервера:
// hello_dualchannel_server.cpp : This file contains the 'main' function.
#include "pch.h"
#include <iostream>
#include <thread>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <spdlog/spdlog.h>
#define SPDLOG_WCHAR_TO_UTF8_SUPPORT
#ifdef _DEBUG
#if !defined(SPDLOG_ACTIVE_LEVEL)
#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
#endif // #if !defined(SPDLOG_ACTIVE_LEVEL)
#define SPDLOG_DEBUG_ON
#define SPDLOG_TRACE_ON
#define _trace SPDLOG_TRACE
#endif // #ifdef _DEBUG
using namespace spdlog;
SOCKET g_sockFirst = 0;
SOCKET g_sockClientFirst = 0;
std::thread g_threadFirst;
uint32_t g_timeLatestHeartBeatFirst = 0;
SOCKET g_sockSecond = 0;
SOCKET g_sockClientSecond = 0;
std::thread g_threadSecond;
uint32_t g_timeLatestHeartBeatSecond = 0;
void SetupLogger() {
#ifdef _DEBUG
spdlog::set_level(spdlog::level::trace);
spdlog::set_pattern("[%H:%M:%S%z][%^%L%$][%t:%s:%#] %v");
#else
spdlog::set_level(spdlog::level::info);
spdlog::set_pattern("[%H:%M:%S][%^%L%$][%t] %v");
#endif // #ifdef _DEBUG
}
int InitWinSock() {
WORD wVersionRequested;
WSADATA wsaData;
/* Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h */
wVersionRequested = MAKEWORD(2, 2);
int err = WSAStartup(wVersionRequested, &wsaData);
if (err != 0) {
/* Tell the user that we could not find a usable */
/* Winsock DLL. */
printf("WSAStartup failed with error: %d\n", err);
return 1;
}
/* Confirm that the WinSock DLL supports 2.2.*/
/* Note that if the DLL supports versions greater */
/* than 2.2 in addition to 2.2, it will still return */
/* 2.2 in wVersion since that is the version we */
/* requested. */
if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) {
/* Tell the user that we could not find a usable */
/* WinSock DLL. */
printf("Could not find a usable version of Winsock.dll\n");
WSACleanup();
return 1;
}
else
printf("The Winsock 2.2 dll was found okay\n");
return 0;
}
bool Init(int host_port, SOCKET* p_sockServer, SOCKET*p_sockClient) {
int err = 0;
int* p_int = 0;
std::string host_name("127.0.0.1");
struct sockaddr_in my_addr;
int addr_size = 0;
sockaddr_in sadr_client;
if (!*p_sockServer) {
*p_sockServer = socket(AF_INET, SOCK_STREAM, 0);
if (*p_sockServer == -1) {
char log[MAX_PATH];
strerror_s(log, MAX_PATH, errno);
error("Server Error initializing socket: {}", log);
goto FINISH;
}
p_int = (int*)malloc(sizeof(int));
*p_int = 1;
if ((setsockopt(*p_sockServer, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)
|| (setsockopt(*p_sockServer, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)) {
char log[MAX_PATH];
strerror_s(log, MAX_PATH, errno);
error("Server Error setting options: {}", log);
free(p_int);
goto FINISH;
}
free(p_int);
info("Server socket is set up.");
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(host_port);
memset(&(my_addr.sin_zero), 0, 8);
my_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(*p_sockServer, (sockaddr*)&my_addr, sizeof(my_addr)) == -1) {
char log[MAX_PATH];
strerror_s(log, MAX_PATH, errno);
error("Server Error binding to socket, make sure nothing else is listening on this port: {}", log);
goto FINISH;
}
if (listen(*p_sockServer, 10) == -1) {
char log[MAX_PATH];
strerror_s(log, MAX_PATH, errno);
error("Server Error listening: {}", log);
goto FINISH;
}
info("SUCCESS: Server socket listening ...");
}
info("Server accepting connection ...");
addr_size = sizeof(sockaddr_in);
char sAddress[MAX_PATH];
*p_sockClient = accept(*p_sockServer, (sockaddr*)&sadr_client, &addr_size);
if (*p_sockClient == INVALID_SOCKET) {
char log[MAX_PATH];
strerror_s(log, MAX_PATH, errno);
error("Server error accepting client connection: {}", log);
// DO NOT close sockets here.
return false;
}
inet_ntop(sadr_client.sin_family, &sadr_client.sin_addr, sAddress, MAX_PATH);
g_timeLatestHeartBeatFirst = GetCurrentTime();
info("SUCCESS: Server accepted client connection.");
return true;
FINISH:
closesocket(*p_sockServer);
return false;
}
bool IsConnected(uint32_t timeLatestHeartBeat) {
// CAUTION: denser than client for sure catch
const unsigned long ConnTimeoutMs = 300;
auto cur = GetCurrentTime();
auto latest = timeLatestHeartBeat;
return cur - latest < ConnTimeoutMs;
}
bool StayInTouch(const char* name, SOCKET* pSockClient, uint32_t* pTimeLatestHeartBeat) {
if (IsConnected(*pTimeLatestHeartBeat))
return true;
char heartBeat[] = "biku";
int nBytesSent = 0;
int flags = 0;
int res = send(*pSockClient, heartBeat, sizeof(heartBeat), flags);
if (res == SOCKET_ERROR) {
char log[MAX_PATH];
strerror_s(log, MAX_PATH, errno);
error("{}: Server failed to send heartbeat: {}, Windows error: {}", name, log, GetLastError());
return false;
}
else if (res == 0) {
char log[MAX_PATH];
strerror_s(log, MAX_PATH, errno);
error("{}: Server sent zerobyte heartbeat: {}", name, log);
return false;
}
debug("{}: Heartbeat sent: {}", name, heartBeat);
*pTimeLatestHeartBeat = GetCurrentTime();
return true;
}
void Close(SOCKET* pSock) {
closesocket(*pSock);
*pSock = 0;
}
bool Connect() {
if (g_threadFirst.joinable()) {
warn("FirstTunnel already running. Skipped.");
return true;
}
g_threadFirst = std::thread([&]() {
bool isConnected = false;
while (true) {
while (!isConnected) {
isConnected = Init(2345, &g_sockFirst, &g_sockClientFirst);
}
isConnected = StayInTouch("FirstTunnel", &g_sockClientFirst, &g_timeLatestHeartBeatFirst);
if (!isConnected) {
// We don't close as client.
// We keep connecting
error("About to reconnect ...");
Sleep(1000);
continue;
}
if (!g_threadSecond.joinable()) {
g_threadSecond = std::thread([&]() {
while (true) {
while (!isConnected) {
isConnected = Init(2346, &g_sockSecond, &g_sockClientSecond);
}
isConnected = StayInTouch("SecondTunnel", &g_sockClientSecond, &g_timeLatestHeartBeatSecond);
if (!isConnected) {
// We don't close as client.
// We keep connecting
error("About to reconnect ...");
Sleep(1000);
continue;
}
}
info("SecondTunnel quitting...");
Close(&g_sockSecond);
});
}
}
info("FirstTunnel quitting...");
Close(&g_sockFirst);
});
while (true) {
//info("main thread ...");
Sleep(3000);
}
return g_threadFirst.joinable() ? true : false;
}
int main() {
SetupLogger();
info("Hello World!\n");
if (InitWinSock()) {
critical("Failed to initialize Window socket. Aborted.");
}
Connect();
if (g_threadSecond.joinable()) {
g_threadSecond.join();
}
if (g_threadFirst.joinable()) {
g_threadFirst.join();
}
WSACleanup();
info("Bye!");
}
Вот код клиента
// hello_dualchannel_client.cpp : This file contains the 'main' function.
//
#include "pch.h"
#include <iostream>
#include <thread>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <spdlog/spdlog.h>
#define SPDLOG_WCHAR_TO_UTF8_SUPPORT
#ifdef _DEBUG
#if !defined(SPDLOG_ACTIVE_LEVEL)
#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
#endif // #if !defined(SPDLOG_ACTIVE_LEVEL)
#define SPDLOG_DEBUG_ON
#define SPDLOG_TRACE_ON
#define _trace SPDLOG_TRACE
#endif // #ifdef _DEBUG
using namespace spdlog;
SOCKET g_sockFirst = 0;
std::thread g_threadFirst;
uint32_t g_timeLatestHeartBeatFirst;
SOCKET g_sockSecond = 0;
std::thread g_threadSecond;
uint32_t g_timeLatestHeartBeatSecond;
void SetupLogger() {
#ifdef _DEBUG
spdlog::set_level(spdlog::level::trace);
spdlog::set_pattern("[%H:%M:%S%z][%^%L%$][%t:%s:%#] %v");
#else
spdlog::set_level(spdlog::level::info);
spdlog::set_pattern("[%H:%M:%S][%^%L%$][%t] %v");
#endif // #ifdef _DEBUG
}
int InitWinSock() {
WORD wVersionRequested;
WSADATA wsaData;
/* Use the MAKEWORD(lowbyte, highbyte) macro declared in Windef.h */
wVersionRequested = MAKEWORD(2, 2);
int err = WSAStartup(wVersionRequested, &wsaData);
if (err != 0) {
/* Tell the user that we could not find a usable */
/* Winsock DLL. */
printf("WSAStartup failed with error: %d\n", err);
return 1;
}
/* Confirm that the WinSock DLL supports 2.2.*/
/* Note that if the DLL supports versions greater */
/* than 2.2 in addition to 2.2, it will still return */
/* 2.2 in wVersion since that is the version we */
/* requested. */
if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) {
/* Tell the user that we could not find a usable */
/* WinSock DLL. */
printf("Could not find a usable version of Winsock.dll\n");
WSACleanup();
return 1;
}
else
printf("The Winsock 2.2 dll was found okay\n");
return 0;
}
bool Init(int host_port, SOCKET* p_sock) {
int err = 0;
int* p_int = 0;
std::string host_name("127.0.0.1");
struct sockaddr_in my_addr;
char handshake[] = "hello";
//int nBytesSent;
if (!*p_sock) {
*p_sock = socket(AF_INET, SOCK_STREAM, 0);
if (*p_sock == -1) {
char log[MAX_PATH];
strerror_s(log, MAX_PATH, errno);
error("Client Error initializing socket {}", log);
goto FINISH;
}
p_int = (int*)malloc(sizeof(int));
*p_int = 1;
if ((setsockopt(*p_sock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)
|| (setsockopt(*p_sock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)) {
char log[MAX_PATH];
strerror_s(log, MAX_PATH, errno);
error("Client Error setting options {}", log);
free(p_int);
goto FINISH;
}
free(p_int);
info("SUCCESS: Client socket is set up.");
}
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(host_port);
memset(&(my_addr.sin_zero), 0, 8);
inet_pton(my_addr.sin_family, host_name.c_str(), &my_addr.sin_addr);
if (connect(*p_sock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == SOCKET_ERROR) {
char log[MAX_PATH];
strerror_s(log, MAX_PATH, errno);
error("Client Error connecting socket {}", log);
Sleep(1000);
goto FINISH;
}
/*nBytesSent = send(g_sockFirst, handshake, sizeof(handshake), 0);
if (nBytesSent <= 0) {
char log[MAX_PATH];
strerror_s(log, MAX_PATH, errno);
error("Client error sending handshake: {}", log);
goto FINISH;
}*/
g_timeLatestHeartBeatFirst = GetCurrentTime();
info("SUCCESS: Client connected to server.");
return true;
FINISH:
closesocket(*p_sock);
*p_sock = 0;
return false;
}
bool IsConnected(uint32_t timeLatestHeartBeat) {
const unsigned long ConnTimeoutMs = 3000;
auto cur = GetCurrentTime();
auto latest = timeLatestHeartBeat;
//if (cur - latest > ConnTimeoutMs)
//{
// debug("cur: {}, late: {}", cur, latest);
//}
return cur - latest < ConnTimeoutMs;
}
bool StayInTouch(const char* name, SOCKET* pSock, uint32_t* pTimeLatestHeartBeat) {
// Client checks inbox right away and measure timeout later.
char heartBeat[MAX_PATH] = { 0 };
// CAUTION: min 100ms required for receiving heartbeat.
//const uint32_t TimeoutMS = 100;
int flags = 0;
int nBytesRecved = recv(*pSock, heartBeat, sizeof(heartBeat), flags);
bool gotHeartbeat = nBytesRecved > 0;
if (gotHeartbeat) {
debug("{}: Heartbeat received: {}", name, heartBeat);
*pTimeLatestHeartBeat = GetCurrentTime();
}
return IsConnected(*pTimeLatestHeartBeat);
}
void Close(SOCKET* pSock) {
closesocket(*pSock);
*pSock = 0;
}
bool Connect() {
if (g_threadFirst.joinable()) {
warn("FirstTunnel already running. Skipped.");
return true;
}
g_threadFirst = std::thread([&]() {
bool isConnected = false;
while (true) {
while (!isConnected) {
isConnected = Init(2345, &g_sockFirst);
}
isConnected = StayInTouch("FirstTunnel", &g_sockFirst, &g_timeLatestHeartBeatFirst);
if (!isConnected) {
// We don't close as client.
// We keep connecting
Close(&g_sockFirst);
error("About to reconnect ...");
continue;
}
if (!g_threadSecond.joinable()) {
g_threadSecond = std::thread([&]() {
while (true) {
while (!isConnected) {
isConnected = Init(2346, &g_sockSecond);
}
isConnected = StayInTouch("SecondTunnel", &g_sockSecond, &g_timeLatestHeartBeatSecond);
if (!isConnected) {
// We don't close as client.
// We keep connecting
error("About to reconnect ...");
Sleep(1000);
continue;
}
}
info("SecondTunnel quitting...");
Close(&g_sockSecond);
});
}
}
info("FirstTunnel quitting.");
Close(&g_sockFirst);
});
while (true) {
//info("main thread ...");
Sleep(3000);
}
return g_threadFirst.joinable() ? true : false;
}
int main() {
SetupLogger();
info("Hello World!\n");
if (InitWinSock()) {
critical("Failed to initialize Window socket. Aborted.");
}
Connect();
if (g_threadSecond.joinable()) {
g_threadSecond.join();
}
if (g_threadFirst.joinable()) {
g_threadFirst.join();
}
WSACleanup();
info("Bye!");
}
Основная логика подключения находится в функции Connect()
. Буду признателен за подсказки, где я ошибся.
Чтобы запустить код как есть, вам нужна одна зависимость spdlog
vcpkg install spdlog:x64-Windows
Вы также можете заменить все журналы на основе spdlogкод с вашим собственным.
ОБНОВЛЕНИЕ
Наблюдение # 1
Я вошел в код и подтвердил
- Все циклы работают. Таким образом, все потоки создаются.
- Из-за защиты
joinable()
не создаются избыточные потоки. - Единственная точка отказа - это вызов
recv
второго клиента.
Итак, заключение
- Нет брандмауэров
- Нет избыточных потоков
- Оба потока работают на клиентской и серверной сторонах.
Наблюдение # 2
Во время работы серверных и клиентских программ и разрешения проблемы я попытался
- Оставить обе программы открытыми.
- Запустите netcat (
netcat
порт NMap) следующим образом C:\Apps\Nmap\ncat.exe 127.0.0.1 2346
Это действительно поможет исправить ситуацию. Он говорит мне, что есть проблема с соединением, хотя я определенно могу войти в клиентский код и увидеть, что соединение все еще там.
Observation # 3
- После того, как я оставлю точки остановатолько во втором коде подключения клиентской программы и при запуске программы я не смог выйти в первый код подключения. Обратите внимание, что они находятся в отдельных потоках.
- Оставляя точки останова в обоих соединениях и быстро переступая между ними, я мог бы продолжать движение, не попадая в ловушку только в одном потоке. Это когда я замечаю, что клиент начинает получать сообщения, которые он должен.
Так что я подозреваю, что там есть проблема с многопоточностью. Если это проблема, как это исправить?