Синхронизация запуска потока - PullRequest
0 голосов
/ 30 декабря 2018

Я выполняю некоторый код (упрощенный, но все еще ломающий версию ниже), который завершается ошибкой в ​​~ 1/3000 выполнениях в ожидании первого переключения.Что должно произойти:

  • threads[0] запускается и захватывает мьютекс
  • threads[0] уведомляет cond_main, чтобы основной поток мог создать thread[1]
  • thread[1] / thread[0] выполнить некоторую работу, ожидая сигналов друг друга

К сожалению, это не удается в thread[0] - cond.wait заканчивается тайм-аутом и вызывает исключение.Как бы я синхронизировал это, убедившись, что cond_main не получает сигнал слишком рано?

В идеале я хотел бы передать заблокированный мьютекс из основного потока и разблокировать его в первом порожденном потоке, но Rubyтребует, чтобы мьютексы были разблокированы в том же потоке - так что это не сработает.

Автономный репродуктор (сам по себе не имеет особого смысла, но реальная работа исключена):

def run_test
  mutex     = Mutex.new
  cond      = ConditionVariable.new
  cond_main = ConditionVariable.new
  threads   = []

  t1_done = false
  t2_done = false

  threads << Thread.new do
    mutex.synchronize do
      # this needs to happen first
      cond_main.signal
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t2_done

      # some work
      t1_done = true
      cond.signal
    end
  end
  cond_main.wait(Mutex.new.lock, 2)

  threads << Thread.new do
    mutex.synchronize do
      cond.signal
      # some work
      t2_done = true
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t1_done
    end
  end

  threads.map(&:join)
end

5000.times { |x|
  puts "Run #{x}"
  run_test
}

Проверено на Ruby 2.5.3

1 Ответ

0 голосов
/ 30 декабря 2018

Установить блок while, чтобы остановить ожидание, если закончится второй поток (см. Подробнее здесь ):

def run_test
  mutex     = Mutex.new
  cond      = ConditionVariable.new
  cond_main = ConditionVariable.new
  threads   = []

  spawned = false

  t1_done = false
  t2_done = false

  threads << Thread.new do
    mutex.synchronize do
      while(!spawned) do
        cond.wait(mutex, 2)
      end
      raise 'timeout waiting for switch' if !t2_done

      # some work
      t1_done = true
      cond.signal
    end
  end

  threads << Thread.new do
    mutex.synchronize do
      spawned = true
      cond.signal
      # some work
      t2_done = true
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t1_done
    end
  end

  threads.map(&:join)
end

50000.times { |x| 
  puts x 
  run_test 
}

В качестве альтернативы, используя семафор подсчета , мы можемназначить потокам некоторые приоритеты:

require 'concurrent-ruby'

def run_test
  mutex     = Mutex.new
  sync      = Concurrent::Semaphore.new(0)
  cond      = ConditionVariable.new
  cond_main = ConditionVariable.new
  threads   = []

  t1_done = false
  t2_done = false

  threads << Thread.new do
    mutex.synchronize do
      sync.release(1)
      # this needs to happen first
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t2_done

      # some work
      t1_done = true
      cond.signal
    end
  end

  threads << Thread.new do
    sync.acquire(1)
    mutex.synchronize do
      cond.signal
      # some work
      t2_done = true
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t1_done
    end
  end

  threads.map(&:join)
end

50000.times { |x| 
  puts x 
  run_test 
}

Я предпочитаю второе решение, так как оно позволяет вам контролировать порядок ваших потоков, хотя и выглядит немного грязнее.

Как любопытство,в Ruby 2.6 ваш код не вызывает исключений (проверено> 10M запусков).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...