Как сохранить прокачку сообщений во время ожидания? - PullRequest
3 голосов
/ 21 июня 2009

У меня есть приложение, основанное на архитектуре пула потоков сообщений. Всякий раз, когда есть действие, которое может заблокировать, оно реализуется как действие «обратный вызов при завершении / триггере evnet», поэтому оно не останавливает выполняющийся поток.

Хотя этот метод подходит для большинства случаев, бывают ситуации, когда он становится очень неудобным и чрезмерно усложняет код.

Я бы хотел иметь возможность прозрачно обрабатывать события во время ожидания, не разбивая функцию на части до / после ожидания.

Как мне это сделать?

Я имел в виду два варианта:

  1. Запустить цикл обработки сообщений из выполняющейся функции во время ожидания.
  2. Создать новый рабочий поток во время ожидания и завершить его (надлежащим образом) при возобновлении.

Оба варианта имеют свои недостатки, если назвать несколько:

Для 1:

  • Может привести к переполнению стека.
  • Может оказаться заблокированным.
  • Если внутреннее сообщение приводит к ожиданию завершения второго события, а внешнее событие тем временем завершается, внешняя функция не может продолжаться, пока не завершится второе событие, и эта ситуация может расшириться.

Вариант 2 может просто создать все больше и больше потоков.

Конечно, могут быть и другие варианты, о которых я не подумал.

РЕДАКТИРОВАТЬ: Язык - C ++, поэтому функции не могут быть выведены из и в простой (переносимый?) Способ. Платформа - Windows (API), хотя я не думаю, что это актуально.

Ответы [ 5 ]

2 голосов
/ 22 июня 2009

Для портативного C ++ это не сработает, но, поскольку вы упомянули, что ваша платформа - Windows, почему бы не использовать MsgWaitForMultipleObjects ? Его цель - позволить вам делать именно то, что говорит ваш вопрос - продолжать качать сообщения во время ожидания.

0 голосов
/ 23 июня 2009

Ваша проблема с синхронизацией потоков не так ли? Если это ваша проблема, почему бы не использовать мьютекс? Это может быть связано с интерфейсом. Фактически, вы можете использовать идиому PIMPL, чтобы сделать мьютекс переносимым.

http://msdn.microsoft.com/en-us/library/system.threading.mutex(VS.71).aspx

0 голосов
/ 21 июня 2009

Не зная больше о вашем конкретном приложении (т. Е. Сколько времени занимает обработка сообщений и т. Д.), Будет много размахивать руками:

  • Это управляемый или неуправляемый C ++?

  • Какой ThreadPool вы используете?

    • QueueUserWorkItem
    • Ваш собственный пул через CreateIoCompletionPort?
    • Или VistaThreadpoolWork?

Я думаю, что платформа несколько важна, поскольку важен характер пула потоков.

Например:

Если вы используете ( Порты завершения ) для своего пула потоков (т. Е. CreateIoCompletionPort). У вас есть некоторый контроль над тем, сколько потоков выполняется одновременно (и, следовательно, сколько конечных потоков в итоге создается). Если вы зададите максимальное число одновременных потоков, скажем, 4. Windows будет пытаться разрешить одновременную работу только 4 потоков. Если все 4 потока заняты обработкой и вы поставили в очередь 5-й элемент, окна не позволят этому элементу работать до тех пор, пока один из 4-х элементов не будет завершен (повторное использование потока). Единственное время, когда это правило нарушается, - это когда блокируются потоки (то есть ожидают ввода-вывода), тогда разрешается запускать больше потоков.

Это важная вещь для понимания портов завершения и того, почему платформа актуальна. Очень сложно реализовать что-то подобное без использования ядра. Знание разницы между занятыми потоками и заблокированными потоками требует доступа к состояниям потоков. Порты завершения очень эффективны в отношении количества переключений контекста в ядре.

Вернуться к вашему вопросу:

Казалось бы, у вас должен быть один поток, чтобы обрабатывать / отправлять сообщения, и обработка сообщений выполняется путем отправки рабочих в пул потоков. Пусть порты завершения обрабатывают балансировку нагрузки и параллелизм. Ваш цикл обработки сообщений никогда не блокируется и может продолжать обрабатывать сообщения.

Если скорость входящих сообщений намного превышает вашу способность обрабатывать их, вам, вероятно, придется обратить внимание на размер очереди и блокировку, когда она становится слишком большой.

0 голосов
/ 22 июня 2009

Кажется, ваша проблема фундаментальна и не связана с C ++. Другие языки, возможно, лучше скрывают использование стека, но если вы не вернулись из Foo (), вам нужен стек вызовов для Foo (). И если вы также выполняете Bar (), для этого также требуется стек вызовов.

Потоки являются отличным подходом к этому, так как каждый поток имеет свой собственный стек вызовов. Продолжения - это умный, но сложный способ сохранения стеков вызовов, поэтому там, где это возможно, это тоже вариант. Но если вы не хотите этого, вам придется обойтись одним стэком.

Далинг с одним стеком вызовов требует адресации входа. Здесь нет общего ответа о том, что возможно. В общем, у вас будет набор сообщений M1..Mx, которые обрабатываются функциями F1 ... Fy, с некоторым отображением, зависящим от приложения и, возможно, зависящим от состояния. С повторяющимся циклом сообщений вы можете запускать Fi, когда получаете Mj. Теперь проблема в том, что делать. Не все функции F1 ... Fn могут быть вызваны; в частности, сам Fi не может быть вызван. Однако некоторые другие функции также могут быть недоступны, например, потому что они разделяют ресурсы. Это зависит от приложения.

Если для обработки Mj требуется какая-либо из этих недоступных функций, вы должны отложить ее. Можете ли вы принять следующее сообщение в очереди? Опять же, это зависит от реализации и может даже относиться к типу сообщения и его содержимому. Если сообщения достаточно независимы, их можно выполнить не по порядку. Это быстро становится довольно сложным - чтобы определить, можно ли принять N-е сообщение в очереди, вы должны проверить, может ли оно быть выполнено не по порядку относительно предыдущих N-1 сообщений.

Язык может помочь вам, не скрывая зависимости, но в конце вы должны принять четкие решения. Там нет серебряной пули.

0 голосов
/ 21 июня 2009

РЕДАКТИРОВАТЬ: Вы упоминаете, что не хотите «разбивать функцию на части до / после ожидания».

На каком языке вы развиваете? Если он имеет продолжения (yield return в C #), то это обеспечивает способ написания кода, который представляется процедурным, но который можно легко приостановить до тех пор, пока блокирующая операция не выполнит обратный вызов завершения.

Вот статья об идее: http://msdn.microsoft.com/en-us/magazine/cc546608.aspx

UPDATE:

К сожалению, язык C ++

Это сделало бы отличный слоган футболки.

Хорошо, так что вам может быть полезно структурировать ваш последовательный код как конечный автомат, чтобы он мог работать с прерываниями / возобновлением.

например. вашей боли нужно написать две функции, одну из которых инициирует, а другую выполняет роль обработчика события завершения:

void send_greeting(const std::string &msg)
{
    std::cout << "Sending the greeting" << std::endl;
    begin_sending_string_somehow(msg, greeting_sent_okay);
}

void greeting_sent_okay()
{
    std::cout << "Greeting has been sent successfully." << std::endl;
}

Ваша идея была ждать:

void send_greeting(const std::string &msg)
{
    std::cout << "Sending the greeting" << std::endl;

    waiter w;
    begin_sending_string_somehow(msg, w);
    w.wait_for_completion();

    std::cout << "Greeting has been sent successfully." << std::endl;
}

В этом примере waiter перегружает operator (), чтобы он мог выполнять функцию обратного вызова, и wait_for_completion как-то зависает до тех пор, пока не увидит, что operator () был вызван.

Я предполагаю, что второй параметр begin_sending_string_somehow является параметром шаблона, который может быть любым вызываемым типом, не принимающим параметры.

Но, как вы говорите, у этого есть недостатки. Каждый раз, когда поток ожидает так, вы добавляете еще одну потенциальную тупиковую ситуацию, и вы также потребляете «ресурс» всего потока и его стека, а это означает, что потребуется создать больше потоков в другом месте, чтобы можно было выполнить работу , что противоречит всему смыслу пула потоков.

Вместо этого напишите класс:

class send_greeting
{
    int state_;
    std::string msg_;

public:
    send_greeting(const std::string &msg)
        : state_(0), msg_(msg) {}

    void operator()
    {
        switch (state_++)
        {
            case 0:
                std::cout << "Sending the greeting" << std::endl;
                begin_sending_string_somehow(msg, *this);
                break;

            case 1:
                std::cout << "Greeting has been sent successfully." 
                          << std::endl;
                break;
        }
    }
};

Класс реализует оператор вызова функции (). Каждый раз, когда он вызывается, он выполняет следующий шаг в логике. (Конечно, будучи таким тривиальным примером, теперь это в основном шум управления состоянием, но в более сложном примере с четырьмя или пятью состояниями это может помочь прояснить последовательную природу кода).

Проблемы:

  • Если сигнатура функции обратного вызова события имеет специальные параметры, вам нужно добавить еще одну перегрузку operator(), которая сохраняет параметры в дополнительных полях и затем вызывает перегрузку без параметров. Затем он начинает запутываться, потому что эти поля будут доступны во время компиляции в начальном состоянии, даже если они не имеют смысла во время выполнения в этом состоянии.

  • Как объекты класса создаются и удаляются? Объект должен выжить, пока операция не завершится или не будет прекращена ... центральная ловушка C ++. Я бы порекомендовал реализовать общую схему управления этим. Создайте список «вещей, которые нужно будет удалить» и убедитесь, что это происходит автоматически в определенных безопасных точках, то есть постарайтесь максимально приблизиться к GC, насколько это возможно. Чем дальше вы находитесь от этого, тем больше памяти вы утечете.

...