Хорошо, проблема, кажется, в вашем методе сигнала ThreadPool #. Что может случиться так:
1 - все ваши работники заняты, и вы пытаетесь обработать новую работу
2 - строка 90 получает ноль работников
3 - рабочий освобождается и сигнализирует об этом, но сигнал теряется, поскольку ThreadPool его не ждет
4 - вы попадаете на линию 95, ожидая, хотя есть свободный работник.
Ошибка здесь в том, что вы можете подать сигнал свободному работнику, даже когда никто не слушает. Этот метод сигнала ThreadPool # должен быть:
def signal
@mutex.synchronize { @cv.signal }
end
И проблема та же в объекте Worker. Что может случиться так:
1 - работник только что завершил работу
2 - Он проверяет (строка 17), есть ли ожидание работы: нет
3 - Пул потоков отправляет новое задание и сигнализирует об этом ... но сигнал теряется
4 - работник ожидает сигнала, даже если он помечен как занятый
Вы должны указать метод инициализации следующим образом:
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
@mutex.synchronize do
while @running
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@cv.wait(@mutex)
end
end
end
end
end
Затем методы Worker # get_block и Worker # reset_block больше не должны синхронизироваться. Таким образом, вы не можете назначить блок работнику между тестом блока и ожиданием сигнала.