Многопоточное использование сокетов SOCK_STREAM - PullRequest
0 голосов
/ 14 октября 2019

Я пытаюсь решить проблему использования многопоточных сокетов в моем коде Linux с многопоточностью, и думаю, что это может быть неразрешимо, учитывая мои требования. Можете ли вы дать мне знать, согласны ли вы?

Проблема заключается в следующем:

  1. У меня несколько pthread-записей, записывающих в один сокет SOCK_STREAM (TCP) (установлен для неблокирующего ввода-вывода),Каждый поток пытается написать одно «сообщение» за раз, и данные сообщений из разных потоков не должны чередоваться.
  2. Я подготовлен для send () для возврата EWOULDBLOCK / EAGAIN в меньшем количестве случаев (в этом случае я просто откладываю запись сообщения в асинхронный поток с использованием структуры очередей, защищенной мьютексом)
  3. Я не могу получить мьютексы в этих приоритетных потоках записи (кроме случаев EWOULDBLOCK / EAGAIN), как подразумеваетсясбросы кеша дают мне неприемлемое влияние на процессор (это также означает, что не нужно использовать другие объекты синхронизации для решения проблемы, такие как атомные целые числа).
  4. Насколько я знаю, я не могу гарантировать, что send () победилне решает написать некоторые, но не все данные сообщения, что приводит к (потенциально) чередующимся данным

Является ли эта проблема неразрешимой? Есть ли способ заставить send () либо буферизировать все сообщение, либо ничего (при использовании неблокирующего ввода-вывода)?

Спасибо!

1 Ответ

1 голос
/ 15 октября 2019

У меня есть несколько записей pthreads в один сокет SOCK_STREAM (TCP) (установлен для неблокирующего ввода / вывода). Каждый поток пытается написать одно «сообщение» за раз, и данные сообщений из разных потоков не должны чередоваться.

TCP - это поток байтов, он не имеет понятия «сообщения», напримерUDP делает. Вы ДОЛЖНЫ синхронизировать доступ к сокету TCP, например, с мьютексом, чтобы избежать переплетения «сообщений».

Я подготовлен для send () для возврата EWOULDBLOCK / EAGAIN в меньшем количестве случаев (в этом случае я просто откладываю запись сообщения в асинхронный поток, используя структуру очередей с защитой мьютекса)

В этом случае вам, вероятно, потребуется отправить MOST / ALL ваших «сообщений» черезта же очередь, поэтому он может гарантировать, что они будут отправлены в правильном порядке. Если вы начинаете отправлять «сообщение» напрямую, и он получает ошибку EWOULDBLOCK / EAGAIN, оставшиеся неотправленные данные для этого «сообщения» и все последующие «сообщения» ДОЛЖНЫ быть поставлены в очередь до тех пор, пока очередь не будет очищена. Всякий раз, когда сокет доступен для записи, отправляйте все, что находится в очереди, удаляя только то, что было успешно отправлено, и повторяйте по мере необходимости, пока очередь не будет очищена. Только после этого вы можете безопасно отправлять новые «сообщения» напрямую, пока не появится сообщение об ошибке EWOULDBLOCK / EAGAIN. Но, несмотря ни на что, вам все равно нужно будет синхронизировать отправку, так что одновременно отправляется только 1 «сообщение», напрямую или из очереди.

Например, вы можете сделать что-то вроде этого (псевдокод):

void sendMsg(message msg)
{
    lock(&mutex);
    if (!queue.empty()) {
        queue.add(&msg, sizeof(msg));
    }
    else
    {
        byte *ptr = &msg;
        size_t size = sizeof(msg);
        do {
            int sent = send(sock, ptr, size, 0);
            if (sent == -1) {
                if (errno == EINTR) continue;
                queue.add(ptr, size);
                break;
            }
            ptr += sent;
            size -= sent;
        }
        while (size > 0);
    }
    unlock(&mutex);
}

void socketIsWritable()
{
    lock(&mutex);
    while (!queue.empty()) {
        int sent = send(sock, queue.data(), queue.size(), 0);
        if (sent == -1) {
            if (errno == EINTR) continue;
            break;
        }
        queue.remove(sent);
    }
    unlock(&mutex);
}

Я не могу получить мьютексы в этих потоках записи переднего плана (кроме случаев EWOULDBLOCK / EAGAIN), так как подразумеваемые сбросы кэша дают мне неприемлемое влияние на процессор (это также означает, что нетиспользование других объектов синхронизации для обхода проблемы, таких как атомарные целые числа).

Синхронизация доступа к сокету - единственный способ обеспечить целостность «сообщений», которые вы хотите отправить. Это означает, что в ваших сообщениях будет небольшое узкое место. Вы не можете начать отправку нового «сообщения» до тех пор, пока предыдущее «сообщение» не будет полностью отправлено.

Насколько я знаю, я не могу гарантировать, что send () не примет решениезаписать некоторые, но не все сообщения, что приведет к (потенциально) чередующимся данным

Исправить.

Есть ли способ заставить send () выполнитьлибо буферизировать все сообщение, либо ничего (при использовании неблокирующего ввода-вывода)?

Нет. И даже если бы это было возможно, это все равно не предотвратило бы переплетение, когда несколько потоков пытаются вызвать send() одновременно. Вы должны обработать необходимую буферизацию в своем собственном коде.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...