У меня есть несколько записей pthreads в один сокет SOCK_STREAM (TCP) (установлен для неблокирующего ввода / вывода). Каждый поток пытается написать одно «сообщение» за раз, и данные сообщений из разных потоков не должны чередоваться.
TCP - это поток байтов, он не имеет понятия «сообщения», напримерUDP делает. Вы ДОЛЖНЫ синхронизировать доступ к сокету TCP, например, с мьютексом, чтобы избежать переплетения «сообщений».
Я подготовлен для send () для возврата EWOULDBLOCK / EAGAIN в меньшем количестве случаев (в этом случае я просто откладываю запись сообщения в асинхронный поток, используя структуру очередей с защитой мьютекса)
В этом случае вам, вероятно, потребуется отправить MOST / ALL ваших «сообщений» черезта же очередь, поэтому он может гарантировать, что они будут отправлены в правильном порядке. Если вы начинаете отправлять «сообщение» напрямую, и он получает ошибку EWOULDBLOCK
/ EAGAIN
, оставшиеся неотправленные данные для этого «сообщения» и все последующие «сообщения» ДОЛЖНЫ быть поставлены в очередь до тех пор, пока очередь не будет очищена. Всякий раз, когда сокет доступен для записи, отправляйте все, что находится в очереди, удаляя только то, что было успешно отправлено, и повторяйте по мере необходимости, пока очередь не будет очищена. Только после этого вы можете безопасно отправлять новые «сообщения» напрямую, пока не появится сообщение об ошибке EWOULDBLOCK
/ EAGAIN
. Но, несмотря ни на что, вам все равно нужно будет синхронизировать отправку, так что одновременно отправляется только 1 «сообщение», напрямую или из очереди.
Например, вы можете сделать что-то вроде этого (псевдокод):
void sendMsg(message msg)
{
lock(&mutex);
if (!queue.empty()) {
queue.add(&msg, sizeof(msg));
}
else
{
byte *ptr = &msg;
size_t size = sizeof(msg);
do {
int sent = send(sock, ptr, size, 0);
if (sent == -1) {
if (errno == EINTR) continue;
queue.add(ptr, size);
break;
}
ptr += sent;
size -= sent;
}
while (size > 0);
}
unlock(&mutex);
}
void socketIsWritable()
{
lock(&mutex);
while (!queue.empty()) {
int sent = send(sock, queue.data(), queue.size(), 0);
if (sent == -1) {
if (errno == EINTR) continue;
break;
}
queue.remove(sent);
}
unlock(&mutex);
}
Я не могу получить мьютексы в этих потоках записи переднего плана (кроме случаев EWOULDBLOCK / EAGAIN), так как подразумеваемые сбросы кэша дают мне неприемлемое влияние на процессор (это также означает, что нетиспользование других объектов синхронизации для обхода проблемы, таких как атомарные целые числа).
Синхронизация доступа к сокету - единственный способ обеспечить целостность «сообщений», которые вы хотите отправить. Это означает, что в ваших сообщениях будет небольшое узкое место. Вы не можете начать отправку нового «сообщения» до тех пор, пока предыдущее «сообщение» не будет полностью отправлено.
Насколько я знаю, я не могу гарантировать, что send () не примет решениезаписать некоторые, но не все сообщения, что приведет к (потенциально) чередующимся данным
Исправить.
Есть ли способ заставить send () выполнитьлибо буферизировать все сообщение, либо ничего (при использовании неблокирующего ввода-вывода)?
Нет. И даже если бы это было возможно, это все равно не предотвратило бы переплетение, когда несколько потоков пытаются вызвать send()
одновременно. Вы должны обработать необходимую буферизацию в своем собственном коде.