Каков наилучший способ синхронизации доступа контейнера между несколькими потоками в приложении реального времени - PullRequest
10 голосов
/ 16 января 2010

У меня в приложении std::list<Info> infoList, который разделен между двумя потоками. Эти 2 темы обращаются к этому списку следующим образом:

Тема 1 : использует push_back(), pop_front() или clear() в списке (в зависимости от ситуации) Поток 2 : использует iterator для перебора элементов в списке и выполнения некоторых действий.

Поток 2 выполняет итерацию списка следующим образом:

for(std::list<Info>::iterator i = infoList.begin(); i != infoList.end(); ++i)
{
  DoAction(i);
}

Код скомпилирован с использованием GCC 4.4.2.

Иногда ++ i вызывает segfault и вылетает из приложения. Ошибка вызвана в строке 143 std_list.h в следующей строке:

_M_node = _M_node->_M_next;

Полагаю, это гоночное состояние. Список мог измениться или даже очиститься потоком 1, пока поток 2 его итерировал.

Я использовал Mutex для синхронизации доступа к этому списку, и все прошло нормально во время моего начального теста. Но система просто зависает при стресс-тестировании, что делает это решение совершенно неприемлемым. Это приложение в режиме реального времени, и мне нужно найти решение, чтобы оба потока могли работать максимально быстро, не влияя на общую пропускную способность приложений.

Мой вопрос такой: Поток 1 и поток 2 должны выполняться максимально быстро, поскольку это приложение в реальном времени. Что я могу сделать, чтобы предотвратить эту проблему и сохранить производительность приложения? Существуют ли алгоритмы без блокировок для решения такой проблемы?

Это нормально, если я пропускаю некоторые недавно добавленные Info объекты в итерации потока 2, но что я могу сделать, чтобы итератор не стал висящим указателем?

Спасибо

Ответы [ 12 ]

5 голосов
/ 16 января 2010

Ваш цикл for () потенциально может сохранять блокировку в течение относительно длительного времени, в зависимости от того, сколько элементов он повторяет. Вы можете столкнуться с серьезными проблемами, если он «опрашивает» очередь, постоянно проверяя, стал ли доступным новый элемент. Это заставляет поток владеть мьютексом в течение неоправданно долгого времени, предоставляя потоку продюсера мало возможностей проникнуть и добавить элемент. И сжигание большого количества ненужных циклов ЦП в процессе.

Вам нужна «ограниченная очередь блокировки». Не пишите это сами, дизайн замка не тривиален. Трудно найти хорошие примеры, большинство из которых .NET-код. Эта статья выглядит многообещающе.

4 голосов
/ 16 января 2010

В общем случае использование контейнеров STL небезопасно. Вам нужно будет реализовать конкретный метод, чтобы сделать ваш поток кода безопасным. Решение, которое вы выберете, зависит от ваших потребностей. Я бы, вероятно, решил это, поддерживая два списка, по одному в каждом потоке. И сообщать об изменениях через очередь без блокировки (упоминается в комментариях к этому вопросу). Вы также можете ограничить время жизни ваших объектов Info, поместив их в boost :: shared_ptr, например.

typedef boost::shared_ptr<Info> InfoReference; 
typedef std::list<InfoReference> InfoList;

enum CommandValue
{
    Insert,
    Delete
}

struct Command
{
    CommandValue operation;
    InfoReference reference;
}

typedef LockFreeQueue<Command> CommandQueue;

class Thread1
{
    Thread1(CommandQueue queue) : m_commands(queue) {}
    void run()
    {
        while (!finished)
        {
            //Process Items and use 
            // deleteInfo() or addInfo()
        };

    }

    void deleteInfo(InfoReference reference)
    {
        Command command;
        command.operation = Delete;
        command.reference = reference;
        m_commands.produce(command);
    }

    void addInfo(InfoReference reference)
    {
        Command command;
        command.operation = Insert;
        command.reference = reference;
        m_commands.produce(command);
    }
}

private:
    CommandQueue& m_commands;
    InfoList m_infoList;
}   

class Thread2
{
    Thread2(CommandQueue queue) : m_commands(queue) {}

    void run()
    {
        while(!finished)
        {
            processQueue();
            processList();
        }   
    }

    void processQueue()
    {
        Command command;
        while (m_commands.consume(command))
        {
            switch(command.operation)
            {
                case Insert:
                    m_infoList.push_back(command.reference);
                    break;
                case Delete:
                    m_infoList.remove(command.reference);
                    break;
            }
        }
    }

    void processList()
    {
        // Iterate over m_infoList
    }

private:
    CommandQueue& m_commands;
    InfoList m_infoList;
}   


void main()
{
CommandQueue commands;

Thread1 thread1(commands);
Thread2 thread2(commands);

thread1.start();
thread2.start();

waitforTermination();

}

Это не скомпилировано. Вам все еще нужно убедиться, что доступ к вашим Info объектам является потокобезопасным.

3 голосов
/ 16 января 2010

Я хотел бы знать, какова цель этого списка, тогда было бы легче ответить на вопрос.

Как сказал Хоар, обычно плохая идея пытаться обмениваться данными для обмена между двумя потоками, скорее вам следует общаться для обмена данными: например, обмен сообщениями.

Если этот список моделирует очередь, например, вы можете просто использовать один из различных способов связи (например, сокетов) между двумя потоками. Потребитель / Производитель является стандартной и общеизвестной проблемой.

Если ваши предметы дорогие, то во время общения указывайте только указатели, вы не будете копировать сами предметы.

В общем, делиться данными крайне сложно, хотя, к сожалению, это единственный способ программирования, о котором мы слышали в школе. Обычно только низкоуровневая реализация «каналов» связи должна беспокоиться о синхронизации, и вы должны научиться использовать каналы для связи вместо того, чтобы пытаться эмулировать их.

1 голос
/ 30 ноября 2010

Если вы используете C ++ 0x, вы можете внутренне синхронизировать итерацию списка следующим образом:

Предполагается, что класс имеет шаблонный список с именем objects_ и boost :: mutex с именем mutex_

Метод toAll является методом-членом оболочки списка

 void toAll(std::function<void (T*)> lambda)
 {
 boost::mutex::scoped_lock(this->mutex_);
 for(auto it = this->objects_.begin(); it != this->objects_.end(); it++)
 {
      T* object = it->second;
      if(object != nullptr)
      {
                lambda(object);
           }
      }
 }

Призвание:

synchronizedList1->toAll(
      [&](T* object)->void // Or the class that your list holds
      {
           for(auto it = this->knownEntities->begin(); it != this->knownEntities->end(); it++)
           {
                // Do something
           }
      }
 );
1 голос
/ 17 января 2010

Другие уже предложили альтернативы без блокировок, поэтому я отвечу так, как будто вы застряли с использованием блокировок ...

Когда вы изменяете список, существующие итераторы могут стать недействительными, потому что они больше не указывают на допустимую память (список автоматически перераспределяет больше памяти, когда ему нужно увеличиваться). Чтобы предотвратить недействительные итераторы, вы могли бы сделать блок производителя на мьютексе, пока ваш потребитель перебирает список, но это будет ненужное ожидание для производителя.

Жизнь была бы проще, если бы вы использовали очередь вместо списка, и ваш потребитель использовал синхронизированный вызов queue<Info>::pop_front() вместо итераторов, которые могут быть признаны недействительными за вашей спиной. Если вашему потребителю действительно нужно обрабатывать куски Info за один раз, то используйте условную переменную , которая блокирует вашего потребителя до queue.size() >= minimum.

Библиотека Boost имеет удобную переносимую реализацию условных переменных (которая работает даже с более старыми версиями Windows), а также обычную библиотеку потоков .

Для очереди производитель-потребитель, которая использует (устаревшую) блокировку, проверьте шаблонный класс BlockingQueue библиотеки ZThreads . Я сам не использовал ZThreads, опасаясь нехватки последних обновлений и потому, что он, похоже, не получил широкого распространения. Тем не менее, я использовал его как источник вдохновения для создания собственной поточно-ориентированной очереди производителя-потребителя (до того, как я узнал о безблокированных очередях и TBB ).

Библиотека очереди / стека без блокировки, по-видимому, находится в очереди просмотра Boost. Будем надеяться, что мы увидим новый Boost.Lockfree в ближайшем будущем! :)

Если есть интерес, я могу написать пример блокировки очереди, которая использует std :: queue и блокировку Boost-потока.

EDIT

В блоге, на который ссылается ответ Рика, уже есть пример блокировки очереди, в котором используются condvars std :: queue и Boost. Если вашему потребителю нужны куски сожрать, вы можете расширить пример следующим образом:

void wait_for_data(size_t how_many)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.size() < how_many)
        {
            the_condition_variable.wait(lock);
        }
    }

Вы также можете настроить его, чтобы разрешить тайм-ауты и отмены.

Вы упомянули, что скорость была проблемой. Если ваши Info в полутяжелом весе, вам следует рассмотреть возможность передачи их на shared_ptr. Вы также можете попробовать сделать фиксированный размер Info s и использовать пул памяти (который может быть намного быстрее, чем куча).

1 голос
/ 17 января 2010

Лучший способ сделать это - использовать контейнер с внутренней синхронизацией. TBB и Microsoft concurrent_queue делают это. У Энтони Уильямса также есть хорошая реализация в его блоге здесь

1 голос
/ 16 января 2010

Как вы упомянули, что вам все равно, если ваш итерирующий потребитель пропустит некоторые недавно добавленные записи, вы можете использовать список copy-on-write внизу. Это позволяет итерирующему потребителю работать с непротиворечивым моментальным снимком списка с момента его первого запуска, в то время как в других потоках обновления списка дают свежие, но непротиворечивые копии, не нарушая ни одного из существующих снимков.

Торговля здесь заключается в том, что каждое обновление списка требует блокировки для монопольного доступа достаточно долго, чтобы скопировать весь список. Эта техника склонна к тому, чтобы иметь много одновременных читателей и реже обновлять.

Попытка сначала добавить внутреннюю блокировку к контейнеру требует от вас подумать о том, какие операции должны вести себя в атомарных группах. Например, проверка, является ли список пустым, прежде чем пытаться выскочить из первого элемента, требует атомарной операции pop-if-not-empty ; в противном случае ответ на пустой список может измениться между тем, когда вызывающий абонент получает ответ и пытается действовать в соответствии с ним.

В приведенном выше примере неясно, что гарантирует выполнение итерации. Должен ли каждый элемент в списке в конечном итоге посещаться итерационным потоком? Это делает несколько проходов? Что означает, что один поток удаляет элемент из списка, пока другой поток запускает DoAction() против него? Я подозреваю, что проработка этих вопросов приведет к значительным изменениям в дизайне.


Вы работаете в C ++, и вы упомянули о необходимости очереди с операцией pop-if-not-empty . Я написал очередь с двумя замками много лет назад с использованием ACE Library примитивов параллелизма, так как библиотека Boost thread не была еще готовый к использованию, и возможность создания стандартной библиотеки C ++, включающей такие средства, была несбыточной мечтой. Перенести его на что-то более современное было бы легко.

Эта моя очередь, называемая concurrent::two_lock_queue - позволяет получить доступ к началу очереди только через RAII. Это гарантирует, что получение блокировки для чтения головы всегда будет сопряжено с снятием блокировки. Потребитель создает объект const_front (постоянный доступ к элементу head), front (неконстантный доступ к элементу head) или renewable_front (неконстантный доступ к элементам head и преемнику) для представления исключительного право доступа к головному элементу очереди. Такие «передние» объекты не могут быть скопированы.

Class two_lock_queue также предлагает функцию pop_front(), которая ожидает, пока будет удален хотя бы один элемент, но в соответствии с std::queue и std::stack * * * * * * * * * * * * * * * * * * *

* * * * * * * * * * * * * * * * * * * * *1046* * * * * * * * * * * * * * * * * * * * * * * * 1046

В сопутствующем файле есть тип с именем concurrent::unconditional_pop, который позволяет через RAII гарантировать, что элемент head очереди будет вытолкнут при выходе из текущей области.

Сопутствующий файл error.hh определяет исключения, возникающие при использовании функции two_lock_queue::interrupt(), используемой для разблокировки потоков, ожидающих доступа к главе очереди.

Посмотрите на код и дайте мне знать, если вам нужно больше объяснений о том, как его использовать.

1 голос
/ 16 января 2010

Вы должны решить, какая нить является более важной. Если это поток обновления, то он должен дать сигнал потоку итератора остановиться, подождать и начать снова. Если это поток итератора, он может просто заблокировать список до завершения итерации.

1 голос
/ 16 января 2010

Чтобы не допустить аннулирования вашего итератора, вы должны заблокировать весь цикл for. Теперь я думаю, что в первом обсуждении могут возникнуть проблемы с обновлением списка. Я бы попытался дать ему возможность выполнять свою работу на каждой (или на каждой N-й итерации).

В псевдокоде, который будет выглядеть так:

mutex_lock();
for(...){
  doAction();
  mutex_unlock();
  thread_yield();  // give first thread a chance
  mutex_lock();
  if(iterator_invalidated_flag) // set by first thread
    reset_iterator();
}
mutex_unlock();
0 голосов
/ 16 января 2010

Если вы хотите продолжить использовать std::list в многопоточной среде, я бы рекомендовал поместить его в класс с мьютексом, который обеспечивает к нему заблокированный доступ. В зависимости от точного использования может иметь смысл переключиться на модель очереди, управляемую событиями, где сообщения передаются в очередь, которую используют несколько рабочих потоков (подсказка: производитель-потребитель).

Я бы серьезно отнесся к мысли Матье . Многие проблемы, которые решаются с помощью многопоточного программирования, лучше решать с помощью передачи сообщений между потоками или процессами. Если вам нужна высокая пропускная способность и абсолютно не требуется, чтобы обработка разделяла одно и то же пространство памяти, рассмотрите вариант использования интерфейса передачи сообщений (MPI) вместо использования собственного многопоточного решения. Существует множество реализаций C ++ - OpenMPI , Boost.MPI , Microsoft MPI и т. Д. И т. Д.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...