Самый простой способ устранить тупик - это использовать std::lock(l1, l2)
, который был изобретен именно для этой цели.
Изменить:
chopstick[i].lock();
cout << i << ": takes: " << i << endl;
chrono::milliseconds dur(20);
this_thread::sleep_for(dur); //Leads to deadlock immediately
chopstick[(i + 1) % 5].lock();
на:
std::lock(chopstick[i], chopstick[(i + 1) % 5]);
cout << i << ": takes: " << i << endl;
Это простое решение, которое игнорирует безопасность исключений, и это хорошо для того, чтобы записать ваш первый урок по предотвращению тупиковых ситуаций.
Чтобы сделать его безопасным для исключений, мьютексы должны быть заключены вУстройство RAII: std::unique_lock
:
unique_lock<mutex> left{chopstick[i], defer_lock};
unique_lock<mutex> right{chopstick[(i + 1) % 5], defer_lock};
lock(left, right);
cout << i << ": takes: " << i << endl;
И затем вы также должны удалить явные операторы unlock
, поскольку деструкторы left
и right
позаботятся об этом.
Теперь, если что-то выдает исключение в go
, деструкторы для left
и right
разблокируют мьютексы по мере распространения исключения.
Чтобы узнать больше о том, что происходит под капотомstd::lock
, чтобы избежать тупика, см .: http://howardhinnant.github.io/dining_philosophers.html
Тест производительности
Вот быстрый и простой тест для сравнения использования std::lock
с болеетрадиционный совет «закажи мьютексы».
#ifndef USE_STD_LOCK
# error #define USE_STD_LOCK as 1 to use std::lock and as 0 to use ordering
#endif
#include <atomic>
#include <chrono>
#include <exception>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <thread>
std::thread task[5];
constexpr auto N = sizeof(task)/sizeof(task[0]);
std::mutex chopstick[N];
std::atomic<bool> stop{false};
unsigned long long counts[N] = {};
using namespace std::chrono_literals;
void
go(decltype(N) i)
{
auto const right = (i + 1) % N;
decltype(right) const left = i;
while (!stop)
{
#if USE_STD_LOCK
std::lock(chopstick[left], chopstick[right]);
#else
if (left < right)
{
chopstick[left].lock();
chopstick[right].lock();
}
else
{
chopstick[right].lock();
chopstick[left].lock();
}
#endif
std::lock_guard<std::mutex> l1{chopstick[left], std::adopt_lock};
std::lock_guard<std::mutex> l2{chopstick[right], std::adopt_lock};
++counts[i];
std::this_thread::sleep_for(1ms);
}
}
void
deadlock_detector(std::chrono::seconds time_out)
{
std::this_thread::sleep_for(time_out);
std::cerr << "Deadlock!\n";
std::terminate();
}
int
main()
{
for (auto i = 0u; i < N; ++i)
task[i] = std::thread{go, i};
std::thread{deadlock_detector, 15s}.detach();
std::this_thread::sleep_for(10s);
stop = true;
for (auto& t : task)
t.join();
std::cout << std::right;
for (auto c : counts)
std::cout << std::setw(6) << c << '\n';
auto count = std::accumulate(std::begin(counts), std::end(counts), 0ULL);
std::cout << "+ ----\n";
std::cout << std::setw(6) << count << '\n';
}
Это должно быть скомпилировано с USE_STD_LOCK
определено:
#define USE_STD_LOCK 0
, чтобы упорядочить мьютексы и заблокировать их по порядку. #define USE_STD_LOCK 1
, чтобы заблокировать мьютексы с помощью std::lock
.
Программа работает в течение 10 секунд, причем каждый поток увеличивает значение unsigned long long
как можно чаще.Но затем, чтобы сделать вещи немного более драматичными, каждая нить также спит в течение 1 мс, удерживая при этом и блокировки (запускайте без этого сна, если хотите).
После 10 с, main
сообщает всем, что сдвигover и подсчитывает результат для каждого потока, а также общее приращение для всех потоков.Чем выше, тем лучше.
Работая с включенной оптимизацией, я получаю такие числа:
USE_STD_LOCK = 1
3318
2644
3254
3004
2876
+ ----
15096
USE_STD_LOCK = 0
19
96
1641
5885
50
+ ----
7691
Обратите внимание, что использование std::lock
не только приводит к увеличению совокупного результата на намного , но и к тому, что каждый поток вносит примерно одинаковую сумму в общую сумму.В отличие от этого, «упорядочение» имеет тенденцию отдавать предпочтение одному потоку, который в некоторых случаях может сильно истощить другие.
Это на 4-ядерном ядре Intel i5.Я объясняю разницу наличием нескольких ядер, чтобы по крайней мере два потока могли работать одновременно.Если это выполняется на одноядерном устройстве, я не ожидал бы этой разницы (я не проверял эту конфигурацию).
Я также снабдил тест с помощью детектора взаимоблокировки.Это не влияет на результаты, которые я получаю.Он предназначен для того, чтобы люди могли экспериментировать с другими алгоритмами блокировки и быстрее определять, заблокирован ли тест.Если этот тупиковый детектор вас как-то беспокоит, просто удалите его из тестового прогона.У меня нет желания обсуждать его достоинства.
Я приветствую конструктивную обратную связь, если вы получаете схожие результаты или другие.Или, если вы думаете, что этот тест так или иначе смещен, и как его можно улучшить.