Выделенный поток (один поток на соединение) с возможностью буферизации (c / c ++) - PullRequest
3 голосов
/ 12 февраля 2011

Мой процесс считывает из одной очереди задачи, которые необходимо отправить нескольким получателям. Нам необходимо поддерживать порядок между задачами (т. Е. Задача, поступившая в очередь в 00:00, должна быть отправлена ​​до задачи, поступившей в 00:01), поэтому мы не можем использовать пул потоков. Заказ должен быть сохранен для каждого пункта назначения.

Одним из решений является создание выделенного потока для пункта назначения. Основной поток читает Задача из очереди и в зависимости от пункта назначения находит правильный поток.

У этого решения есть проблема: если рабочий поток занят, главный поток останется заблокированным, что замедлит работу системы. Что мне нужно, это новая очередь на поток. Мастер поток делит ресурсы с очередями, и рабочий поток читает новые очереди для входящих сообщения ...

Я хотел бы поделиться своими мыслями с SO-сообществом, и я ищу решение C / C ++, близкое мне. Есть ли библиотека, которая реализует такую ​​модель?

Ответы [ 2 ]

4 голосов
/ 12 февраля 2011

Дизайн, который вы хотите, довольно прост;Я думаю, что вы, вероятно, сможете написать нужный код и заставить его работать через час или два.Поиск сторонней библиотеки для реализации этого, вероятно, является излишним (если только я не неправильно понимаю проблему).

В частности, для каждого «рабочего» потока вам нужна структура данных FIFO (например, std :: queue)Mutex и механизм, который «главный» поток может использовать, чтобы сигнализировать потоку о необходимости пробуждения и проверке структуры данных на наличие новых сообщений (например, переменной условия, семафора или даже пары сокетов, которую рабочий блокирует при чтении, и мастер может отправить байт, чтобы разбудить рабочего).

Затем, чтобы отправить задачу в определенный рабочий поток, мастер будет делать что-то вроде этого (псевдокод):

struct WorkerThreadData & workerThread = _workerThreads[threadIndexIWantToSendTo];
workerThread.m_mutex.Lock();
workerThread.m_incomingTasksList.push_back(theNewTaskObject);
workerThread.m_mutex.Unlock();
workerThread.m_signalMechanism.SignalThreadToWakeUp();  // make sure the worker looks at the task list!

... и каждый рабочий поток будет иметь цикл событий, подобный следующему:

struct WorkerThreadData & myData = _workerThreads[myWorkerIndex];
TaskObject * taskObject;
while(1)
{
   myData.m_signalMechanism.WaitForSignal();  // block until the main thread wakes me up

   myData.m_mutex.Lock();
   taskObject = (myData.m_incomingTasks.length() > 0) ? myData.m_incomingTasks.pop_front() : NULL;
   myData.m_mutex.Unlock();

   if (taskObject) 
   {
      taskObject->DoTheWork();
      delete taskObject;
   }
}

Это никогда не заблокирует главный поток (в течение значительного промежутка времени), так как Mutex удерживается только оченькратко кем-либо.В частности, рабочие потоки не удерживают мьютекс во время работы над объектом задачи.

2 голосов
/ 12 февраля 2011

«Необходимость поддерживать порядок» означает, что вы будете выполнять задачи последовательно, независимо от того, сколько у вас потоков.В таком случае вам, вероятно, лучше всего иметь только один поток, обслуживающий запросы.

Вы могли бы получить что-то , если требование немного более слабое, чем это, например, есливсе задачи для одного пункта назначения должны оставаться в порядке, но для задач с разными пунктами назначения требования к упорядочению отсутствуют.Если это так, то ваше решение главной очереди, отправляющей задачи во входную очередь для каждого отдельного потока, звучит довольно неплохо.

Редактировать:

Указание количества потоков /динамически мьютекс это довольно легко.Например, чтобы взять число из командной строки, вы могли бы сделать что-то в этом порядке (не включая проверку ошибок и здравомыслие на данный момент):

std::vector<pthread_t> threads;

int num_threads = atoi(argv[1]);
threads.resize(num_threads);

for (int i=0; i<num_threads; i++)
    pthread_create(&threads[i], NULL, thread_routine, NULL);
...