Как я могу эффективно использовать сокеты boost asio для полнодуплексной потоковой передачи? - PullRequest
3 голосов
/ 20 июля 2011

Я пишу двунаправленный потоковый сервер, критичный к производительности, используя boost.asio.
Сервер работает следующим образом:

  • Поток A обрабатывает и отправляет объекты для отправки в очередь OUTPUT
  • Поток B ожидает объекты в очереди INPUT для их обработки
  • Поток C является принимающим потоком, который принимает входящие клиенты и создает класс CLIENT для каждого из них

Несколько клиентов работают одновременно, каждый из них имеет свой собственный подключенный сокет, и каждый из них должен делать две вещи одновременно:

  • дождаться (условная переменная), пока хотя бы один объект будет присутствовать в очереди OUTPUT (это может занять много времени), и отправить его как можно быстрее
  • получить любой входящий объект из сокета и поместить его в очередь INPUT

Более того, производительность и многоядерная масштабируемость имеют решающее значение в этом приложении.

Стандартный асинхронный подход здесь не срабатывает (обратные вызовы отправки могут блокировать другие обратные вызовы при ожидании отправки нового объекта), а подход блокировки (использование 1 потока для каждого направления) сложен, и я не могу понять, что делать в случае ошибки в одном из потоков.

Должен ли я использовать 2 сокета для каждого клиента (один для вывода и один для ввода)? Или, может быть, как-то использовать два io_services на сокет в двух разных потоках для поддержки одновременного обратного вызова?

Пожалуйста, объясните мне, как бы вы справились с такой ситуацией. Спасибо.

1 Ответ

5 голосов
/ 20 июля 2011

Стандартный асинхронный подход здесь не срабатывает (обратные вызовы отправки могут блокировать другие обратные вызовы при ожидании отправки нового объекта)

Асинхронная модель должна работать нормально, и при правильном использованииопределенно будет лучше масштабироваться - он развалится только тогда, когда вы введете блокировку.Вместо того, чтобы отключить асинхронность, вот что я бы предложил:

Удалите условную переменную.Вам нужны две очереди: OUTPUT и WAITING.

Затем при обработке клиента:

  1. Если в очереди OUTPUT есть данные, отправьте их.
  2. Если нет,поместите его в очередь WAITING.

Нет необходимости выполнять следующий ввод-вывод из обработчика предыдущего.Здесь вместо блокировки условной переменной мы просто перенаправляем ее в очередь WAITING для последующей обработки.

И в коде OUTPUT-pushing:

  1. Если вОжидание, отправьте данные напрямую.
  2. Если нет, нажмите на очередь OUTPUT.

Вот некоторый псевдокод:

queue<packet> output;
queue<client> waiting;

void try_send(client c)
{
    if(!output.empty())
    {
        // there is output waiting to be sent, send it.
        packet p = output.pop();
        c.async_send(p, on_send_finished);
    }
    else
    {
        // nothing available, go back to waiting.
        waiting.push(c);
    }
}

void on_send_finished(client c)
{
    // send finished, try again if any more output has accumulated:
    try_send(c);
}

void push_output(packet p)
{
    output.push(p);

    if(!waiting.empty())
    {
        // there is a client waiting to send, give it a try.
        client c = waiting.pop();
        try_send(c);
    }
}

Это может всесделать это масштабируемым способом, используя один поток, но несколько потоков довольно легко с asio.Если вы собираетесь использовать несколько потоков, вы захотите ввести блокировку в логику, которая проверяет очереди.

...